You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2022/01/03 14:21:40 UTC
[accumulo] branch main updated: Create tool to validate Compaction configuration (#2345)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 12fe963 Create tool to validate Compaction configuration (#2345)
12fe963 is described below
commit 12fe9633f6cdcac7a964d8fdfcec8c615aaf1629
Author: Dom G <47...@users.noreply.github.com>
AuthorDate: Mon Jan 3 09:21:30 2022 -0500
Create tool to validate Compaction configuration (#2345)
* Create CLI tool that validates a compaction configuration within a file
* Extract CompactionManager.Config into its own class
* Extract CompactionService.CpInitParams into its own class
---
.../spi/compaction/DefaultCompactionPlanner.java | 11 +-
.../compaction/CompactionPlannerInitParams.java | 98 +++++++++++
.../util/compaction/CompactionServicesConfig.java | 189 +++++++++++++++++++++
.../server/conf/CheckCompactionConfig.java | 162 ++++++++++++++++++
.../server/conf/CheckCompactionConfigTest.java | 154 +++++++++++++++++
.../tserver/compactions/CompactionManager.java | 174 +++----------------
.../tserver/compactions/CompactionService.java | 70 +-------
.../accumulo/tserver/tablet/CompactableImpl.java | 5 +-
.../apache/accumulo/test/start/KeywordStartIT.java | 8 +-
9 files changed, 645 insertions(+), 226 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index a74ab37..2c86f1f 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -113,7 +113,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class DefaultCompactionPlanner implements CompactionPlanner {
- private static Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class);
+ private static final Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class);
public static class ExecutorConfig {
String type;
@@ -170,17 +170,16 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
case "internal":
Preconditions.checkArgument(null == executorConfig.queue,
"'queue' should not be specified for internal compactions");
- Objects.requireNonNull(executorConfig.numThreads,
+ int numThreads = Objects.requireNonNull(executorConfig.numThreads,
"'numThreads' must be specified for internal type");
- ceid = params.getExecutorManager().createExecutor(executorConfig.name,
- executorConfig.numThreads);
+ ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads);
break;
case "external":
Preconditions.checkArgument(null == executorConfig.numThreads,
"'numThreads' should not be specified for external compactions");
- Objects.requireNonNull(executorConfig.queue,
+ String queue = Objects.requireNonNull(executorConfig.queue,
"'queue' must be specified for external type");
- ceid = params.getExecutorManager().getExternalExecutor(executorConfig.queue);
+ ceid = params.getExecutorManager().getExternalExecutor(queue);
break;
default:
throw new IllegalArgumentException("type must be 'internal' or 'external'");
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
new file mode 100644
index 0000000..61bf50e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionPlannerInitParams implements CompactionPlanner.InitParameters {
+
+ private final Map<String,String> plannerOpts;
+ private final Map<CompactionExecutorId,Integer> requestedExecutors;
+ private final Set<CompactionExecutorId> requestedExternalExecutors;
+ private final ServiceEnvironment senv;
+ private final CompactionServiceId serviceId;
+
+ public CompactionPlannerInitParams(CompactionServiceId serviceId, Map<String,String> plannerOpts,
+ ServiceEnvironment senv) {
+ this.serviceId = serviceId;
+ this.plannerOpts = plannerOpts;
+ this.requestedExecutors = new HashMap<>();
+ this.requestedExternalExecutors = new HashSet<>();
+ this.senv = senv;
+ }
+
+ @Override
+ public ServiceEnvironment getServiceEnvironment() {
+ return senv;
+ }
+
+ @Override
+ public Map<String,String> getOptions() {
+ return plannerOpts;
+ }
+
+ @Override
+ public String getFullyQualifiedOption(String key) {
+ return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key;
+ }
+
+ @Override
+ public ExecutorManager getExecutorManager() {
+ return new ExecutorManager() {
+ @Override
+ public CompactionExecutorId createExecutor(String executorName, int threads) {
+ Preconditions.checkArgument(threads > 0, "Positive number of threads required : %s",
+ threads);
+ var ceid = CompactionExecutorIdImpl.internalId(serviceId, executorName);
+ Preconditions.checkState(!getRequestedExecutors().containsKey(ceid));
+ getRequestedExecutors().put(ceid, threads);
+ return ceid;
+ }
+
+ @Override
+ public CompactionExecutorId getExternalExecutor(String name) {
+ var ceid = CompactionExecutorIdImpl.externalId(name);
+ Preconditions.checkArgument(!getRequestedExternalExecutors().contains(ceid),
+ "Duplicate external executor for queue " + name);
+ getRequestedExternalExecutors().add(ceid);
+ return ceid;
+ }
+ };
+ }
+
+ public Map<CompactionExecutorId,Integer> getRequestedExecutors() {
+ return requestedExecutors;
+ }
+
+ public Set<CompactionExecutorId> getRequestedExternalExecutors() {
+ return requestedExternalExecutors;
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
new file mode 100644
index 0000000..0dfcfe8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class serves to configure compaction services from an {@link AccumuloConfiguration} object.
+ *
+ * Specifically, compaction service properties (those prefixed by "tserver.compaction.major
+ * .service") are used.
+ */
+public class CompactionServicesConfig {
+
+ private final Map<String,String> planners = new HashMap<>();
+ private final Map<String,Long> rateLimits = new HashMap<>();
+ private final Map<String,Map<String,String>> options = new HashMap<>();
+ long defaultRateLimit;
+ private final Consumer<String> deprecationWarningConsumer;
+
+ public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");
+
+ @SuppressWarnings("removal")
+ private long getDefaultThroughput(AccumuloConfiguration aconf) {
+ if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) {
+ return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT);
+ }
+
+ return ConfigurationTypeHelper
+ .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
+ }
+
+ @SuppressWarnings("removal")
+ private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
+
+ Map<String,String> configs =
+ aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
+
+ // check if deprecated properties for compaction executor are set
+ if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) {
+
+ String defaultServicePrefix =
+ Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + ".";
+
+ // check if any properties for the default compaction service are set
+ boolean defaultServicePropsSet = configs.keySet().stream()
+ .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey)
+ .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true));
+
+ if (defaultServicePropsSet) {
+
+ String warning = "The deprecated property " + Property.TSERV_MAJC_MAXCONCURRENT.getKey()
+ + " was set. Properties with the prefix " + defaultServicePrefix
+ + " were also set which replace the deprecated properties. The deprecated property "
+ + "was therefore ignored.";
+
+ deprecationWarningConsumer.accept(warning);
+
+ } else {
+ String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT);
+
+ // Its possible a user has configured the other compaction services, but not the default
+ // service. In this case want to produce a config with the default service configs
+ // overridden using deprecated configs.
+
+ HashMap<String,String> configsCopy = new HashMap<>(configs);
+
+ Map<String,String> defaultServiceConfigs =
+ Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(),
+ defaultServicePrefix + "planner.opts.executors",
+ "[{'name':'deprecated', 'numThreads':" + numThreads + "}]");
+
+ configsCopy.putAll(defaultServiceConfigs);
+
+ String warning = "The deprecated property " + Property.TSERV_MAJC_MAXCONCURRENT.getKey()
+ + " was set. Properties with the prefix " + defaultServicePrefix
+ + " were not set, these should replace the deprecated properties. The old properties "
+ + "were automatically mapped to the new properties in process creating : "
+ + defaultServiceConfigs + ".";
+
+ deprecationWarningConsumer.accept(warning);
+
+ configs = Map.copyOf(configsCopy);
+ }
+ }
+
+ return configs;
+
+ }
+
+ public CompactionServicesConfig(AccumuloConfiguration aconf,
+ Consumer<String> deprecationWarningConsumer) {
+ this.deprecationWarningConsumer = deprecationWarningConsumer;
+ Map<String,String> configs = getConfiguration(aconf);
+
+ configs.forEach((prop, val) -> {
+
+ var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length());
+ String[] tokens = suffix.split("\\.");
+ if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) {
+ options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val);
+ } else if (tokens.length == 2 && tokens[1].equals("planner")) {
+ planners.put(tokens[0], val);
+ } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) {
+ var eprop = Property.getPropertyByKey(prop);
+ if (eprop == null || aconf.isPropertySet(eprop, true)
+ || !isDeprecatedThroughputSet(aconf)) {
+ rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val));
+ }
+ } else {
+ throw new IllegalArgumentException("Malformed compaction service property " + prop);
+ }
+ });
+
+ defaultRateLimit = getDefaultThroughput(aconf);
+
+ var diff = Sets.difference(options.keySet(), planners.keySet());
+
+ if (!diff.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Incomplete compaction service definitions, missing planner class " + diff);
+ }
+
+ }
+
+ @SuppressWarnings("removal")
+ private boolean isDeprecatedThroughputSet(AccumuloConfiguration aconf) {
+ return aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true);
+ }
+
+ public long getRateLimit(String serviceName) {
+ return getRateLimits().getOrDefault(serviceName, defaultRateLimit);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof CompactionServicesConfig) {
+ var oc = (CompactionServicesConfig) o;
+ return getPlanners().equals(oc.getPlanners()) && getOptions().equals(oc.getOptions())
+ && getRateLimits().equals(oc.getRateLimits());
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getPlanners(), getOptions(), getRateLimits());
+ }
+
+ public Map<String,String> getPlanners() {
+ return planners;
+ }
+
+ public Map<String,Long> getRateLimits() {
+ return rateLimits;
+ }
+
+ public Map<String,Map<String,String>> getOptions() {
+ return options;
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
new file mode 100644
index 0000000..2d53180
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Path;
+import java.util.Set;
+
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.util.ConfigurationImpl;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+/**
+ * A command line tool that verifies that a given properties file will correctly configure
+ * compaction services.
+ *
+ * This tool takes, as input, a local path to a properties file containing the properties used to
+ * configure compaction services. The file is parsed and the user is presented with output detailing
+ * which (if any) compaction services would be created from the given properties, or an error
+ * describing why the given properties are incorrect.
+ */
+@AutoService(KeywordExecutable.class)
+public class CheckCompactionConfig implements KeywordExecutable {
+
+ private final static Logger log = LoggerFactory.getLogger(CheckCompactionConfig.class);
+
+ final static String DEFAULT = "default";
+ final static String META = "meta";
+ final static String ROOT = "root";
+
+ static class Opts extends Help {
+ @Parameter(description = "<path> Local path to file containing compaction configuration",
+ required = true)
+ String filePath;
+ }
+
+ @Override
+ public String keyword() {
+ return "check-compaction-config";
+ }
+
+ @Override
+ public String description() {
+ return "Verifies compaction config within a given file";
+ }
+
+ public static void main(String[] args) throws Exception {
+ new CheckCompactionConfig().execute(args);
+ }
+
+ @Override
+ public void execute(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(keyword(), args);
+
+ if (opts.filePath == null) {
+ throw new IllegalArgumentException("No properties file was given");
+ }
+
+ Path path = Path.of(opts.filePath);
+ if (!path.toFile().exists())
+ throw new FileNotFoundException("File at given path could not be found");
+
+ AccumuloConfiguration config = SiteConfiguration.fromFile(path.toFile()).build();
+ var servicesConfig = new CompactionServicesConfig(config, log::warn);
+ ServiceEnvironment senv = createServiceEnvironment(config);
+
+ Set<String> defaultServices = Set.of(DEFAULT, META, ROOT);
+ if (servicesConfig.getPlanners().keySet().equals(defaultServices)) {
+ log.warn("Only the default compaction services were created - {}", defaultServices);
+ return;
+ }
+
+ for (var entry : servicesConfig.getPlanners().entrySet()) {
+ String serviceId = entry.getKey();
+ String plannerClassName = entry.getValue();
+
+ log.info("Service id: {}, planner class:{}", serviceId, plannerClassName);
+
+ Class<? extends CompactionPlanner> plannerClass =
+ Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
+ CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance();
+
+ var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+ servicesConfig.getOptions().get(serviceId), senv);
+
+ planner.init(initParams);
+
+ initParams.getRequestedExecutors()
+ .forEach((execId, numThreads) -> log.info(
+ "Compaction service '{}' requested creation of thread pool '{}' with {} threads.",
+ serviceId, execId, numThreads));
+
+ initParams.getRequestedExternalExecutors()
+ .forEach(execId -> log.info(
+ "Compaction service '{}' requested with external execution queue '{}'", serviceId,
+ execId));
+
+ }
+
+ log.info("Properties file has passed all checks.");
+ }
+
+ private ServiceEnvironment createServiceEnvironment(AccumuloConfiguration config) {
+ return new ServiceEnvironment() {
+
+ @Override
+ public <T> T instantiate(TableId tableId, String className, Class<T> base) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T instantiate(String className, Class<T> base) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getTableName(TableId tableId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Configuration getConfiguration(TableId tableId) {
+ return new ConfigurationImpl(config);
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return new ConfigurationImpl(config);
+ }
+ };
+ }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
new file mode 100644
index 0000000..01c6fa3
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path not set by user input")
+public class CheckCompactionConfigTest {
+
+ private final static Logger log = LoggerFactory.getLogger(CheckCompactionConfigTest.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @ClassRule
+ public static final TemporaryFolder folder =
+ new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testValidInput1() throws Exception {
+ String inputString = ("tserver.compaction.major.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"");
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ CheckCompactionConfig.main(new String[] {filePath});
+ }
+
+ @Test
+ public void testValidInput2() throws Exception {
+ String inputString = ("tserver.compaction.major.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}] \n"
+ + "tserver.compaction.major.service.cs2.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs2.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':7},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':5},\\\n"
+ + "{'name':'large','type':'external','queue':'DCQ1'}]").replaceAll("'", "\"");
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ CheckCompactionConfig.main(new String[] {filePath});
+ }
+
+ @Test
+ public void testThrowsExternalNumThreadsError() throws IOException {
+ String inputString = ("tserver.compaction.major.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'external','maxSize':'128M','numThreads':4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"");
+ String expectedErrorMsg = "'numThreads' should not be specified for external compactions";
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ var e = assertThrows(IllegalArgumentException.class,
+ () -> CheckCompactionConfig.main(new String[] {filePath}));
+ assertEquals(e.getMessage(), expectedErrorMsg);
+ }
+
+ @Test
+ public void testNegativeThreadCount() throws IOException {
+ String inputString = ("tserver.compaction.major.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n"
+ + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':-4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"");
+ String expectedErrorMsg = "Positive number of threads required : -4";
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ var e = assertThrows(IllegalArgumentException.class,
+ () -> CheckCompactionConfig.main(new String[] {filePath}));
+ assertEquals(e.getMessage(), expectedErrorMsg);
+ }
+
+ @Test
+ public void testNoPlanner() throws Exception {
+ String inputString = ("tserver.compaction.major.service.cs1.planner.opts.executors=\\\n"
+ + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n"
+ + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n"
+ + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"");
+ String expectedErrorMsg = "Incomplete compaction service definitions, missing planner class";
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ var e = assertThrows(IllegalArgumentException.class,
+ () -> CheckCompactionConfig.main(new String[] {filePath}));
+ assertTrue(e.getMessage().startsWith(expectedErrorMsg));
+ }
+
+ @Test
+ public void testBadPropsFilePath() {
+ String[] args = {"/home/foo/bar/myProperties.properties"};
+ String expectedErrorMsg = "File at given path could not be found";
+ var e = assertThrows(FileNotFoundException.class, () -> CheckCompactionConfig.main(args));
+ assertEquals(expectedErrorMsg, e.getMessage());
+ }
+
+ private String writeToFileAndReturnPath(String inputString) throws IOException {
+ File file = folder.newFile(testName.getMethodName() + ".properties");
+ try (FileWriter fileWriter = new FileWriter(file, UTF_8);
+ BufferedWriter bufferedWriter = new BufferedWriter(fileWriter)) {
+ bufferedWriter.write(inputString);
+ }
+ log.info("Wrote to path: {}\nWith string:\n{}", file.getAbsolutePath(), inputString);
+ return file.getAbsolutePath();
+ }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index f65a7a0..35d9d7d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.tserver.compactions;
+import static org.apache.accumulo.core.util.compaction.CompactionServicesConfig.DEFAULT_SERVICE;
+
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,8 +32,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -39,9 +39,9 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactionServices;
-import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.server.ServerContext;
@@ -67,14 +67,12 @@ public class CompactionManager {
private ServerContext context;
- private Config currentCfg;
+ private CompactionServicesConfig currentCfg;
private long lastConfigCheckTime = System.nanoTime();
private CompactionExecutorsMetrics ceMetrics;
- public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");
-
private String lastDeprecationWarning = "";
private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors;
@@ -91,144 +89,10 @@ public class CompactionManager {
}
}
- private class Config {
- Map<String,String> planners = new HashMap<>();
- Map<String,Long> rateLimits = new HashMap<>();
- Map<String,Map<String,String>> options = new HashMap<>();
- long defaultRateLimit = 0;
-
- @SuppressWarnings("removal")
- private long getDefaultThroughput(AccumuloConfiguration aconf) {
- if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) {
- return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT);
- }
-
- return ConfigurationTypeHelper
- .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
- }
-
- @SuppressWarnings("removal")
- private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
-
- Map<String,String> configs =
- aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
-
- // check if deprecated properties for compaction executor are set
- if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) {
-
- String defaultServicePrefix =
- Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + ".";
-
- // check if any properties for the default compaction service are set
- boolean defaultServicePropsSet = configs.keySet().stream()
- .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey)
- .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true));
-
- if (defaultServicePropsSet) {
-
- String warning = String.format(
- "The deprecated property %s was set. Properties with the prefix %s "
- + "were also set, which replace the deprecated properties. The deprecated "
- + "property was therefore ignored.",
- Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix);
-
- if (!warning.equals(lastDeprecationWarning)) {
- log.warn(warning);
- lastDeprecationWarning = warning;
- }
- } else {
- String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT);
-
- // Its possible a user has configured the others compaction services, but not the default
- // service. In this case want to produce a config with the default service configs
- // overridden using deprecated configs.
-
- HashMap<String,String> configsCopy = new HashMap<>(configs);
-
- Map<String,String> defaultServiceConfigs =
- Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(),
- defaultServicePrefix + "planner.opts.executors",
- "[{'name':'deprecated', 'numThreads':" + numThreads + "}]");
-
- configsCopy.putAll(defaultServiceConfigs);
-
- String warning = String.format(
- "The deprecated property %s was set. Properties with the prefix %s "
- + "were not set, these should replace the deprecated properties. The old "
- + "properties were automatically mapped to the new properties in process "
- + "creating : %s.",
- Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix,
- defaultServiceConfigs);
-
- if (!warning.equals(lastDeprecationWarning)) {
- log.warn(warning);
- lastDeprecationWarning = warning;
- }
-
- configs = Map.copyOf(configsCopy);
- }
- }
-
- return configs;
-
- }
-
- Config(AccumuloConfiguration aconf) {
- Map<String,String> configs = getConfiguration(aconf);
-
- configs.forEach((prop, val) -> {
-
- var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length());
- String[] tokens = suffix.split("\\.");
- if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) {
- options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val);
- } else if (tokens.length == 2 && tokens[1].equals("planner")) {
- planners.put(tokens[0], val);
- } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) {
- var eprop = Property.getPropertyByKey(prop);
- if (eprop == null || aconf.isPropertySet(eprop, true)
- || !isDeprecatedThroughputSet(aconf)) {
- rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val));
- }
- } else {
- throw new IllegalArgumentException("Malformed compaction service property " + prop);
- }
- });
-
- defaultRateLimit = getDefaultThroughput(aconf);
-
- var diff = Sets.difference(options.keySet(), planners.keySet());
-
- if (!diff.isEmpty()) {
- throw new IllegalArgumentException(
- "Incomplete compaction service definitions, missing planner class " + diff);
- }
-
- }
-
- @SuppressWarnings("removal")
- private boolean isDeprecatedThroughputSet(AccumuloConfiguration aconf) {
- return aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true);
- }
-
- public long getRateLimit(String serviceName) {
- return rateLimits.getOrDefault(serviceName, defaultRateLimit);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof Config) {
- var oc = (Config) o;
- return planners.equals(oc.planners) && options.equals(oc.options)
- && rateLimits.equals(oc.rateLimits);
- }
-
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(planners, options, rateLimits);
+ private void warnAboutDeprecation(String warning) {
+ if (!warning.equals(lastDeprecationWarning)) {
+ log.warn(warning);
+ lastDeprecationWarning = warning;
}
}
@@ -320,7 +184,8 @@ public class CompactionManager {
CompactionExecutorsMetrics ceMetrics) {
this.compactables = compactables;
- this.currentCfg = new Config(context.getConfiguration());
+ this.currentCfg =
+ new CompactionServicesConfig(context.getConfiguration(), this::warnAboutDeprecation);
this.context = context;
@@ -332,16 +197,16 @@ public class CompactionManager {
Map<CompactionServiceId,CompactionService> tmpServices = new HashMap<>();
- currentCfg.planners.forEach((serviceName, plannerClassName) -> {
+ currentCfg.getPlanners().forEach((serviceName, plannerClassName) -> {
try {
tmpServices.put(CompactionServiceId.of(serviceName),
new CompactionService(serviceName, plannerClassName,
currentCfg.getRateLimit(serviceName),
- currentCfg.options.getOrDefault(serviceName, Map.of()), context, ceMetrics,
+ currentCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics,
this::getExternalExecutor));
} catch (RuntimeException e) {
log.error("Failed to create compaction service {} with planner:{} options:{}", serviceName,
- plannerClassName, currentCfg.options.getOrDefault(serviceName, Map.of()), e);
+ plannerClassName, currentCfg.getOptions().getOrDefault(serviceName, Map.of()), e);
}
});
@@ -367,12 +232,13 @@ public class CompactionManager {
lastConfigCheckTime = System.nanoTime();
- var tmpCfg = new Config(context.getConfiguration());
+ var tmpCfg =
+ new CompactionServicesConfig(context.getConfiguration(), this::warnAboutDeprecation);
if (!currentCfg.equals(tmpCfg)) {
Map<CompactionServiceId,CompactionService> tmpServices = new HashMap<>();
- tmpCfg.planners.forEach((serviceName, plannerClassName) -> {
+ tmpCfg.getPlanners().forEach((serviceName, plannerClassName) -> {
try {
var csid = CompactionServiceId.of(serviceName);
@@ -381,22 +247,22 @@ public class CompactionManager {
tmpServices.put(csid,
new CompactionService(serviceName, plannerClassName,
tmpCfg.getRateLimit(serviceName),
- tmpCfg.options.getOrDefault(serviceName, Map.of()), context, ceMetrics,
+ tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics,
this::getExternalExecutor));
} else {
service.configurationChanged(plannerClassName, tmpCfg.getRateLimit(serviceName),
- tmpCfg.options.getOrDefault(serviceName, Map.of()));
+ tmpCfg.getOptions().getOrDefault(serviceName, Map.of()));
tmpServices.put(csid, service);
}
} catch (RuntimeException e) {
throw new RuntimeException("Failed to create or update compaction service "
+ serviceName + " with planner:" + plannerClassName + " options:"
- + tmpCfg.options.getOrDefault(serviceName, Map.of()), e);
+ + tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), e);
}
});
var deletedServices =
- Sets.difference(currentCfg.planners.keySet(), tmpCfg.planners.keySet());
+ Sets.difference(currentCfg.getPlanners().keySet(), tmpCfg.getPlanners().keySet());
for (String serviceName : deletedServices) {
services.get(CompactionServiceId.of(serviceName)).stop();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index df59d23..0829f26 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -41,7 +41,6 @@ import java.util.function.Function;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@ -52,9 +51,9 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
-import org.apache.accumulo.core.spi.compaction.ExecutorManager;
import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -88,59 +87,6 @@ public class CompactionService {
private static final Logger log = LoggerFactory.getLogger(CompactionService.class);
- private class CpInitParams implements CompactionPlanner.InitParameters {
-
- private final Map<String,String> plannerOpts;
- private final Map<CompactionExecutorId,Integer> requestedExecutors;
- private final Set<CompactionExecutorId> requestedExternalExecutors;
- private final ServiceEnvironment senv = new ServiceEnvironmentImpl(context);
-
- CpInitParams(Map<String,String> plannerOpts) {
- this.plannerOpts = plannerOpts;
- this.requestedExecutors = new HashMap<>();
- this.requestedExternalExecutors = new HashSet<>();
- }
-
- @Override
- public ServiceEnvironment getServiceEnvironment() {
- return senv;
- }
-
- @Override
- public Map<String,String> getOptions() {
- return plannerOpts;
- }
-
- @Override
- public String getFullyQualifiedOption(String key) {
- return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + myId + ".opts." + key;
- }
-
- @Override
- public ExecutorManager getExecutorManager() {
- return new ExecutorManager() {
- @Override
- public CompactionExecutorId createExecutor(String executorName, int threads) {
- Preconditions.checkArgument(threads > 0, "Positive number of threads required : %s",
- threads);
- var ceid = CompactionExecutorIdImpl.internalId(myId, executorName);
- Preconditions.checkState(!requestedExecutors.containsKey(ceid));
- requestedExecutors.put(ceid, threads);
- return ceid;
- }
-
- @Override
- public CompactionExecutorId getExternalExecutor(String name) {
- var ceid = CompactionExecutorIdImpl.externalId(name);
- Preconditions.checkArgument(!requestedExternalExecutors.contains(ceid),
- "Duplicate external executor for queue " + name);
- requestedExternalExecutors.add(ceid);
- return ceid;
- }
- };
- }
- }
-
public CompactionService(String serviceName, String plannerClass, Long maxRate,
Map<String,String> plannerOptions, ServerContext context,
CompactionExecutorsMetrics ceMetrics,
@@ -155,7 +101,8 @@ public class CompactionService {
this.ceMetrics = ceMetrics;
this.externExecutorSupplier = externExecutorSupplier;
- var initParams = new CpInitParams(plannerOpts);
+ var initParams =
+ new CompactionPlannerInitParams(myId, plannerOpts, new ServiceEnvironmentImpl(context));
planner = createPlanner(plannerClass);
planner.init(initParams);
@@ -168,12 +115,12 @@ public class CompactionService {
this.writeLimiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration())
.create("CS_" + serviceName + "_write", () -> rateLimit.get());
- initParams.requestedExecutors.forEach((ceid, numThreads) -> {
+ initParams.getRequestedExecutors().forEach((ceid, numThreads) -> {
tmpExecutors.put(ceid,
new InternalCompactionExecutor(ceid, numThreads, ceMetrics, readLimiter, writeLimiter));
});
- initParams.requestedExternalExecutors.forEach((ceid) -> {
+ initParams.getRequestedExternalExecutors().forEach((ceid) -> {
tmpExecutors.put(ceid, externExecutorSupplier.apply(ceid));
});
@@ -415,13 +362,14 @@ public class CompactionService {
if (this.plannerClassName.equals(plannerClassName) && this.plannerOpts.equals(plannerOptions))
return;
- var initParams = new CpInitParams(plannerOptions);
+ var initParams =
+ new CompactionPlannerInitParams(myId, plannerOptions, new ServiceEnvironmentImpl(context));
var tmpPlanner = createPlanner(plannerClassName);
tmpPlanner.init(initParams);
Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>();
- initParams.requestedExecutors.forEach((ceid, numThreads) -> {
+ initParams.getRequestedExecutors().forEach((ceid, numThreads) -> {
InternalCompactionExecutor executor = (InternalCompactionExecutor) executors.get(ceid);
if (executor == null) {
executor =
@@ -432,7 +380,7 @@ public class CompactionService {
tmpExecutors.put(ceid, executor);
});
- initParams.requestedExternalExecutors.forEach(ceid -> {
+ initParams.getRequestedExternalExecutors().forEach(ceid -> {
ExternalCompactionExecutor executor = (ExternalCompactionExecutor) executors.get(ceid);
if (executor == null) {
executor = externExecutorSupplier.apply(ceid);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 092a543..b2b1eed 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionServices;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionStats;
@@ -1468,8 +1469,8 @@ public class CompactableImpl implements Compactable {
return dispatch.getService();
} catch (RuntimeException e) {
log.error("Failed to dispatch compaction {} kind:{} hints:{}, falling back to {} service.",
- getExtent(), kind, debugHints, CompactionManager.DEFAULT_SERVICE, e);
- return CompactionManager.DEFAULT_SERVICE;
+ getExtent(), kind, debugHints, CompactionServicesConfig.DEFAULT_SERVICE, e);
+ return CompactionServicesConfig.DEFAULT_SERVICE;
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
index 708632b..6b69f82 100644
--- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.minicluster.MiniAccumuloRunner;
import org.apache.accumulo.miniclusterImpl.MiniClusterExecutable;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.MonitorExecutable;
+import org.apache.accumulo.server.conf.CheckCompactionConfig;
import org.apache.accumulo.server.conf.CheckServerConfig;
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.Admin;
@@ -106,6 +107,7 @@ public class KeywordStartIT {
assumeTrue(new File(System.getProperty("user.dir") + "/src").exists());
TreeMap<String,Class<? extends KeywordExecutable>> expectSet = new TreeMap<>();
expectSet.put("admin", Admin.class);
+ expectSet.put("check-compaction-config", CheckCompactionConfig.class);
expectSet.put("check-server-config", CheckServerConfig.class);
expectSet.put("compaction-coordinator", CoordinatorExecutable.class);
expectSet.put("compactor", CompactorExecutable.class);
@@ -167,6 +169,7 @@ public class KeywordStartIT {
HashSet<Class<?>> expectSet = new HashSet<>();
expectSet.add(Admin.class);
+ expectSet.add(CheckCompactionConfig.class);
expectSet.add(CreateToken.class);
expectSet.add(Info.class);
expectSet.add(Initialize.class);
@@ -190,12 +193,11 @@ public class KeywordStartIT {
private static boolean hasMain(Class<?> classToCheck) {
Method main;
try {
- main = classToCheck.getMethod("main", new String[0].getClass());
+ main = classToCheck.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
return false;
}
- return main != null && Modifier.isPublic(main.getModifiers())
- && Modifier.isStatic(main.getModifiers());
+ return Modifier.isPublic(main.getModifiers()) && Modifier.isStatic(main.getModifiers());
}
private static class NoOp implements KeywordExecutable {