You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/06/20 21:20:18 UTC

[accumulo] branch elasticity updated: Start Compactors when starting MiniAccumuloCluster (#3484)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 9bc5cafc81 Start Compactors when starting MiniAccumuloCluster (#3484)
9bc5cafc81 is described below

commit 9bc5cafc81048a5af7aa84110ffe6e6e1bb64137
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Jun 20 17:20:12 2023 -0400

    Start Compactors when starting MiniAccumuloCluster (#3484)
    
    This change sets the default compaction planner opts
    for MiniAccumuloCluster to run all major compactions
    using external compactions and starts the Compactors.
    I also added a process summary at the end of the startup
    to confirm that a Compactor was running
---
 .../spi/compaction/DefaultCompactionPlanner.java   |  43 ++++++++-
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   | 101 +++++++++++++++++++++
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |   5 +
 3 files changed, 148 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index 11feb5c60e..15a9263acf 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -110,12 +110,53 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 public class DefaultCompactionPlanner implements CompactionPlanner {
 
-  private static class ExecutorConfig {
+  public static class ExecutorConfig {
     String type;
     String name;
     String maxSize;
     Integer numThreads;
     String queue;
+
+    public String getType() {
+      return type;
+    }
+
+    public void setType(String type) {
+      this.type = type;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public String getMaxSize() {
+      return maxSize;
+    }
+
+    public void setMaxSize(String maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    public Integer getNumThreads() {
+      return numThreads;
+    }
+
+    public void setNumThreads(Integer numThreads) {
+      this.numThreads = numThreads;
+    }
+
+    public String getQueue() {
+      return queue;
+    }
+
+    public void setQueue(String queue) {
+      this.queue = queue;
+    }
+
   }
 
   private static class Executor {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 2139c8e1e9..25edd7cd8b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -30,6 +30,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
@@ -57,10 +58,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -79,13 +83,19 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.manager.state.SetGoalState;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerDirs;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.init.Initialize;
 import org.apache.accumulo.server.util.AccumuloStatus;
@@ -614,8 +624,65 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       executor = Executors.newSingleThreadExecutor();
     }
 
+    Set<String> queues;
+    try {
+      queues = getCompactionQueueNames();
+      if (queues.isEmpty()) {
+        throw new IllegalStateException("No Compactor queues configured.");
+      }
+      for (String name : queues) {
+        control.startCompactors(Compactor.class, getConfig().getNumCompactors(), name);
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Unable to find declared CompactionPlanner class", e);
+    }
+
     verifyUp();
 
+    printProcessSummary();
+
+  }
+
+  private void printProcessSummary() {
+    log.info("Process Summary:");
+    getProcesses().forEach((k, v) -> log.info("{}: {}", k,
+        v.stream().map((pr) -> pr.getProcess().pid()).collect(Collectors.toList())));
+  }
+
+  private Set<String> getCompactionQueueNames() throws ClassNotFoundException {
+
+    Set<String> queueNames = new HashSet<>();
+    AccumuloConfiguration aconf = new ConfigurationCopy(config.getSiteConfig());
+    CompactionServicesConfig csc = new CompactionServicesConfig(aconf);
+    ServiceEnvironment senv = new ServiceEnvironmentImpl(getServerContext());
+
+    for (var entry : csc.getPlanners().entrySet()) {
+      String serviceId = entry.getKey();
+      String plannerClass = entry.getValue();
+
+      @SuppressWarnings("unchecked")
+      Class<CompactionPlanner> cpClass = (Class<CompactionPlanner>) ClassLoaderUtil
+          .loadClass(plannerClass, CompactionPlanner.class);
+      try {
+        CompactionPlanner cp = cpClass.getDeclaredConstructor().newInstance();
+        var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+            csc.getOptions().get(serviceId), senv);
+        cp.init(initParams);
+        initParams.getRequestedExternalExecutors().forEach(ceid -> {
+          String id = ceid.canonical();
+          if (id.startsWith("e.")) {
+            queueNames.add(id.substring(2));
+          } else {
+            queueNames.add(id);
+          }
+        });
+      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
+          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
+        throw new RuntimeException(
+            "Error creating instance of " + plannerClass + " with no-arg constructor", e);
+      }
+    }
+    return queueNames;
   }
 
   // wait up to 10 seconds for the process to start
@@ -647,6 +714,13 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       waitForProcessStart(tsp, "TabletServer" + tsExpectedCount);
     }
 
+    int ecExpectedCount = 0;
+    for (Process ecp : getClusterControl().compactorProcesses) {
+      ecExpectedCount++;
+      requireNonNull(ecp, "Error starting compactor " + ecExpectedCount + " - no process");
+      waitForProcessStart(ecp, "Compactor" + ecExpectedCount);
+    }
+
     try (ZooKeeper zk = new ZooKeeper(getZooKeepers(), 60000, event -> log.warn("{}", event))) {
 
       String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
@@ -727,6 +801,24 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
         throw new IllegalStateException("Unable to read TServer information from zookeeper.", e);
       }
 
+      int ecActualCount = 0;
+      try {
+        while (ecActualCount < ecExpectedCount) {
+          ecActualCount = 0;
+          for (String child : zk.getChildren(rootPath + Constants.ZCOMPACTORS, null)) {
+            if (zk.getChildren(rootPath + Constants.ZCOMPACTORS + "/" + child, null).isEmpty()) {
+              log.info("Compactor " + ecActualCount + " not yet present in ZooKeeper");
+            } else {
+              ecActualCount++;
+              log.info("Compactor " + ecActualCount + " present in ZooKeeper");
+            }
+          }
+          Thread.sleep(500);
+        }
+      } catch (KeeperException e) {
+        throw new IllegalStateException("Unable to read Compactor information from zookeeper.", e);
+      }
+
       try {
         while (zk.getChildren(rootPath + Constants.ZMANAGER_LOCK, null).isEmpty()) {
           log.info("Manager not yet present in ZooKeeper");
@@ -771,12 +863,21 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
     result.put(ServerType.MANAGER, references(control.managerProcess));
     result.put(ServerType.TABLET_SERVER,
         references(control.tabletServerProcesses.toArray(new Process[0])));
+    result.put(ServerType.COMPACTOR,
+        references(control.compactorProcesses.toArray(new Process[0])));
+    if (control.scanServerProcesses != null) {
+      result.put(ServerType.SCAN_SERVER,
+          references(control.scanServerProcesses.toArray(new Process[0])));
+    }
     if (control.zooKeeperProcess != null) {
       result.put(ServerType.ZOOKEEPER, references(control.zooKeeperProcess));
     }
     if (control.gcProcess != null) {
       result.put(ServerType.GARBAGE_COLLECTOR, references(control.gcProcess));
     }
+    if (control.monitor != null) {
+      result.put(ServerType.MONITOR, references(control.monitor));
+    }
     return result;
   }
 
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index efa6343868..ee5a0c4724 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -164,6 +164,11 @@ public class MiniAccumuloConfigImpl {
       mergePropWithRandomPort(Property.MONITOR_PORT.getKey());
       mergePropWithRandomPort(Property.GC_PORT.getKey());
 
+      mergeProp(Property.TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
+          Property.TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
+      mergeProp("tserver.compaction.major.service.default.planner.opts.executors",
+          "[{\"name\":\"all\",\"type\":\"external\",\"queue\":\"defaultQueue\"}]");
+
       if (isUseCredentialProvider()) {
         updateConfigForCredentialProvider();
       }