You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dd...@apache.org on 2023/11/25 03:22:52 UTC
(accumulo) branch main updated: Moves compaction properties and adds 'compaction.opts.queue' property. (#3915)
This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 9dd819437a Moves compaction properties and adds 'compaction.opts.queue' property. (#3915)
9dd819437a is described below
commit 9dd819437a5e553004ce329acec369b6394be3ac
Author: Daniel Roberts <dd...@gmail.com>
AuthorDate: Fri Nov 24 22:22:46 2023 -0500
Moves compaction properties and adds 'compaction.opts.queue' property. (#3915)
* Migrated compaction properties away from the "TSERV" prefix to `compaction.service` prefix and added
"queues" property support for defining external compaction queues.
* Adds additional property validation to ignore properties that are
defined using the old compaction service prefix.
* Adds a top-level `compaction` prefix for all compaction based properties
that are separate from specific server functions.
* Moves `warn.time` under this `compaction` prefix.
Passes in the prefix used so fully qualified properties are accurate.
* Adds additional json parsing to compare field names and reject any
additional fields that are not used by the
DefaultCompactionPlanner.
---------
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../org/apache/accumulo/core/conf/Property.java | 36 ++++-
.../spi/compaction/DefaultCompactionPlanner.java | 179 ++++++++++++++++-----
.../compaction/CompactionPlannerInitParams.java | 9 +-
.../util/compaction/CompactionServicesConfig.java | 112 ++++++++++---
.../compaction/DefaultCompactionPlannerTest.java | 166 ++++++++++++++++++-
.../compaction/CompactionServicesConfigTest.java | 105 ++++++++++++
.../server/compaction/CompactionWatcher.java | 2 +-
.../server/conf/CheckCompactionConfig.java | 3 +-
.../server/conf/CheckCompactionConfigTest.java | 47 +++++-
.../tserver/compactions/CompactionManager.java | 7 +-
.../tserver/compactions/CompactionService.java | 14 +-
.../compaction/BadCompactionServiceConfigIT.java | 2 +-
.../test/compaction/CompactionConfigChangeIT.java | 6 +-
.../test/compaction/CompactionExecutorIT.java | 21 +--
.../apache/accumulo/test/shell/ConfigSetIT.java | 2 +-
.../org/apache/accumulo/test/util/SlowOps.java | 1 +
16 files changed, 612 insertions(+), 100 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6c1296fa17..3ab9924d8f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -44,6 +44,23 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
public enum Property {
+ COMPACTION_PREFIX("compaction.", null, PropertyType.PREFIX,
+ "Both major and minor compaction properties can be included under this prefix.", "3.1.0"),
+ // Planner options must be documented with the ".planner.opts." infix: CompactionServicesConfig
+ // only accepts keys shaped like <service>.planner.opts.<option> under this prefix.
+ COMPACTION_SERVICE_PREFIX(COMPACTION_PREFIX + "service.", null, PropertyType.PREFIX,
+ "This prefix should be used to define all properties for the compaction services."
+ + " See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.\n"
+ + "A new external compaction service would be defined like the following:\n"
+ + "`compaction.service.newService.planner="
+ + "\"org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner\"`.\n"
+ + "`compaction.service.newService.planner.opts.queues=\""
+ + "[{\"name\": \"small\", \"maxSize\":\"32M\"},"
+ + "{ \"name\":\"medium\", \"maxSize\":\"512M\"},{\"name\":\"large\"}]\"`\n"
+ + "`compaction.service.newService.planner.opts.maxOpen=50`.\n"
+ + "Additional options can be defined using the"
+ + " `compaction.service.<service>.planner.opts.<option>` property.",
+ "3.1.0"),
+ COMPACTION_WARN_TIME(COMPACTION_PREFIX + "warn.time", "10m", PropertyType.TIMEDURATION,
+ "When a compaction has not made progress for this time period, a warning will be logged.",
+ "3.1.0"),
// SSL properties local to each node (see also instance.ssl.enabled which must be consistent
// across all nodes in an instance)
RPC_PREFIX("rpc.", null, PropertyType.PREFIX,
@@ -205,12 +222,12 @@ public enum Property {
+ "encryption, replace this classname with an implementation of the"
+ "org.apache.accumulo.core.spi.crypto.CryptoFactory interface.",
"2.1.0"),
-
// general properties
GENERAL_PREFIX("general.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of accumulo overall, but"
+ " do not have to be consistent throughout a cloud.",
"1.3.5"),
+
GENERAL_CONTEXT_CLASSLOADER_FACTORY("general.context.class.loader.factory", "",
PropertyType.CLASSNAME,
"Name of classloader factory to be used to create classloaders for named contexts,"
@@ -568,8 +585,11 @@ public enum Property {
"The maximum number of concurrent tablet migrations for a tablet server.", "1.3.5"),
TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION,
"Time a tablet server will sleep between checking which tablets need compaction.", "1.3.5"),
+ @Deprecated(since = "3.1", forRemoval = true)
+ @ReplacedBy(property = COMPACTION_SERVICE_PREFIX)
TSERV_COMPACTION_SERVICE_PREFIX("tserver.compaction.major.service.", null, PropertyType.PREFIX,
"Prefix for compaction services.", "2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for root tablet service.", "2.1.0"),
@@ -579,9 +599,11 @@ public enum Property {
"Maximum number of bytes to read or write per second over all major"
+ " compactions in this compaction service, or 0B for unlimited.",
"2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN(
"tserver.compaction.major.service.root.planner.opts.maxOpen", "30", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS(
"tserver.compaction.major.service.root.planner.opts.executors",
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':1},{'name':'huge','type':'internal','numThreads':1}]"
@@ -589,6 +611,7 @@ public enum Property {
PropertyType.STRING,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for metadata table.", "2.1.0"),
@@ -598,9 +621,11 @@ public enum Property {
"Maximum number of bytes to read or write per second over all major"
+ " compactions in this compaction service, or 0B for unlimited.",
"2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_META_MAX_OPEN(
"tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_META_EXECUTORS(
"tserver.compaction.major.service.meta.planner.opts.executors",
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'huge','type':'internal','numThreads':2}]"
@@ -608,6 +633,7 @@ public enum Property {
PropertyType.JSON,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Planner for default compaction service.", "2.1.0"),
@@ -617,9 +643,11 @@ public enum Property {
"Maximum number of bytes to read or write per second over all major"
+ " compactions in this compaction service, or 0B for unlimited.",
"2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN(
"tserver.compaction.major.service.default.planner.opts.maxOpen", "10", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
+ @Deprecated(since = "3.1", forRemoval = true)
TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS(
"tserver.compaction.major.service.default.planner.opts.executors",
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]"
@@ -629,6 +657,8 @@ public enum Property {
"2.1.0"),
TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
"The maximum number of concurrent minor compactions for a tablet server.", "1.3.5"),
+ @Deprecated(since = "3.1")
+ @ReplacedBy(property = COMPACTION_WARN_TIME)
TSERV_COMPACTION_WARN_TIME("tserver.compaction.warn.time", "10m", PropertyType.TIMEDURATION,
"When a compaction has not made progress for this time period, a warning will be logged.",
"1.6.0"),
@@ -1315,6 +1345,8 @@ public enum Property {
ReplacedBy rb = getAnnotation(ReplacedBy.class);
if (rb != null) {
replacedBy = rb.property();
+ } else {
+ isReplaced = false;
}
annotationsComputed = true;
}
@@ -1455,9 +1487,11 @@ public enum Property {
// white list prefixes
return key.startsWith(Property.TABLE_PREFIX.getKey())
|| key.startsWith(Property.TSERV_PREFIX.getKey())
+ || key.startsWith(Property.COMPACTION_SERVICE_PREFIX.getKey())
|| key.startsWith(Property.MANAGER_PREFIX.getKey())
|| key.startsWith(Property.GC_PREFIX.getKey())
|| key.startsWith(Property.GENERAL_ARBITRARY_PROP_PREFIX.getKey())
+ || key.equals(Property.COMPACTION_WARN_TIME.getKey())
|| key.equals(Property.GENERAL_FILE_NAME_ALLOCATION_BATCH_SIZE_MIN.getKey())
|| key.equals(Property.GENERAL_FILE_NAME_ALLOCATION_BATCH_SIZE_MAX.getKey());
}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index bf30ceb2ad..cf6c099bf3 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -20,9 +20,11 @@ package org.apache.accumulo.core.spi.compaction;
import static org.apache.accumulo.core.util.LazySingletons.GSON;
+import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -30,12 +32,19 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -80,16 +89,19 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* </tr>
* </table>
* <br>
- * The maxSize field determines the maximum size of compaction that will run on an executor. The
- * maxSize field can have a suffix of K,M,G for kilobytes, megabytes, or gigabytes and represents
- * the sum of the input files for a given compaction. One executor can have no max size and it will
- * run everything that is too large for the other executors. If all executors have a max size, then
- * system compactions will only run for compactions smaller than the largest max size. User, chop,
- * and selector compactions will always run, even if there is no executor for their size. These
- * compactions will run on the executor with the largest max size. The following example value for
- * this property will create 3 threads to run compactions of files whose file size sum is less than
- * 100M, 3 threads to run compactions of files whose file size sum is less than 500M, and run all
- * other compactions on Compactors configured to run compactions for Queue1:
+ * Note: The "executors" option has been deprecated in 3.1 and will be removed in a future release.
+ * The property prefix "tserver.compaction.major.service" has also been deprecated in 3.1 and will
+ * be removed in a future release. The maxSize field determines the maximum size of compaction that
+ * will run on an executor. The maxSize field can have a suffix of K,M,G for kilobytes, megabytes,
+ * or gigabytes and represents the sum of the input files for a given compaction. One executor can
+ * have no max size and it will run everything that is too large for the other executors. If all
+ * executors have a max size, then system compactions will only run for compactions smaller than the
+ * largest max size. User, chop, and selector compactions will always run, even if there is no
+ * executor for their size. These compactions will run on the executor with the largest max size.
+ * The following example value for this property will create 3 threads to run compactions of files
+ * whose file size sum is less than 100M, 3 threads to run compactions of files whose file size sum
+ * is less than 500M, and run all other compactions on Compactors configured to run compactions for
+ * Queue1:
*
* <pre>
* {@code
@@ -102,16 +114,38 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
*
* Note that the use of 'external' requires that the CompactionCoordinator and at least one
* Compactor for Queue1 is running.
- * <li>{@code tserver.compaction.major.service.<service>.opts.maxOpen} This determines the maximum
- * number of files that will be included in a single compaction.
+ * <li>{@code compaction.service.<service>.planner.opts.maxOpen} This determines the maximum
+ * number of files that will be included in a single compaction.
+ * <li>{@code compaction.service.<service>.planner.opts.queues} This is a json array of queue
+ * objects which have the following fields:
+ * <table>
+ * <caption>Default Compaction Planner Queue options</caption>
+ * <tr>
+ * <th>Field Name</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td>name</td>
+ * <td>name or alias of the queue (required)</td>
+ * </tr>
+ * <tr>
+ * <td>maxSize</td>
+ * <td>threshold sum of the input files (required for all but one of the configs)</td>
+ * </tr>
+ * </table>
+ * <br>
+ * This 'queues' object is used for defining external compaction queues without needing to use the
+ * thread-based 'executors' property.
* </ul>
*
- * @since 2.1.0
+ * @since 3.1.0
* @see org.apache.accumulo.core.spi.compaction
*/
public class DefaultCompactionPlanner implements CompactionPlanner {
+ private final static Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class);
+
private static class ExecutorConfig {
String type;
String name;
@@ -120,6 +154,11 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
String queue;
}
+ private static class QueueConfig {
+ String name;
+ String maxSize;
+ }
+
private static class Executor {
final CompactionExecutorId ceid;
final Long maxSize;
@@ -147,44 +186,78 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
justification = "Field is written by Gson")
@Override
public void init(InitParameters params) {
- ExecutorConfig[] execConfigs =
- GSON.get().fromJson(params.getOptions().get("executors"), ExecutorConfig[].class);
-
List<Executor> tmpExec = new ArrayList<>();
+ String values;
+
+ if (params.getOptions().containsKey("executors")
+ && !params.getOptions().get("executors").isBlank()) {
+ values = params.getOptions().get("executors");
- for (ExecutorConfig executorConfig : execConfigs) {
- Long maxSize = executorConfig.maxSize == null ? null
- : ConfigurationTypeHelper.getFixedMemoryAsBytes(executorConfig.maxSize);
+ // Generate a list of fields from the desired object.
+ final List<String> execFields = Arrays.stream(ExecutorConfig.class.getDeclaredFields())
+ .map(Field::getName).collect(Collectors.toList());
- CompactionExecutorId ceid;
+ for (JsonElement element : GSON.get().fromJson(values, JsonArray.class)) {
+ validateConfig(element, execFields, ExecutorConfig.class.getName());
+ ExecutorConfig executorConfig = GSON.get().fromJson(element, ExecutorConfig.class);
- // If not supplied, GSON will leave type null. Default to internal
- if (executorConfig.type == null) {
- executorConfig.type = "internal";
+ Long maxSize = executorConfig.maxSize == null ? null
+ : ConfigurationTypeHelper.getFixedMemoryAsBytes(executorConfig.maxSize);
+ CompactionExecutorId ceid;
+
+ // If not supplied, GSON will leave type null. Default to internal
+ if (executorConfig.type == null) {
+ executorConfig.type = "internal";
+ }
+
+ switch (executorConfig.type) {
+ case "internal":
+ Preconditions.checkArgument(null == executorConfig.queue,
+ "'queue' should not be specified for internal compactions");
+ int numThreads = Objects.requireNonNull(executorConfig.numThreads,
+ "'numThreads' must be specified for internal type");
+ ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads);
+ break;
+ case "external":
+ Preconditions.checkArgument(null == executorConfig.numThreads,
+ "'numThreads' should not be specified for external compactions");
+ String queue = Objects.requireNonNull(executorConfig.queue,
+ "'queue' must be specified for external type");
+ ceid = params.getExecutorManager().getExternalExecutor(queue);
+ break;
+ default:
+ throw new IllegalArgumentException("type must be 'internal' or 'external'");
+ }
+ tmpExec.add(new Executor(ceid, maxSize));
}
+ }
- switch (executorConfig.type) {
- case "internal":
- Preconditions.checkArgument(null == executorConfig.queue,
- "'queue' should not be specified for internal compactions");
- int numThreads = Objects.requireNonNull(executorConfig.numThreads,
- "'numThreads' must be specified for internal type");
- ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads);
- break;
- case "external":
- Preconditions.checkArgument(null == executorConfig.numThreads,
- "'numThreads' should not be specified for external compactions");
- String queue = Objects.requireNonNull(executorConfig.queue,
- "'queue' must be specified for external type");
- ceid = params.getExecutorManager().getExternalExecutor(queue);
- break;
- default:
- throw new IllegalArgumentException("type must be 'internal' or 'external'");
+ if (params.getOptions().containsKey("queues") && !params.getOptions().get("queues").isBlank()) {
+ values = params.getOptions().get("queues");
+
+ // Generate a list of fields from the desired object.
+ final List<String> queueFields = Arrays.stream(QueueConfig.class.getDeclaredFields())
+ .map(Field::getName).collect(Collectors.toList());
+
+ for (JsonElement element : GSON.get().fromJson(values, JsonArray.class)) {
+ validateConfig(element, queueFields, QueueConfig.class.getName());
+ QueueConfig queueConfig = GSON.get().fromJson(element, QueueConfig.class);
+
+ Long maxSize = queueConfig.maxSize == null ? null
+ : ConfigurationTypeHelper.getFixedMemoryAsBytes(queueConfig.maxSize);
+
+ CompactionExecutorId ceid;
+ String queue = Objects.requireNonNull(queueConfig.name, "'name' must be specified");
+ ceid = params.getExecutorManager().getExternalExecutor(queue);
+ tmpExec.add(new Executor(ceid, maxSize));
}
- tmpExec.add(new Executor(ceid, maxSize));
}
- Collections.sort(tmpExec, Comparator.comparing(Executor::getMaxSize,
+ if (tmpExec.size() < 1) {
+ throw new IllegalStateException("No defined executors or queues for this planner");
+ }
+
+ tmpExec.sort(Comparator.comparing(Executor::getMaxSize,
Comparator.nullsLast(Comparator.naturalOrder())));
executors = List.copyOf(tmpExec);
@@ -207,7 +280,27 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
}
private void determineMaxFilesToCompact(InitParameters params) {
- this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10"));
+
+ String maxOpen = params.getOptions().get("maxOpen");
+ if (maxOpen == null) {
+ maxOpen = "10";
+ log.trace("default maxOpen not set, defaulting to 10");
+ }
+ this.maxFilesToCompact = Integer.parseInt(maxOpen);
+ }
+
+ private void validateConfig(JsonElement json, List<String> fields, String className) {
+
+ JsonObject jsonObject = GSON.get().fromJson(json, JsonObject.class);
+
+ List<String> objectProperties = new ArrayList<>(jsonObject.keySet());
+ HashSet<String> classFieldNames = new HashSet<>(fields);
+
+ if (!classFieldNames.containsAll(objectProperties)) {
+ objectProperties.removeAll(classFieldNames);
+ throw new JsonParseException(
+ "Invalid fields: " + objectProperties + " provided for class: " + className);
+ }
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
index 0f79ce4df0..3a3f72c699 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
@@ -38,14 +37,16 @@ public class CompactionPlannerInitParams implements CompactionPlanner.InitParame
private final Set<CompactionExecutorId> requestedExternalExecutors;
private final ServiceEnvironment senv;
private final CompactionServiceId serviceId;
+ private final String prefix;
- public CompactionPlannerInitParams(CompactionServiceId serviceId, Map<String,String> plannerOpts,
- ServiceEnvironment senv) {
+ public CompactionPlannerInitParams(CompactionServiceId serviceId, String prefix,
+ Map<String,String> plannerOpts, ServiceEnvironment senv) {
this.serviceId = serviceId;
this.plannerOpts = plannerOpts;
this.requestedExecutors = new HashMap<>();
this.requestedExternalExecutors = new HashSet<>();
this.senv = senv;
+ this.prefix = prefix;
}
@Override
@@ -60,7 +61,7 @@ public class CompactionPlannerInitParams implements CompactionPlanner.InitParame
@Override
public String getFullyQualifiedOption(String key) {
- return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key;
+ // Planner options are configured as <prefix><service>.planner.opts.<option>, so the
+ // ".planner" segment is required for the returned name to match a real property key.
+ return prefix + serviceId + ".planner.opts." + key;
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
index b3f911c467..5cf9d0abc8 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
@@ -21,11 +21,14 @@ package org.apache.accumulo.core.util.compaction;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
@@ -33,13 +36,18 @@ import com.google.common.collect.Sets;
* This class serves to configure compaction services from an {@link AccumuloConfiguration} object.
*
* Specifically, compaction service properties (those prefixed by "tserver.compaction.major
- * .service") are used.
+ * .service" or "compaction.service") are used.
*/
public class CompactionServicesConfig {
+ private static final Logger log = LoggerFactory.getLogger(CompactionServicesConfig.class);
private final Map<String,String> planners = new HashMap<>();
+ private final Map<String,String> plannerPrefixes = new HashMap<>();
private final Map<String,Long> rateLimits = new HashMap<>();
private final Map<String,Map<String,String>> options = new HashMap<>();
+ @SuppressWarnings("removal")
+ private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+ private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
long defaultRateLimit;
public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");
@@ -50,31 +58,91 @@ public class CompactionServicesConfig {
.getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
}
- private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
- return aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
+ /**
+ * Collects the compaction service properties found under the new and the deprecated prefix.
+ * Keys in the returned inner maps have their prefix stripped; the outer map is keyed by the
+ * prefix string. A property under the deprecated prefix is discarded (with a warning) when
+ * its service also has properties under the new prefix.
+ *
+ * @param aconf configuration to read the prefixed properties from
+ * @return unmodifiable map of prefix to the stripped properties kept for that prefix
+ */
+ private Map<String,Map<String,String>> getConfiguration(AccumuloConfiguration aconf) {
+ var currentProps = aconf.getAllPropertiesWithPrefixStripped(newPrefix);
+
+ // The first token of a stripped key is the service name.
+ var currentServices = currentProps.keySet().stream().map(key -> key.split("\\.")[0])
+ .collect(Collectors.toSet());
+
+ Map<String,String> deprecatedProps = new HashMap<>();
+ aconf.getAllPropertiesWithPrefixStripped(oldPrefix).forEach((key, value) -> {
+ var service = key.split("\\.")[0];
+ if (currentServices.contains(service)) {
+ // The service is already defined under the new prefix, so this entry is dropped.
+ log.warn("Duplicate compaction service '{}' definition exists. Ignoring property : '{}'",
+ service, key);
+ } else {
+ deprecatedProps.put(key, value);
+ }
+ });
+
+ Map<String,Map<String,String>> byPrefix = new HashMap<>();
+ byPrefix.put(newPrefix.getKey(), currentProps);
+ byPrefix.put(oldPrefix.getKey(), deprecatedProps);
+ // Hand back an unmodifiable copy of the outer map
+ return Map.copyOf(byPrefix);
}
public CompactionServicesConfig(AccumuloConfiguration aconf) {
- Map<String,String> configs = getConfiguration(aconf);
-
- configs.forEach((prop, val) -> {
-
- var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length());
- String[] tokens = suffix.split("\\.");
- if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) {
- options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val);
- } else if (tokens.length == 2 && tokens[1].equals("planner")) {
- planners.put(tokens[0], val);
- } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) {
- var eprop = Property.getPropertyByKey(prop);
- if (eprop == null || aconf.isPropertySet(eprop)) {
- rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val));
+ Map<String,Map<String,String>> configs = getConfiguration(aconf);
+
+ // Find compaction planner defs first.
+ configs.forEach((prefix, props) -> {
+ props.forEach((prop, val) -> {
+ String[] tokens = prop.split("\\.");
+ if (tokens.length == 2 && tokens[1].equals("planner")) {
+ if (prefix.equals(oldPrefix.getKey())) {
+ // Log a warning only if the old prefix planner is defined by a user. The property is
+ // looked up by its key: Property.valueOf() matches enum constant names, never keys, so
+ // it would throw for every key and the warning would fire for built-in defaults too.
+ Property userDefined = Property.getPropertyByKey(prefix + prop);
+ if (userDefined == null) {
+ log.trace("Property: {} is not set by default configuration", prefix + prop);
+ }
+ // A key without a declared Property cannot be a built-in default, so treat it as set.
+ boolean isPropSet = userDefined == null || aconf.isPropertySet(userDefined);
+ if (isPropSet) {
+ log.warn(
+ "Found compaction planner '{}' using a deprecated prefix. Please update property to use the '{}' prefix",
+ tokens[0], newPrefix);
+ }
+ }
+ plannerPrefixes.put(tokens[0], prefix);
+ planners.put(tokens[0], val);
}
- } else {
- throw new IllegalArgumentException("Malformed compaction service property " + prop);
- }
+ });
});
+ // Now find all compaction planner options and rate limits. Planner class properties were
+ // already recorded by the first pass, so they are accepted here without further action.
+ configs.forEach((prefix, props) -> {
+ props.forEach((prop, val) -> {
+ String[] tokens = prop.split("\\.");
+ if (!plannerPrefixes.containsKey(tokens[0])) {
+ throw new IllegalArgumentException(
+ "Incomplete compaction service definition, missing planner class: " + prop);
+ }
+ if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) {
+ options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val);
+ } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) {
+ // prop has its prefix stripped, so rebuild the full key before the lookup; otherwise
+ // the lookup never matches and default rate limits are recorded as if user-set.
+ var eprop = Property.getPropertyByKey(prefix + prop);
+ if (eprop == null || aconf.isPropertySet(eprop)) {
+ rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val));
+ }
+ } else if (!(tokens.length == 2 && tokens[1].equals("planner"))) {
+ throw new IllegalArgumentException(
+ "Malformed compaction service property " + prefix + prop);
+ }
+ });
+ });
defaultRateLimit = getDefaultThroughput();
var diff = Sets.difference(options.keySet(), planners.keySet());
@@ -110,6 +178,10 @@ public class CompactionServicesConfig {
return planners;
}
+ public String getPlannerPrefix(String service) {
+ return plannerPrefixes.get(service);
+ }
+
public Map<String,Long> getRateLimits() {
return rateLimits;
}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 06deb2e219..5ae90a4f2d 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -19,7 +19,9 @@
package org.apache.accumulo.core.spi.compaction;
import static com.google.common.collect.MoreCollectors.onlyElement;
+import static org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -44,6 +46,8 @@ import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
+import com.google.gson.JsonParseException;
+
public class DefaultCompactionPlannerTest {
private static <T> T getOnlyElement(Collection<T> c) {
@@ -239,6 +243,74 @@ public class DefaultCompactionPlannerTest {
assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
}
+ /**
+ * Tests that a planner initialized with the "queues" option plans jobs on external executors
+ * named after the queues: four 1M files go to "small" and four 100M files go to "midsize".
+ */
+ @Test
+ public void testQueueCreation() throws Exception {
+ Configuration conf = EasyMock.createMock(Configuration.class);
+ EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+ ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+ EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.replay(conf, senv);
+
+ // "small" is capped at 32M; "midsize" has no maxSize.
+ String queues = "[{\"name\": \"small\", \"maxSize\":\"32M\"},{\"name\":\"midsize\"}]";
+ DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+ planner.init(getInitParamQueues(senv, queues));
+
+ // Four 1M files: expect a single job on the "small" queue.
+ var smallFiles = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "1M");
+ var smallPlan = planner
+ .makePlan(createPlanningParams(smallFiles, smallFiles, Set.of(), 2, CompactionKind.SYSTEM));
+
+ var smallJob = getOnlyElement(smallPlan.getJobs());
+ assertEquals(smallFiles, smallJob.getFiles());
+ assertEquals(CompactionExecutorIdImpl.externalId("small"), smallJob.getExecutor());
+
+ // Four 100M files: expect a single job on the "midsize" queue.
+ var largeFiles = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M");
+ var largePlan = planner
+ .makePlan(createPlanningParams(largeFiles, largeFiles, Set.of(), 2, CompactionKind.SYSTEM));
+
+ var largeJob = getOnlyElement(largePlan.getJobs());
+ assertEquals(largeFiles, largeJob.getFiles());
+ assertEquals(CompactionExecutorIdImpl.externalId("midsize"), largeJob.getExecutor());
+ }
+
+ /**
+ * Tests that additional fields in the queue or executor JSON objects cause a
+ * {@link JsonParseException} whose message names the unexpected fields.
+ */
+ @Test
+ public void testErrorAdditionalConfigFields() {
+ Configuration conf = EasyMock.createMock(Configuration.class);
+ EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+ ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+ EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.replay(conf, senv);
+
+ // Locals are lowerCamelCase so they cannot be mistaken for type names.
+ DefaultCompactionPlanner queuePlanner = new DefaultCompactionPlanner();
+
+ // The second queue carries the extra fields "type", "foo" and "queue".
+ String queues =
+ "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"name\":\"largeQueue\", \"type\":\"internal\", \"foo\":\"bar\", \"queue\":\"broken\"}]";
+
+ final InitParameters queueParams = getInitParamQueues(senv, queues);
+ assertNotNull(queueParams);
+ var e = assertThrows(JsonParseException.class, () -> queuePlanner.init(queueParams),
+ "Failed to throw error");
+ assertTrue(e.getMessage().contains("[type, foo, queue]"),
+ "Error message didn't contain '[type, foo, queue]'");
+
+ // The assertion below only checks for the extra "foo" field on the second executor.
+ String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
+ "'type': 'internal','maxSize':'128M','numThreads':2, 'foo':'bar'",
+ "'type': 'internal','numThreads':1, 'unexpectedField':'foo'");
+
+ final InitParameters execParams = getInitParams(senv, executors);
+ assertNotNull(execParams);
+
+ DefaultCompactionPlanner execPlanner = new DefaultCompactionPlanner();
+ var err = assertThrows(JsonParseException.class, () -> execPlanner.init(execParams),
+ "Failed to throw error");
+ // The failure message quotes exactly the substring that is asserted.
+ assertTrue(err.getMessage().contains("Invalid fields: [foo]"),
+ "Error message didn't contain 'Invalid fields: [foo]'");
+ }
+
/**
* Tests internal type executor with no numThreads set throws error
*/
@@ -302,6 +374,59 @@ public class DefaultCompactionPlannerTest {
assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue");
}
+ /**
+ * Tests that a queue definition without a name causes a {@link NullPointerException} with the
+ * message "'name' must be specified".
+ */
+ @Test
+ public void testErrorQueueNoName() {
+ DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+ Configuration conf = EasyMock.createMock(Configuration.class);
+ EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+ ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+ EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.replay(conf, senv);
+
+ // The second queue object has a maxSize but no "name" field.
+ String queues = "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"maxSize\":\"120M\"}]";
+
+ final InitParameters params = getInitParamQueues(senv, queues);
+ assertNotNull(params);
+
+ var e = assertThrows(NullPointerException.class, () -> planner.init(params),
+ "Failed to throw error");
+ // Expected value goes first so a failure report labels the two values correctly.
+ assertEquals("'name' must be specified", e.getMessage(), "Error message was not equal");
+ }
+
+ /**
+ * Tests that initializing the planner with an empty "executors" value, and separately with an
+ * empty "queues" value, throws an {@link IllegalStateException} in both cases.
+ */
+ @Test
+ public void testErrorNoExecutors() {
+ DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+ Configuration conf = EasyMock.createMock(Configuration.class);
+ EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+ ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+ EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.replay(conf, senv);
+
+ // Case 1: empty executors option.
+ var execParams = getInitParams(senv, "");
+ assertNotNull(execParams);
+
+ var e = assertThrows(IllegalStateException.class, () -> planner.init(execParams),
+ "Failed to throw error");
+ assertEquals("No defined executors or queues for this planner", e.getMessage(),
+ "Error message was not equal");
+
+ // Case 2: empty queues option.
+ var params = getInitParamQueues(senv, "");
+ assertNotNull(params);
+
+ var err = assertThrows(IllegalStateException.class, () -> planner.init(params),
+ "Failed to throw error");
+ // Check the exception from this second init, not the one captured in case 1.
+ assertEquals("No defined executors or queues for this planner", err.getMessage(),
+ "Error message was not equal");
+ }
+
/**
* Tests executors can only have one without a max size.
*/
@@ -360,7 +485,44 @@ public class DefaultCompactionPlannerTest {
@Override
public String getFullyQualifiedOption(String key) {
assertEquals("maxOpen", key);
- return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
+ return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
+ }
+
+ @Override
+ public ExecutorManager getExecutorManager() {
+ return new ExecutorManager() {
+ @Override
+ public CompactionExecutorId createExecutor(String name, int threads) {
+ return CompactionExecutorIdImpl.externalId(name);
+ }
+
+ @Override
+ public CompactionExecutorId getExternalExecutor(String name) {
+ return CompactionExecutorIdImpl.externalId(name);
+ }
+ };
+ }
+ };
+ }
+
+ private CompactionPlanner.InitParameters getInitParamQueues(ServiceEnvironment senv,
+ String queues) {
+ return new CompactionPlanner.InitParameters() {
+
+ @Override
+ public ServiceEnvironment getServiceEnvironment() {
+ return senv;
+ }
+
+ @Override
+ public Map<String,String> getOptions() {
+ return Map.of("queues", queues, "maxOpen", "15");
+ }
+
+ @Override
+ public String getFullyQualifiedOption(String key) {
+ assertEquals("maxOpen", key);
+ return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
}
@Override
@@ -521,7 +683,7 @@ public class DefaultCompactionPlannerTest {
@Override
public String getFullyQualifiedOption(String key) {
assertEquals("maxOpen", key);
- return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
+ return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
}
@Override
diff --git a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java
new file mode 100644
index 0000000000..a5921e33eb
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests how {@link CompactionServicesConfig} reads compaction service properties defined under
+ * the old and the new compaction service prefixes.
+ */
+public class CompactionServicesConfigTest {
+
+ // Executor definitions shared by the tests that configure a DefaultCompactionPlanner.
+ private static final String EXECUTORS_JSON =
+ "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]";
+
+ @SuppressWarnings("removal")
+ private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+ private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
+
+ // Builds a property name under the old (marked for removal) compaction service prefix.
+ private String oldKey(String suffix) {
+ return oldPrefix.getKey() + suffix;
+ }
+
+ // Builds a property name under the new compaction service prefix.
+ private String newKey(String suffix) {
+ return newPrefix.getKey() + suffix;
+ }
+
+ /**
+ * With the planner defined under the new prefix, only options set under the new prefix are
+ * expected in the service's options; options set under the old prefix must not appear.
+ */
+ @Test
+ public void testCompactionProps() {
+ ConfigurationCopy conf = new ConfigurationCopy();
+
+ conf.set(newKey("default.planner"), DefaultCompactionPlanner.class.getName());
+ conf.set(newKey("default.planner.opts.maxOpen"), "10");
+ conf.set(newKey("default.planner.opts.executors"), EXECUTORS_JSON);
+
+ conf.set(oldKey("default.planner.opts.ignoredProp"), "1");
+ conf.set(newKey("default.planner.opts.validProp"), "1");
+ conf.set(oldKey("default.planner.opts.validProp"), "a");
+
+ Map<String,String> expectedOpts =
+ Map.of("maxOpen", "10", "executors", EXECUTORS_JSON, "validProp", "1");
+
+ var servicesConfig = new CompactionServicesConfig(conf);
+ assertEquals(expectedOpts, servicesConfig.getOptions().get("default"));
+ }
+
+ /**
+ * When the same service has a planner under both prefixes, the planner from the new prefix is
+ * expected; a service defined only under the old prefix is still expected to be reported.
+ */
+ @Test
+ public void testDuplicateCompactionPlannerDefs() {
+ final String newPlannerClass = DefaultCompactionPlanner.class.getName();
+ final String oldPlannerClass = "OldPlanner";
+
+ ConfigurationCopy conf = new ConfigurationCopy();
+ conf.set(newKey("default.planner"), newPlannerClass);
+ conf.set(oldKey("default.planner"), oldPlannerClass);
+
+ conf.set(oldKey("old.planner"), oldPlannerClass);
+
+ Map<String,String> expectedPlanners =
+ Map.of("default", newPlannerClass, "old", oldPlannerClass);
+
+ var servicesConfig = new CompactionServicesConfig(conf);
+ assertEquals(expectedPlanners, servicesConfig.getPlanners());
+ }
+
+ /**
+ * A service defined entirely under the old prefix is expected to keep its planner options.
+ */
+ @Test
+ public void testCompactionPlannerOldDef() {
+ ConfigurationCopy conf = new ConfigurationCopy();
+
+ conf.set(oldKey("cs1.planner"), DefaultCompactionPlanner.class.getName());
+ conf.set(oldKey("cs1.planner.opts.maxOpen"), "10");
+ conf.set(oldKey("cs1.planner.opts.executors"), EXECUTORS_JSON);
+ conf.set(oldKey("cs1.planner.opts.foo"), "1");
+
+ var servicesConfig = new CompactionServicesConfig(conf);
+ assertTrue(servicesConfig.getOptions().get("cs1").containsKey("foo"));
+ assertEquals("1", servicesConfig.getOptions().get("cs1").get("foo"));
+ }
+
+ /**
+ * A "2M" rate limit under the old prefix is expected to be reported as 2097152. Adding a rate
+ * limit for the same service under the new prefix, with no planner under that prefix, is
+ * expected to be rejected with an {@link IllegalArgumentException}.
+ */
+ @Test
+ public void testCompactionRateLimits() {
+ ConfigurationCopy conf = new ConfigurationCopy();
+
+ conf.set(oldKey("cs1.planner"), DefaultCompactionPlanner.class.getName());
+ conf.set(oldKey("cs1.rate.limit"), "2M");
+ var servicesConfig = new CompactionServicesConfig(conf);
+ assertEquals(2097152, servicesConfig.getRateLimits().get("cs1"));
+
+ // Test service collision
+ conf.set(newKey("cs1.rate.limit"), "4M");
+ var thrown = assertThrows(IllegalArgumentException.class,
+ () -> new CompactionServicesConfig(conf), "failed to throw error");
+ assertEquals("Incomplete compaction service definition, missing planner class: cs1.rate.limit",
+ thrown.getMessage(), "Error message was not equal");
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
index 47e75fd42d..4790440ff6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
@@ -98,7 +98,7 @@ public class CompactionWatcher implements Runnable {
// remove any compaction that completed or made progress
observedCompactions.keySet().retainAll(newKeys);
- long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
+ long warnTime = config.getTimeInMillis(Property.COMPACTION_WARN_TIME);
// check for stuck compactions
for (ObservedCompactionInfo oci : observedCompactions.values()) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
index e297168e51..029cb2fb13 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
@@ -112,7 +112,8 @@ public class CheckCompactionConfig implements KeywordExecutable {
CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance();
var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
- servicesConfig.getOptions().get(serviceId), senv);
+ servicesConfig.getPlannerPrefix(serviceId), servicesConfig.getOptions().get(serviceId),
+ senv);
planner.init(initParams);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
index c80ab28f8d..4aa38abf90 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
@@ -79,6 +79,29 @@ public class CheckCompactionConfigTest extends WithTestNames {
CheckCompactionConfig.main(new String[] {filePath});
}
+ /**
+ * Checks that a properties file mixing services defined under the
+ * "tserver.compaction.major.service." prefix (cs1, cs2, using "executors") with a service
+ * defined under the "compaction.service." prefix (cs3, using "queues") is accepted.
+ */
+ @Test
+ public void testValidInput3() throws Exception {
+ // Written with single quotes for readability; converted to double quotes below.
+ String singleQuoted = "tserver.compaction.major.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}] \n"
+ + "tserver.compaction.major.service.cs2.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs2.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':7},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':5},\\\n"
+ + "{'name':'large','type':'external','queue':'DCQ1'}] \n"
+ + "compaction.service.cs3.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "compaction.service.cs3.planner.opts.queues=\\\n"
+ + "[{'name':'small','maxSize':'16M'},{'name':'large'}]";
+ String properties = singleQuoted.replaceAll("'", "\"");
+
+ String propsFile = writeToFileAndReturnPath(properties);
+ CheckCompactionConfig.main(new String[] {propsFile});
+ }
+
@Test
public void testThrowsExternalNumThreadsError() throws IOException {
String inputString = ("tserver.compaction.major.service.cs1.planner="
@@ -93,7 +116,7 @@ public class CheckCompactionConfigTest extends WithTestNames {
var e = assertThrows(IllegalArgumentException.class,
() -> CheckCompactionConfig.main(new String[] {filePath}));
- assertEquals(e.getMessage(), expectedErrorMsg);
+ assertEquals(expectedErrorMsg, e.getMessage());
}
@Test
@@ -119,7 +142,7 @@ public class CheckCompactionConfigTest extends WithTestNames {
+ "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n"
+ "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"");
- String expectedErrorMsg = "Incomplete compaction service definitions, missing planner class";
+ String expectedErrorMsg = "Incomplete compaction service definition, missing planner class";
String filePath = writeToFileAndReturnPath(inputString);
@@ -138,13 +161,31 @@ public class CheckCompactionConfigTest extends WithTestNames {
+ "{'name':'small','type':'internal','numThreads':2}]").replaceAll("'", "\"");
String expectedErrorMsg = "Duplicate Compaction Executor ID found";
- String filePath = writeToFileAndReturnPath(inputString);
+ final String filePath = writeToFileAndReturnPath(inputString);
var e = assertThrows(IllegalStateException.class,
() -> CheckCompactionConfig.main(new String[] {filePath}));
assertTrue(e.getMessage().startsWith(expectedErrorMsg));
}
+ /**
+ * Checks that naming the same queue ("failedQueue") in both the "executors" and the "queues"
+ * option of one service is rejected with an {@link IllegalArgumentException}.
+ */
+ @Test
+ public void testRepeatedQueueName() throws Exception {
+ String inputString = ("compaction.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "compaction.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'external','maxSize':'16M','queue':'failedQueue'}] \n"
+ + "compaction.service.cs1.planner.opts.queues=[{'name':'failedQueue'}]")
+ .replaceAll("'", "\"");
+
+ String expectedErrorMsg = "Duplicate external executor for queue failedQueue";
+
+ final String filePath = writeToFileAndReturnPath(inputString);
+
+ var err = assertThrows(IllegalArgumentException.class,
+ () -> CheckCompactionConfig.main(new String[] {filePath}));
+ // Expected value first, matching the argument order used elsewhere in this test class.
+ assertEquals(expectedErrorMsg, err.getMessage());
+ }
+
@Test
public void testInvalidTypeValue() throws Exception {
String inputString = ("tserver.compaction.major.service.cs1.planner="
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index bbabc9f262..6332842236 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -205,7 +205,7 @@ public class CompactionManager {
try {
tmpServices.put(CompactionServiceId.of(serviceName),
new CompactionService(serviceName, plannerClassName,
- currentCfg.getRateLimit(serviceName),
+ currentCfg.getPlannerPrefix(serviceName), currentCfg.getRateLimit(serviceName),
currentCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics,
this::getExternalExecutor));
} catch (RuntimeException e) {
@@ -249,11 +249,12 @@ public class CompactionManager {
if (service == null) {
tmpServices.put(csid,
new CompactionService(serviceName, plannerClassName,
- tmpCfg.getRateLimit(serviceName),
+ tmpCfg.getPlannerPrefix(serviceName), tmpCfg.getRateLimit(serviceName),
tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics,
this::getExternalExecutor));
} else {
- service.configurationChanged(plannerClassName, tmpCfg.getRateLimit(serviceName),
+ service.configurationChanged(plannerClassName, tmpCfg.getPlannerPrefix(serviceName),
+ tmpCfg.getRateLimit(serviceName),
tmpCfg.getOptions().getOrDefault(serviceName, Map.of()));
tmpServices.put(csid, service);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 926152e1be..fffbbe1afe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -86,8 +86,8 @@ public class CompactionService {
private static final Logger log = LoggerFactory.getLogger(CompactionService.class);
- public CompactionService(String serviceName, String plannerClass, Long maxRate,
- Map<String,String> plannerOptions, ServerContext context,
+ public CompactionService(String serviceName, String plannerClass, String plannerPrefix,
+ Long maxRate, Map<String,String> plannerOptions, ServerContext context,
CompactionExecutorsMetrics ceMetrics,
Function<CompactionExecutorId,ExternalCompactionExecutor> externExecutorSupplier) {
@@ -100,8 +100,8 @@ public class CompactionService {
this.ceMetrics = ceMetrics;
this.externExecutorSupplier = externExecutorSupplier;
- var initParams =
- new CompactionPlannerInitParams(myId, plannerOpts, new ServiceEnvironmentImpl(context));
+ var initParams = new CompactionPlannerInitParams(myId, plannerPrefix, plannerOpts,
+ new ServiceEnvironmentImpl(context));
planner = createPlanner(myId, plannerClass, plannerOptions, initParams);
Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>();
@@ -359,7 +359,7 @@ public class CompactionService {
.anyMatch(job -> job.getStatus() == Status.QUEUED);
}
- public void configurationChanged(String plannerClassName, Long maxRate,
+ public void configurationChanged(String plannerClassName, String plannerPrefix, Long maxRate,
Map<String,String> plannerOptions) {
Preconditions.checkArgument(maxRate >= 0);
@@ -372,8 +372,8 @@ public class CompactionService {
return;
}
- var initParams =
- new CompactionPlannerInitParams(myId, plannerOptions, new ServiceEnvironmentImpl(context));
+ var initParams = new CompactionPlannerInitParams(myId, plannerPrefix, plannerOptions,
+ new ServiceEnvironmentImpl(context));
var tmpPlanner = createPlanner(myId, plannerClassName, plannerOptions, initParams);
Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>();
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
index 5f5ec1dd4b..72f53b94c0 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
@@ -54,7 +54,7 @@ import com.google.common.collect.MoreCollectors;
public class BadCompactionServiceConfigIT extends AccumuloClusterHarness {
- private static final String CSP = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey();
+ private static final String CSP = Property.COMPACTION_SERVICE_PREFIX.getKey();
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java
index 60ede3dff7..6b8dfe2fb9 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java
@@ -57,10 +57,10 @@ public class CompactionConfigChangeIT extends AccumuloClusterHarness {
final String table = getUniqueNames(1)[0];
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner",
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner",
DefaultCompactionPlanner.class.getName());
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors",
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors",
("[{'name':'small','type':'internal','maxSize':'2M','numThreads':2},"
+ "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+ "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""));
@@ -91,7 +91,7 @@ public class CompactionConfigChangeIT extends AccumuloClusterHarness {
// compactions. Because the compactions are running slow, expect this config change to overlap
// with running compactions.
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors",
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors",
("[{'name':'little','type':'internal','maxSize':'128M','numThreads':8},"
+ "{'name':'big','type':'internal','numThreads':2}]").replaceAll("'", "\""));
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
index 1832117ce5..45e0b20d08 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
@@ -140,7 +140,7 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
SharedMiniClusterBase.startMiniClusterWithConfig((miniCfg, coreSite) -> {
Map<String,String> siteCfg = new HashMap<>();
- var csp = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey();
+ var csp = Property.COMPACTION_SERVICE_PREFIX.getKey();
siteCfg.put(csp + "cs1.planner", TestPlanner.class.getName());
siteCfg.put(csp + "cs1.planner.opts.executors", "3");
siteCfg.put(csp + "cs1.planner.opts.filesPerCompaction", "5");
@@ -205,10 +205,11 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
assertEquals(2, getFiles(client, "rctt").size());
- client.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey()
- + "recfg.planner.opts.filesPerCompaction", "5");
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", "1");
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.filesPerCompaction",
+ "5");
+ client.instanceOperations().setProperty(
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", "1");
addFiles(client, "rctt", 10);
@@ -223,15 +224,15 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
@Test
public void testAddCompactionService() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
- client.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey()
- + "newcs.planner.opts.filesPerCompaction", "7");
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.process",
- "SYSTEM");
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.filesPerCompaction",
+ "7");
+ client.instanceOperations().setProperty(
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.process", "SYSTEM");
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "3");
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "3");
client.instanceOperations().setProperty(
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
+ Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
TestPlanner.class.getName());
createTable(client, "acst", "newcs");
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java
index 9da4c6b216..2c1ae92d7e 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java
@@ -49,6 +49,7 @@ public class ConfigSetIT extends SharedMiniClusterBase {
private static final Logger log = LoggerFactory.getLogger(ConfigSetIT.class);
@Test
+ @SuppressWarnings("removal")
public void setInvalidJson() throws Exception {
log.debug("Starting setInvalidJson test ------------------");
@@ -61,7 +62,6 @@ public class ConfigSetIT extends SharedMiniClusterBase {
try (AccumuloClient client =
getCluster().createAccumuloClient("root", new PasswordToken(getRootPassword()))) {
-
client.instanceOperations().setProperty(TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS.getKey(),
validJson);
assertThrows(AccumuloException.class, () -> client.instanceOperations()
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index 979c3b85dd..d0012c5d37 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -79,6 +79,7 @@ public class SlowOps {
createData();
}
+ @SuppressWarnings("removal")
public static void setExpectedCompactions(AccumuloClient client, final int numParallelExpected) {
final int target = numParallelExpected + 1;
try {