You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2024/04/05 15:09:30 UTC

(accumulo) branch elasticity updated (637dd0fd3f -> eea8ce48e2)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a change to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


    from 637dd0fd3f Removed elasticity comment to do in ExternalCompaction_2_IT (#4416)
     add 9d4d68b2a3 Changed return variable for ExternalCompactionUtil.getCompactorAddrs (#4419)
     new eea8ce48e2 Merge branch 'main' into elasticity

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revision
listed as "add" was already present in the repository and has only
been added to this reference.


Summary of changes:
 .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java   | 2 +-
 .../accumulo/core/util/compaction/ExternalCompactionUtil.java     | 7 ++++---
 .../src/main/java/org/apache/accumulo/monitor/Monitor.java        | 2 +-
 .../monitor/rest/compactions/external/CoordinatorInfo.java        | 4 ++--
 .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 ++++----
 .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java  | 6 ++++--
 6 files changed, 16 insertions(+), 13 deletions(-)


(accumulo) 01/01: Merge branch 'main' into elasticity

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit eea8ce48e2d5d7bd5c6b69e05c6824f697d8de0a
Merge: 637dd0fd3f 9d4d68b2a3
Author: Dave Marion <dl...@apache.org>
AuthorDate: Fri Apr 5 15:09:17 2024 +0000

    Merge branch 'main' into elasticity

 .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java   | 2 +-
 .../accumulo/core/util/compaction/ExternalCompactionUtil.java     | 7 ++++---
 .../src/main/java/org/apache/accumulo/monitor/Monitor.java        | 2 +-
 .../monitor/rest/compactions/external/CoordinatorInfo.java        | 4 ++--
 .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 ++++----
 .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java  | 6 ++++--
 6 files changed, 16 insertions(+), 13 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 86181247a8,0046af7dc6..48895192bd
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -105,26 -106,26 +106,26 @@@ public class ExternalCompactionUtil 
    }
  
    /**
 -   * @return map of queue names to compactor addresses
 +   * @return map of group names to compactor addresses
     */
-   public static Map<String,List<HostAndPort>> getCompactorAddrs(ClientContext context) {
+   public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext context) {
      try {
-       final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
 -      final Map<String,Set<HostAndPort>> queuesAndAddresses = new HashMap<>();
 -      final String compactorQueuesPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
++      final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>();
 +      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
        ZooReader zooReader = context.getZooReader();
 -      List<String> queues = zooReader.getChildren(compactorQueuesPath);
 -      for (String queue : queues) {
 -        queuesAndAddresses.putIfAbsent(queue, new HashSet<>());
 +      List<String> groups = zooReader.getChildren(compactorGroupsPath);
 +      for (String group : groups) {
          try {
 -          List<String> compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue);
 +          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
            for (String compactor : compactors) {
              // compactor is the address, we are checking to see if there is a child node which
              // represents the compactor's lock as a check that it's alive.
              List<String> children =
 -                zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor);
 +                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
              if (!children.isEmpty()) {
                LOG.trace("Found live compactor {} ", compactor);
-               groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
 -              queuesAndAddresses.get(queue).add(HostAndPort.fromString(compactor));
++              groupsAndAddresses.putIfAbsent(group, new HashSet<>());
 +              groupsAndAddresses.get(group).add(HostAndPort.fromString(compactor));
              }
            }
          } catch (NoNodeException e) {
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
index d17ce19f18,8724f758bb..eccda4569e
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
@@@ -33,9 -33,9 +33,9 @@@ public class CoordinatorInfo 
  
    public CoordinatorInfo(Optional<HostAndPort> serverOpt, ExternalCompactionInfo ecInfo) {
      server = serverOpt.map(HostAndPort::toString).orElse("none");
 -    var queueToCompactors = ecInfo.getCompactors();
 -    numQueues = queueToCompactors.size();
 -    numCompactors = queueToCompactors.values().stream().mapToInt(Set::size).sum();
 +    var groupToCompactors = ecInfo.getCompactors();
 +    numQueues = groupToCompactors.size();
-     numCompactors = groupToCompactors.values().stream().mapToInt(List::size).sum();
++    numCompactors = groupToCompactors.values().stream().mapToInt(Set::size).sum();
      lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis();
    }
  }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 03f7442c91,771d74d588..637a71eca8
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@@ -22,10 -22,9 +22,12 @@@ import static org.apache.accumulo.test.
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertNull;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.junit.jupiter.api.Assertions.fail;
  
++import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
++import java.util.Set;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.atomic.DoubleAdder;
  
@@@ -133,37 -121,6 +135,37 @@@ public class MemoryStarvedMajCIT extend
      String table = getUniqueNames(1)[0];
      try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
  
 +      ClientContext ctx = (ClientContext) client;
 +
 +      // Kill the normal compactors and wait until their addresses in ZK are cleared
 +      getCluster().getConfig().getClusterServerConfiguration().getCompactorConfiguration().keySet()
 +          .forEach(resourceGroup -> {
 +            List<Process> procs = getCluster().getClusterControl().getCompactors(resourceGroup);
 +            for (int i = 0; i < procs.size(); i++) {
 +              LOG.info("Stopping compactor process: {}", procs.get(i).pid());
 +              try {
 +                procs.get(i).destroyForcibly().waitFor();
 +              } catch (InterruptedException e) {
 +                fail("Interrupted trying to stop compactor process");
 +              }
 +            }
 +            getCluster().getClusterControl().getCompactors(resourceGroup).clear();
 +          });
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 0, 60_000);
 +
 +      // Start the Compactors that will consume and free memory when we need it to
 +      getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
 +          MemoryConsumingCompactor.class);
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 1, 60_000);
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx)
 +          .get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000);
 +
-       Map<String,List<HostAndPort>> groupedCompactors =
++      Map<String,Set<HostAndPort>> groupedCompactors =
 +          ExternalCompactionUtil.getCompactorAddrs(ctx);
 +      List<HostAndPort> compactorAddresses =
-           groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME);
++          new ArrayList<>(groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME));
 +      HostAndPort compactorAddr = compactorAddresses.get(0);
 +
        TableOperations to = client.tableOperations();
        to.create(table);