Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/11/06 18:18:33 UTC

(accumulo) branch elasticity updated: updates delete table to use conditional mutations (#3929)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new a131e97052 updates delete table to use conditional mutations (#3929)
a131e97052 is described below

commit a131e9705293ef7166cfce3fbdae9f40201430b8
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Nov 6 13:18:27 2023 -0500

    updates delete table to use conditional mutations (#3929)
    
    Updates the delete table fate operation to use conditional mutations.
    Also introduces a new mechanism in Ample for processing conditional
    mutations that does not buffer everything in memory.  This avoids
    buffering large amounts of tablet metadata in memory when deleting a
    large table.  The IT that creates 1 million splits in a table and
    deletes it passes with these changes.
---
 .../accumulo/core/metadata/schema/Ample.java       |  43 ++++++--
 .../AsyncConditionalTabletsMutatorImpl.java        |  98 ++++++++++++++++++
 .../accumulo/server/metadata/ServerAmpleImpl.java  |   7 ++
 .../accumulo/manager/tableOps/delete/CleanUp.java  |  31 +-----
 .../manager/tableOps/delete/DeleteTable.java       |   2 +-
 .../manager/tableOps/delete/PreDeleteTable.java    |   1 +
 .../manager/tableOps/delete/ReserveTablets.java    | 111 +++++++++++++++++++++
 .../test/functional/AmpleConditionalWriterIT.java  |  77 ++++++++++++++
 8 files changed, 332 insertions(+), 38 deletions(-)
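
As context for the diff below, here is a minimal usage sketch of the new non-buffering entry point this commit adds. It mirrors how ReserveTablets in the diff uses Ample.conditionallyMutateTablets(BiConsumer); the names ample, extent, and opid are placeholders assumed to be obtained elsewhere, with Ample, KeyExtent, AtomicLong, and BiConsumer assumed to be imported.

// The results consumer may be invoked from a background thread, so it must be
// thread safe; an AtomicLong is used here to tally accepted mutations.
AtomicLong accepted = new AtomicLong(0);
BiConsumer<KeyExtent,Ample.ConditionalResult> consumer = (ext, result) -> {
  if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
    accepted.incrementAndGet();
  }
};

// Mutations are processed in the background as they are submitted; closing the
// mutator drains anything still buffered and reports those results as well.
try (var mutator = ample.conditionallyMutateTablets(consumer)) {
  mutator.mutateTablet(extent).requireAbsentOperation().putOperation(opid)
      .submit(tm -> opid.equals(tm.getOperationId()));
}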

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index a23e6da08e..44f1715775 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
@@ -209,7 +210,10 @@ public interface Ample {
   }
 
   /**
-   * An entry point for updating tablets metadata using a conditional writer.
+   * An entry point for updating tablets metadata using a conditional writer. The returned mutator
+   * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called.
+   * If buffering everything in memory is undesirable, then consider using
+   * {@link #conditionallyMutateTablets(BiConsumer)}
    *
    * @see ConditionalTabletMutator#submit(RejectionHandler)
    */
@@ -217,6 +221,26 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * An entry point for updating tablets metadata using a conditional writer asynchronously. This
+   * will process conditional mutations in the background as they are added. The benefit of this
+   * method over {@link #conditionallyMutateTablets()} is that it can avoid buffering everything in
+   * memory. Using this method may also be faster as it allows tablet metadata scans and conditional
+   * updates of tablets to run concurrently.
+   *
+   * @param resultsConsumer as conditional mutations are processed in the background, their results
+   *        are passed to this consumer. This consumer should be thread safe as it may be called
+   *        from a different thread.
+   * @return A conditional tablet mutator that will asynchronously report results. Closing this
+   *         object will force everything to be processed and reported. The returned object is not
+   *         thread safe and is only intended to be used by a single thread.
+   * @see ConditionalTabletMutator#submit(RejectionHandler)
+   */
+  default AsyncConditionalTabletsMutator
+      conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
+    throw new UnsupportedOperationException();
+  }
+
   default void putGcCandidates(TableId tableId, Collection<StoredTabletFile> candidates) {
     throw new UnsupportedOperationException();
   }
@@ -264,7 +288,7 @@ public interface Ample {
     void close();
   }
 
-  public interface ConditionalResult {
+  interface ConditionalResult {
 
     /**
      * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
@@ -292,14 +316,22 @@ public interface Ample {
     TabletMetadata readMetadata();
   }
 
-  public interface ConditionalTabletsMutator extends AutoCloseable {
-
+  interface AsyncConditionalTabletsMutator extends AutoCloseable {
     /**
     * @return A fluent interface for conditionally mutating a tablet. Ensure you call
      *         {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
      */
     OperationRequirements mutateTablet(KeyExtent extent);
 
+    /**
+     * Closing ensures that all mutations are processed and their results are reported.
+     */
+    @Override
+    void close();
+  }
+
+  interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator {
+
     /**
      * After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
      * this method to process them using a {@link ConditionalWriter}
@@ -307,9 +339,6 @@ public interface Ample {
      * @return The result from the {@link ConditionalWriter} of processing each tablet.
      */
     Map<KeyExtent,ConditionalResult> process();
-
-    @Override
-    void close();
   }
 
   /**
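
For contrast with the asynchronous entry point added above, the pre-existing buffering variant keeps everything in memory and returns all results at once from process(). A rough sketch of that style, using the same placeholder names as the earlier sketch, with Map and a logger (log) also assumed:

// Buffering variant: every mutation submitted before process() is held in
// memory, and the results come back together as a single map.
try (Ample.ConditionalTabletsMutator mutator = ample.conditionallyMutateTablets()) {
  mutator.mutateTablet(extent).requireAbsentOperation().putOperation(opid)
      .submit(tm -> opid.equals(tm.getOperationId()));
  Map<KeyExtent,Ample.ConditionalResult> results = mutator.process();
  results.forEach((ext, res) -> log.debug("{} -> {}", ext, res.getStatus()));
}
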
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
new file mode 100644
index 0000000000..0d31f71eef
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.ServerContext;
+
+public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator {
+  private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer;
+  private final ExecutorService executor;
+  private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null;
+  private ConditionalTabletsMutatorImpl bufferingMutator;
+  private final ServerContext context;
+  private long mutatedTablets = 0;
+  public static final int BATCH_SIZE = 1000;
+
+  AsyncConditionalTabletsMutatorImpl(ServerContext context,
+      BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) {
+    this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
+    this.bufferingMutator = new ConditionalTabletsMutatorImpl(context);
+    this.context = context;
+    var creatorId = Thread.currentThread().getId();
+    this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread(
+        "Async conditional tablets mutator background thread, created by : #" + creatorId,
+        runnable));
+
+  }
+
+  @Override
+  public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
+    if (mutatedTablets > BATCH_SIZE) {
+      if (backgroundProcessing != null) {
+        // a previous batch of mutations was submitted for processing so wait on it.
+        try {
+          backgroundProcessing.get().forEach(resultsConsumer);
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+
+      // Spin up processing of the mutations submitted so far in a background thread. Must copy the
+      // reference for the background thread because a new one is about to be created.
+      var bufferingMutatorRef = bufferingMutator;
+      backgroundProcessing = executor.submit(() -> {
+        var result = bufferingMutatorRef.process();
+        bufferingMutatorRef.close();
+        return result;
+      });
+
+      bufferingMutator = new ConditionalTabletsMutatorImpl(context);
+      mutatedTablets = 0;
+    }
+    mutatedTablets++;
+    return bufferingMutator.mutateTablet(extent);
+  }
+
+  @Override
+  public void close() {
+    if (backgroundProcessing != null) {
+      // a previous batch of mutations was submitted for processing so wait on it.
+      try {
+        backgroundProcessing.get().forEach(resultsConsumer);
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+    // process anything not processed so far
+    bufferingMutator.process().forEach(resultsConsumer);
+    bufferingMutator.close();
+    executor.shutdownNow();
+  }
+}
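
The implementation above effectively pipelines batches of BATCH_SIZE mutations: while the single background thread runs process() on a full ConditionalTabletsMutatorImpl, the caller keeps filling a fresh one, and close() waits for the in-flight batch and then processes whatever is left. The following stripped-down, generic sketch illustrates only that hand-off pattern; it uses hypothetical names, not the actual Accumulo classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Accumulate items, hand a full batch to a single background worker, and keep
// filling the next batch while the previous one is still being processed.
class BatchingSubmitter<T> implements AutoCloseable {
  private static final int BATCH_SIZE = 1000;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Consumer<List<T>> processor;
  private List<T> buffer = new ArrayList<>();
  private Future<?> background = null;

  BatchingSubmitter(Consumer<List<T>> processor) {
    this.processor = processor;
  }

  void submit(T item) throws Exception {
    if (buffer.size() >= BATCH_SIZE) {
      if (background != null) {
        background.get(); // wait for the previous batch before starting another
      }
      var batch = buffer; // copy the reference; a new buffer is about to replace it
      background = executor.submit(() -> processor.accept(batch));
      buffer = new ArrayList<>();
    }
    buffer.add(item);
  }

  @Override
  public void close() throws Exception {
    if (background != null) {
      background.get(); // wait for any in-flight batch
    }
    processor.accept(buffer); // process anything not processed so far
    executor.shutdownNow();
  }
}
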
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index f8d916e159..a42bc5f488 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -93,6 +94,12 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
     return new ConditionalTabletsMutatorImpl(context);
   }
 
+  @Override
+  public AsyncConditionalTabletsMutator
+      conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
+    return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer);
+  }
+
   private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
     String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_GC_CANDIDATES;
     try {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index 0b0512b072..4262443dfc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@ -18,15 +18,10 @@
  */
 package org.apache.accumulo.manager.tableOps.delete;
 
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
-
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -40,11 +35,8 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -87,32 +79,11 @@ class CleanUp extends ManagerRepo {
 
   @Override
   public long isReady(long tid, Manager manager) throws Exception {
+    // ELASTICITY_TODO investigate this: what is it for, and is it still needed?
     if (!manager.hasCycled(creationTime)) {
       return 50;
     }
 
-    boolean done = true;
-
-    try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
-        .fetch(LOCATION, PREV_ROW, SUSPEND).checkConsistency().build()) {
-      Set<TServerInstance> liveTServers = manager.onlineTabletServers();
-      for (TabletMetadata tm : tablets) {
-        TabletState state = TabletState.compute(tm, liveTServers);
-        if (!state.equals(TabletState.UNASSIGNED)) {
-          // This code will even wait on tablets that are assigned to dead tablets servers. This is
-          // intentional because the manager may make metadata writes for these tablets. See #587
-          log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, "
-              + "Current tablet state: {}, locationState: {}", tableId, state, tm);
-          done = false;
-          break;
-        }
-      }
-    }
-
-    if (!done) {
-      return 50;
-    }
-
     return 0;
   }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
index e8eb5c8761..b511056e2a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
@@ -49,7 +49,7 @@ public class DeleteTable extends ManagerRepo {
   public Repo<Manager> call(long tid, Manager env) {
     env.getTableManager().transitionTableState(tableId, TableState.DELETING);
     env.getEventCoordinator().event(tableId, "deleting table %s ", tableId);
-    return new CleanUp(tableId, namespaceId);
+    return new ReserveTablets(tableId, namespaceId);
   }
 
   @Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
index d9c31c13b8..41ce26dcbb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
@@ -57,6 +57,7 @@ public class PreDeleteTable extends ManagerRepo {
 
   private void preventFutureCompactions(Manager environment)
       throws KeeperException, InterruptedException {
+    // ELASTICITY_TODO investigate this. Is it still needed? Is it still working as expected?
     String deleteMarkerPath = createDeleteMarkerPath(environment.getInstanceID(), tableId);
     ZooReaderWriter zoo = environment.getContext().getZooReaderWriter();
     zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
new file mode 100644
index 0000000000..afdd5fc155
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.delete;
+
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReserveTablets extends ManagerRepo {
+
+  private static final Logger log = LoggerFactory.getLogger(ReserveTablets.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private final TableId tableId;
+  private final NamespaceId namespaceId;
+
+  public ReserveTablets(TableId tableId, NamespaceId namespaceId) {
+    this.tableId = tableId;
+    this.namespaceId = namespaceId;
+  }
+
+  @Override
+  public long isReady(long tid, Manager manager) throws Exception {
+
+    var opid = TabletOperationId.from(TabletOperationType.DELETING, tid);
+
+    // The consumer may be called in another thread so use an AtomicLong
+    AtomicLong accepted = new AtomicLong(0);
+    BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> {
+      if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+        accepted.incrementAndGet();
+      } else {
+        log.debug("Failed to set operation id {} {}", opid, extent);
+      }
+    };
+
+    long locations = 0;
+    long otherOps = 0;
+    long submitted = 0;
+    long tabletsSeen = 0;
+
+    try (
+        var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
+            .fetch(OPID, PREV_ROW, LOCATION).checkConsistency().build();
+        var conditionalMutator =
+            manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) {
+
+      for (var tabletMeta : tablets) {
+        tabletsSeen++;
+        if (tabletMeta.getLocation() != null) {
+          locations++;
+        }
+
+        if (tabletMeta.getOperationId() != null) {
+          if (!opid.equals(tabletMeta.getOperationId())) {
+            otherOps++;
+          }
+        } else {
+          conditionalMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation()
+              .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId()));
+          submitted++;
+        }
+      }
+    }
+
+    if (locations > 0 || otherOps > 0 || submitted != accepted.get()) {
+      log.debug("Waiting to delete table locations:{} operations:{}  submitted:{} accepted:{}",
+          locations, otherOps, submitted, accepted.get());
+      return Math.min(Math.max(100, tabletsSeen), 30000);
+    }
+
+    return 0;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    return new CleanUp(tableId, namespaceId);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 2ee6c5d1e7..3a7e2d3c9d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -26,6 +26,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 import static org.apache.accumulo.core.util.LazySingletons.GSON;
@@ -43,8 +45,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -62,6 +67,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -73,6 +79,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
 import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -794,4 +801,74 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong());
     }
   }
+
+  @Test
+  public void testAsyncMutator() throws Exception {
+    var table = getUniqueNames(2)[1];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      // The AsyncConditionalTabletsMutatorImpl processes batches of conditional mutations. Run
+      // tests where more than the batch size is processed and ensure this is handled correctly.
+
+      TreeSet<Text> splits =
+          IntStream.range(1, (int) (AsyncConditionalTabletsMutatorImpl.BATCH_SIZE * 2.5))
+              .mapToObj(i -> new Text(String.format("%06d", i)))
+              .collect(Collectors.toCollection(TreeSet::new));
+
+      assertTrue(splits.size() > AsyncConditionalTabletsMutatorImpl.BATCH_SIZE);
+
+      c.tableOperations().create(table, new NewTableConfiguration().withSplits(splits));
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
+
+      var ample = cluster.getServerContext().getAmple();
+
+      AtomicLong accepted = new AtomicLong(0);
+      AtomicLong total = new AtomicLong(0);
+      BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> {
+        if (result.getStatus() == Status.ACCEPTED) {
+          accepted.incrementAndGet();
+        }
+        total.incrementAndGet();
+      };
+
+      // run a test where a subset of tablets is modified; all modifications should be accepted
+      var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50);
+
+      int expected = 0;
+      try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build();
+          var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
+        for (var tablet : tablets) {
+          if (tablet.getEndRow() != null
+              && Integer.parseInt(tablet.getEndRow().toString()) % 2 == 0) {
+            mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid1)
+                .submit(tm -> opid1.equals(tm.getOperationId()));
+            expected++;
+          }
+        }
+      }
+
+      assertTrue(expected > 0);
+      assertEquals(expected, accepted.get());
+      assertEquals(total.get(), accepted.get());
+
+      // run test where some will be accepted and some will be rejected and ensure the counts come
+      // out as expected.
+      var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51);
+
+      accepted.set(0);
+      total.set(0);
+
+      try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build();
+          var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
+        for (var tablet : tablets) {
+          mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid2)
+              .submit(tm -> opid2.equals(tm.getOperationId()));
+        }
+      }
+
+      var numTablets = splits.size() + 1;
+      assertEquals(numTablets - expected, accepted.get());
+      assertEquals(numTablets, total.get());
+    }
+  }
+
 }