You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/06/20 21:20:18 UTC
[accumulo] branch elasticity updated: Start Compactors when starting MiniAccumuloCluster (#3484)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 9bc5cafc81 Start Compactors when starting MiniAccumuloCluster (#3484)
9bc5cafc81 is described below
commit 9bc5cafc81048a5af7aa84110ffe6e6e1bb64137
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Jun 20 17:20:12 2023 -0400
Start Compactors when starting MiniAccumuloCluster (#3484)
This change sets the default compaction planner options
for MiniAccumuloCluster so that all major compactions run
as external compactions, and starts the Compactors.
I also added a process summary at the end of startup
to confirm that a Compactor is running.
---
.../spi/compaction/DefaultCompactionPlanner.java | 43 ++++++++-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 101 +++++++++++++++++++++
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 5 +
3 files changed, 148 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index 11feb5c60e..15a9263acf 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -110,12 +110,53 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class DefaultCompactionPlanner implements CompactionPlanner {
- private static class ExecutorConfig {
+ public static class ExecutorConfig {
String type;
String name;
String maxSize;
Integer numThreads;
String queue;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getMaxSize() {
+ return maxSize;
+ }
+
+ public void setMaxSize(String maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public Integer getNumThreads() {
+ return numThreads;
+ }
+
+ public void setNumThreads(Integer numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
}
private static class Executor {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 2139c8e1e9..25edd7cd8b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -30,6 +30,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
@@ -57,10 +58,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -79,13 +83,19 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.manager.state.SetGoalState;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerDirs;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.AccumuloStatus;
@@ -614,8 +624,65 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
executor = Executors.newSingleThreadExecutor();
}
+ Set<String> queues;
+ try {
+ queues = getCompactionQueueNames();
+ if (queues.isEmpty()) {
+ throw new IllegalStateException("No Compactor queues configured.");
+ }
+ for (String name : queues) {
+ control.startCompactors(Compactor.class, getConfig().getNumCompactors(), name);
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Unable to find declared CompactionPlanner class", e);
+ }
+
verifyUp();
+ printProcessSummary();
+
+ }
+
+ private void printProcessSummary() {
+ log.info("Process Summary:");
+ getProcesses().forEach((k, v) -> log.info("{}: {}", k,
+ v.stream().map((pr) -> pr.getProcess().pid()).collect(Collectors.toList())));
+ }
+
+ private Set<String> getCompactionQueueNames() throws ClassNotFoundException {
+
+ Set<String> queueNames = new HashSet<>();
+ AccumuloConfiguration aconf = new ConfigurationCopy(config.getSiteConfig());
+ CompactionServicesConfig csc = new CompactionServicesConfig(aconf);
+ ServiceEnvironment senv = new ServiceEnvironmentImpl(getServerContext());
+
+ for (var entry : csc.getPlanners().entrySet()) {
+ String serviceId = entry.getKey();
+ String plannerClass = entry.getValue();
+
+ @SuppressWarnings("unchecked")
+ Class<CompactionPlanner> cpClass = (Class<CompactionPlanner>) ClassLoaderUtil
+ .loadClass(plannerClass, CompactionPlanner.class);
+ try {
+ CompactionPlanner cp = cpClass.getDeclaredConstructor().newInstance();
+ var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+ csc.getOptions().get(serviceId), senv);
+ cp.init(initParams);
+ initParams.getRequestedExternalExecutors().forEach(ceid -> {
+ String id = ceid.canonical();
+ if (id.startsWith("e.")) {
+ queueNames.add(id.substring(2));
+ } else {
+ queueNames.add(id);
+ }
+ });
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(
+ "Error creating instance of " + plannerClass + " with no-arg constructor", e);
+ }
+ }
+ return queueNames;
}
// wait up to 10 seconds for the process to start
@@ -647,6 +714,13 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
waitForProcessStart(tsp, "TabletServer" + tsExpectedCount);
}
+ int ecExpectedCount = 0;
+ for (Process ecp : getClusterControl().compactorProcesses) {
+ ecExpectedCount++;
+ requireNonNull(ecp, "Error starting compactor " + ecExpectedCount + " - no process");
+ waitForProcessStart(ecp, "Compactor" + ecExpectedCount);
+ }
+
try (ZooKeeper zk = new ZooKeeper(getZooKeepers(), 60000, event -> log.warn("{}", event))) {
String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
@@ -727,6 +801,24 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
throw new IllegalStateException("Unable to read TServer information from zookeeper.", e);
}
+ int ecActualCount = 0;
+ try {
+ while (ecActualCount < ecExpectedCount) {
+ ecActualCount = 0;
+ for (String child : zk.getChildren(rootPath + Constants.ZCOMPACTORS, null)) {
+ if (zk.getChildren(rootPath + Constants.ZCOMPACTORS + "/" + child, null).isEmpty()) {
+ log.info("Compactor " + ecActualCount + " not yet present in ZooKeeper");
+ } else {
+ ecActualCount++;
+ log.info("Compactor " + ecActualCount + " present in ZooKeeper");
+ }
+ }
+ Thread.sleep(500);
+ }
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Unable to read Compactor information from zookeeper.", e);
+ }
+
try {
while (zk.getChildren(rootPath + Constants.ZMANAGER_LOCK, null).isEmpty()) {
log.info("Manager not yet present in ZooKeeper");
@@ -771,12 +863,21 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
result.put(ServerType.MANAGER, references(control.managerProcess));
result.put(ServerType.TABLET_SERVER,
references(control.tabletServerProcesses.toArray(new Process[0])));
+ result.put(ServerType.COMPACTOR,
+ references(control.compactorProcesses.toArray(new Process[0])));
+ if (control.scanServerProcesses != null) {
+ result.put(ServerType.SCAN_SERVER,
+ references(control.scanServerProcesses.toArray(new Process[0])));
+ }
if (control.zooKeeperProcess != null) {
result.put(ServerType.ZOOKEEPER, references(control.zooKeeperProcess));
}
if (control.gcProcess != null) {
result.put(ServerType.GARBAGE_COLLECTOR, references(control.gcProcess));
}
+ if (control.monitor != null) {
+ result.put(ServerType.MONITOR, references(control.monitor));
+ }
return result;
}
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index efa6343868..ee5a0c4724 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -164,6 +164,11 @@ public class MiniAccumuloConfigImpl {
mergePropWithRandomPort(Property.MONITOR_PORT.getKey());
mergePropWithRandomPort(Property.GC_PORT.getKey());
+ mergeProp(Property.TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
+ Property.TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
+ mergeProp("tserver.compaction.major.service.default.planner.opts.executors",
+ "[{\"name\":\"all\",\"type\":\"external\",\"queue\":\"defaultQueue\"}]");
+
if (isUseCredentialProvider()) {
updateConfigForCredentialProvider();
}