You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/01/12 18:42:48 UTC
[accumulo] branch main updated: Fixes #1665 use old compaction thread pool config if set (#1861)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3e23ad8 Fixes #1665 use old compaction thread pool config if set (#1861)
3e23ad8 is described below
commit 3e23ad8eb0e8309b71811ded490603343f070f79
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jan 12 13:42:39 2021 -0500
Fixes #1665 use old compaction thread pool config if set (#1861)
---
.../tserver/compactions/CompactionManager.java | 74 +++++++++++++++++++++-
1 file changed, 71 insertions(+), 3 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index af09499..2e3084c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.server.ServerContext;
@@ -62,14 +63,16 @@ public class CompactionManager {
public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");
- private static class Config {
+ private String lastDeprecationWarning = "";
+
+ private class Config {
Map<String,String> planners = new HashMap<>();
Map<String,Long> rateLimits = new HashMap<>();
Map<String,Map<String,String>> options = new HashMap<>();
long defaultRateLimit = 0;
@SuppressWarnings("removal")
- private static long getDefaultThroughput(AccumuloConfiguration aconf) {
+ private long getDefaultThroughput(AccumuloConfiguration aconf) {
if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) {
return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT);
}
@@ -78,10 +81,75 @@ public class CompactionManager {
.getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
}
- Config(AccumuloConfiguration aconf) {
+ @SuppressWarnings("removal")
+ private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
+
Map<String,String> configs =
aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
+ // check if deprecated properties for compaction executor are set
+ if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) {
+
+ String defaultServicePrefix =
+ Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + ".";
+
+ // check if any properties for the default compaction service are set
+ boolean defaultServicePropsSet = configs.keySet().stream()
+ .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey)
+ .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true));
+
+ if (defaultServicePropsSet) {
+
+ String warning = String.format(
+ "The deprecated property %s was set. Properties with the prefix %s "
+ + "were also set, which replace the deprecated properties. The deprecated "
+ + "property was therefore ignored.",
+ Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix);
+
+ if (!warning.equals(lastDeprecationWarning)) {
+ log.warn(warning);
+ lastDeprecationWarning = warning;
+ }
+ } else {
+ String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT);
+
+ // Its possible a user has configured the others compaction services, but not the default
+ // service. In this case want to produce a config with the default service configs
+ // overridden using deprecated configs.
+
+ HashMap<String,String> configsCopy = new HashMap<>(configs);
+
+ Map<String,String> defaultServiceConfigs =
+ Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(),
+ defaultServicePrefix + "planner.opts.executors",
+ "[{'name':'deprecated','numThreads':" + numThreads + "}]");
+
+ configsCopy.putAll(defaultServiceConfigs);
+
+ String warning = String.format(
+ "The deprecated property %s was set. Properties with the prefix %s "
+ + "were not set, these should replace the deprecated properties. The old "
+ + "properties were automatically mapped to the new properties in process "
+ + "creating : %s.",
+ Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix,
+ defaultServiceConfigs);
+
+ if (!warning.equals(lastDeprecationWarning)) {
+ log.warn(warning);
+ lastDeprecationWarning = warning;
+ }
+
+ configs = Map.copyOf(configsCopy);
+ }
+ }
+
+ return configs;
+
+ }
+
+ Config(AccumuloConfiguration aconf) {
+ Map<String,String> configs = getConfiguration(aconf);
+
configs.forEach((prop, val) -> {
var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length());