You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/05/23 22:05:43 UTC

[accumulo] branch elasticity updated: Improves handling of conditional writer edge cases in Ample (#3416)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new e0612abb7b Improves handling of conditional writer edge cases in Ample (#3416)
e0612abb7b is described below

commit e0612abb7b8ad835350d01d754872d2a736828a5
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue May 23 18:05:36 2023 -0400

    Improves handling of conditional writer edge cases in Ample (#3416)
---
 .../accumulo/core/metadata/schema/Ample.java       | 131 ++++++++++++++++----
 .../manager/state/AbstractTabletStateStore.java    |  26 ++--
 .../server/manager/state/MetaDataStateStore.java   |   6 +-
 .../metadata/ConditionalTabletMutatorImpl.java     |  18 +--
 .../metadata/ConditionalTabletsMutatorImpl.java    | 135 ++++++++++++++++-----
 .../ConditionalTabletsMutatorImplTest.java         |  64 ++++++----
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |   4 +-
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  10 +-
 .../manager/tableOps/split/DeleteOperationIds.java |  17 ++-
 .../accumulo/manager/tableOps/split/PreSplit.java  |   9 +-
 .../manager/tableOps/split/UpdateTablets.java      |  36 ++----
 .../test/functional/AmpleConditionalWriterIT.java  |  76 ++++++------
 12 files changed, 336 insertions(+), 196 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 491dad5754..c0ee195825 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -186,6 +186,11 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * An entry point for updating tablets metadata using a conditional writer.
+   *
+   * @see ConditionalTabletMutator#submit(RejectionHandler)
+   */
   default ConditionalTabletsMutator conditionallyMutateTablets() {
     throw new UnsupportedOperationException();
   }
@@ -250,23 +255,27 @@ public interface Ample {
   public interface ConditionalResult {
 
     /**
-     * Returns the status of the conditional mutation. If the status was
-     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} and
-     * Ample#UknownValidator indicates things are ok then this will return
-     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED}
+     * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
+     * automated handling for most of the statuses of the conditional writer and therefore only a
+     * subset are expected to be passed out of Ample. This enum represents the subset that Ample
+     * will actually return.
      */
-    ConditionalWriter.Status getStatus();
+    enum Status {
+      ACCEPTED, REJECTED
+    }
+
+    /**
+     * Returns the status of the conditional mutation or may return a computed status of ACCEPTED in
+     * some cases, see {@link ConditionalTabletMutator#submit(RejectionHandler)} for details.
+     */
+    Status getStatus();
 
     KeyExtent getExtent();
 
     /**
      * This can only be called when {@link #getStatus()} returns something other than
-     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED}. It reads that
-     * tablets metadata for a failed conditional mutation. This can used used to see why it failed.
-     * In the case where {@link #getStatus()} returns
-     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} it can be used to
-     * see if the mutation actually succeeded or not.
-     *
+     * {@link Status#ACCEPTED}. It reads that tablet's metadata for a failed conditional mutation.
+     * This can be used to see why it was rejected.
      */
     TabletMetadata readMetadata();
   }
@@ -275,7 +284,7 @@ public interface Ample {
 
     /**
      * @return A fluent interface to conditional mutating a tablet. Ensure you call
-     *         {@link ConditionalTabletMutator#submit()} when finished.
+     *         {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
      */
     OperationRequirements mutateTablet(KeyExtent extent);
 
@@ -396,9 +405,9 @@ public interface Ample {
   }
 
   /**
-   * Convenience interface for handling conditional mutations with a status of UNKNOWN.
+   * Convenience interface for handling conditional mutations with a status of REJECTED.
    */
-  interface UnknownValidator extends Predicate<TabletMetadata> {}
+  interface RejectionHandler extends Predicate<TabletMetadata> {}
 
   interface ConditionalTabletMutator extends TabletUpdates<ConditionalTabletMutator> {
 
@@ -428,18 +437,92 @@ public interface Ample {
     ConditionalTabletMutator requirePrevEndRow(Text per);
 
     /**
-     * Submits or queues a conditional mutation for processing.
-     */
-    void submit();
-
-    /**
-     * @param unknownCheck if the conditional mutation comes back with a status of
-     *        {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} then read the
-     *        tablets metadata and apply this check to see if it should be considered as
+     * <p>
+     * Ample provides the following features on top of the conditional writer to help automate
+     * handling of edge cases that arise when using the conditional writer.
+     * <ul>
+     * <li>Automatically resubmit conditional mutations with a status of
+     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN}.</li>
+     * <li>When a mutation is rejected (status of
+     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED}) it will read the
+     * tablets metadata and call the passed rejectionHandler to determine if the mutation should be
+     * considered as accepted.</li>
+     * <li>For status of
+     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#INVISIBLE_VISIBILITY} and
+     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#VIOLATED} ample will throw an
+     * exception. This is done so that all code does not have to deal with these unexpected
+     * statuses.</li>
+     * </ul>
+     *
+     * <p>
+     * The motivation behind the rejectionHandler is to help sort things out when conditional
+     * mutations are submitted twice and the subsequent submission is rejected even though the first
+     * submission was accepted. There are two causes for this. First, when a thread is running in
+     * something like FATE it may submit a mutation and the thread dies before it sees the response.
+     * Later FATE will run the code again for a second time, submitting a second mutation. The
+     * second cause is ample resubmitting on unknown as mentioned above. Below are a few examples
+     * that go over how Ample will handle these different situations.
+     *
+     * <h3>Example 1</h3>
+     *
+     * <ul>
+     * <li>Conditional mutation CM1 with a condition requiring an absent location that sets a future
+     * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the
+     * future location.</li>
+     * <li>Inside Ample CM1 is submitted to a conditional writer and returns a status of UNKNOWN,
+     * but it actually succeeded. This could be caused by the mutation succeeding and the tablet
+     * server dying just before it reports back.</li>
+     * <li>Ample sees the UNKNOWN status and resubmits CM1 for a second time. Because the future
+     * location was set, the mutation is returned to ample with a status of rejected by the
+     * conditional writer.</li>
+     * <li>Because the mutation was rejected, ample reads the tablet metadata and calls the
+     * rejectionHandler. The rejectionHandler sees the future location was set and reports that
+     * everything is ok, therefore ample reports the status as ACCEPTED.</li>
+     * </ul>
+     *
+     * <h3>Example 2</h3>
+     *
+     * <ul>
+     * <li>Conditional mutation CM2 with a condition requiring an absent location that sets a future
+     * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the
+     * future location.</li>
+     * <li>Inside Ample CM2 is submitted to a conditional writer and returns a status of UNKNOWN,
+     * but it actually never made it to the tserver. This could be caused by the tablet server dying
+     * just after a network connection was established to send the mutation.</li>
+     * <li>Ample sees the UNKNOWN status and resubmits CM2 for a second time. There is no future
+     * location set so the mutation is returned to ample with a status of accepted by the
+     * conditional writer.</li>
+     * <li>Because the mutation was accepted, ample never calls the rejectionHandler and returns it
+     * as accepted.</li>
+     * </ul>
+     *
+     * <h3>Example 3</h3>
+     *
+     * <ul>
+     * <li>Conditional mutation CM3 with a condition requiring an absent operation that sets the
+     * operation id to a fate transaction id is submitted. When it's submitted to ample a
+     * rejectionHandler is set that checks if the operation id equals the fate transaction id.</li>
+     * <li>The thread running the fate operation dies after submitting the mutation but before
+     * seeing it was actually accepted.</li>
+     * <li>Later fate creates an identical mutation to CM3, let's call it CM3.2, and resubmits it
+     * with the same rejection handler.</li>
+     * <li>CM3.2 is rejected because the operation id is not absent.</li>
+     * <li>Because the mutation was rejected, ample calls the rejectionHandler. The rejectionHandler
+     * sees in the tablet metadata that the operation id is its fate transaction id and reports back
+     * true.</li>
+     * <li>When rejectionHandler reports true, ample reports the mutation as accepted.</li>
+     * </ul>
+     *
+     * @param rejectionHandler if the conditional mutation comes back with a status of
+     *        {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED} then read
+     *        the tablets metadata and apply this check to see if it should be considered as
      *        {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED} in the
-     *        return of {@link ConditionalTabletsMutator#process()}
+     *        return of {@link ConditionalTabletsMutator#process()}. The rejection handler is only
+     *        called when a tablet's metadata exists. If ample reads a tablet's metadata and the
+     *        tablet no longer exists, then ample will not call the rejectionHandler with null. It
+     *        will let the rejected status carry forward in this case.
      */
-    void submit(UnknownValidator unknownCheck);
+    void submit(RejectionHandler rejectionHandler);
   }
 
   /**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
index cbd8384923..41516db6a5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
@@ -21,14 +21,13 @@ package org.apache.accumulo.server.manager.state;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.util.ManagerMetadataUtil;
@@ -61,16 +60,13 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
 
         conditionalMutator.submit(tabletMetadata -> {
           Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
-          // see if we are the current location, if so then the unknown mutation actually
-          // succeeded
           return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
               .equals(TabletMetadata.Location.current(assignment.server));
         });
       }
 
       if (tabletsMutator.process().values().stream()
-          .anyMatch(result -> result.getStatus() != ConditionalWriter.Status.ACCEPTED)) {
-        // TODO should this look at why?
+          .anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
         throw new DistributedStoreException(
             "failed to set tablet location, conditional mutation failed");
       }
@@ -89,8 +85,6 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
             .putLocation(TabletMetadata.Location.future(assignment.server))
             .submit(tabletMetadata -> {
               Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
-              // see if we are the future location, if so then the unknown mutation actually
-              // succeeded
               return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
                   .equals(TabletMetadata.Location.future(assignment.server));
             });
@@ -98,12 +92,9 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
 
       var results = tabletsMutator.process();
 
-      if (results.values().stream()
-          .anyMatch(result -> result.getStatus() != ConditionalWriter.Status.ACCEPTED)) {
-        var statuses = results.values().stream().map(Ample.ConditionalResult::getStatus)
-            .collect(Collectors.toSet());
+      if (results.values().stream().anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
         throw new DistributedStoreException(
-            "failed to set tablet location, conditional mutation failed. " + statuses);
+            "failed to set tablet location, conditional mutation failed. ");
       }
 
     } catch (RuntimeException ex) {
@@ -158,16 +149,13 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
 
         processSuspension(tabletMutator, tls, suspensionTimestamp);
 
-        tabletMutator.submit(tabletMetadata -> {
-          // The status of the conditional update is unknown, so check and see if things are ok
-          return tabletMetadata.getLocation() == null;
-        });
+        tabletMutator.submit(tabletMetadata -> tabletMetadata.getLocation() == null);
       }
 
       Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
 
-      if (results.values().stream().anyMatch(conditionalResult -> conditionalResult.getStatus()
-          != ConditionalWriter.Status.ACCEPTED)) {
+      if (results.values().stream()
+          .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED)) {
         throw new DistributedStoreException("Some unassignments did not satisfy conditions.");
       }
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 86cd47552e..a5b898cb9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -20,11 +20,11 @@ package org.apache.accumulo.server.manager.state;
 
 import java.util.Collection;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 
 class MetaDataStateStore extends AbstractTabletStateStore implements TabletStateStore {
@@ -64,8 +64,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState
             .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
       }
 
-      boolean unacceptedConditions = tabletsMutator.process().values().stream().anyMatch(
-          conditionalResult -> conditionalResult.getStatus() != ConditionalWriter.Status.ACCEPTED);
+      boolean unacceptedConditions = tabletsMutator.process().values().stream()
+          .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED);
       if (unacceptedConditions) {
         throw new DistributedStoreException("Some mutations failed to satisfy conditions");
       }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 6cfd3add13..cef634a8c7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -55,7 +55,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
   private final Consumer<ConditionalMutation> mutationConsumer;
   private final Ample.ConditionalTabletsMutator parent;
 
-  private final BiConsumer<KeyExtent,Ample.UnknownValidator> unknownValidators;
+  private final BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer;
 
   private final KeyExtent extent;
 
@@ -63,12 +63,12 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
 
   protected ConditionalTabletMutatorImpl(Ample.ConditionalTabletsMutator parent,
       ServerContext context, KeyExtent extent, Consumer<ConditionalMutation> mutationConsumer,
-      BiConsumer<KeyExtent,Ample.UnknownValidator> unknownValidators) {
+      BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer) {
     super(context, new ConditionalMutation(extent.toMetaRow()));
     this.mutation = (ConditionalMutation) super.mutation;
     this.mutationConsumer = mutationConsumer;
     this.parent = parent;
-    this.unknownValidators = unknownValidators;
+    this.rejectionHandlerConsumer = rejectionHandlerConsumer;
     this.extent = extent;
   }
 
@@ -150,19 +150,11 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
   }
 
   @Override
-  public void submit() {
+  public void submit(Ample.RejectionHandler rejectionCheck) {
     Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
     Preconditions.checkState(sawOperationRequirement, "No operation requirements were seen");
     getMutation();
     mutationConsumer.accept(mutation);
-  }
-
-  @Override
-  public void submit(Ample.UnknownValidator unknownCheck) {
-    Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
-    Preconditions.checkState(sawOperationRequirement, "No operation requirements were seen");
-    getMutation();
-    mutationConsumer.accept(mutation);
-    unknownValidators.accept(extent, unknownCheck);
+    rejectionHandlerConsumer.accept(extent, rejectionCheck);
   }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 940555276f..cfcc1fec49 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -18,8 +18,13 @@
  */
 package org.apache.accumulo.server.metadata;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -29,12 +34,12 @@ import java.util.stream.Collectors;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -58,7 +63,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
 
   private boolean active = true;
 
-  Map<KeyExtent,Ample.UnknownValidator> unknownValidators = new HashMap<>();
+  Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>();
 
   public ConditionalTabletsMutatorImpl(ServerContext context) {
     this.context = context;
@@ -80,7 +85,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
     Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null,
         "Duplicate extents not handled");
     return new ConditionalTabletMutatorImpl(this, context, extent, mutations::add,
-        unknownValidators::put);
+        rejectedHandlers::put);
   }
 
   protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel)
@@ -109,7 +114,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
 
     var extents = results.entrySet().stream().filter(e -> {
       try {
-        return e.getValue().getStatus() != Status.ACCEPTED;
+        return e.getValue().getStatus() != ConditionalWriter.Status.ACCEPTED;
       } catch (AccumuloException | AccumuloSecurityException ex) {
         throw new RuntimeException(ex);
       }
@@ -122,12 +127,83 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
     return readTablets(extents);
   }
 
+  private void partitionResults(Iterator<ConditionalWriter.Result> results,
+      List<ConditionalWriter.Result> resultsList, List<ConditionalWriter.Result> unknownResults) {
+    while (results.hasNext()) {
+      var result = results.next();
+
+      try {
+        if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
+          unknownResults.add(result);
+        } else {
+          resultsList.add(result);
+        }
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter conditionalWriter) {
+    var results = conditionalWriter.write(mutations.iterator());
+
+    List<ConditionalWriter.Result> resultsList = new ArrayList<>();
+    List<ConditionalWriter.Result> unknownResults = new ArrayList<>();
+    partitionResults(results, resultsList, unknownResults);
+
+    Retry retry = null;
+
+    while (!unknownResults.isEmpty()) {
+      try {
+        if (retry == null) {
+          retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+              .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+              .logInterval(3, MINUTES).createRetry();
+        }
+        retry.waitForNextAttempt(log, "handle conditional mutations with unknown status");
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+      results = conditionalWriter
+          .write(unknownResults.stream().map(ConditionalWriter.Result::getMutation).iterator());
+
+      // create a new array instead of clearing in case the above has not consumed everything
+      unknownResults = new ArrayList<>();
+
+      partitionResults(results, resultsList, unknownResults);
+    }
+
+    return resultsList.iterator();
+  }
+
+  private Ample.ConditionalResult.Status mapStatus(KeyExtent extent,
+      ConditionalWriter.Result result) {
+
+    ConditionalWriter.Status status = null;
+    try {
+      status = result.getStatus();
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new IllegalStateException(e);
+    }
+
+    switch (status) {
+      case REJECTED:
+        return Ample.ConditionalResult.Status.REJECTED;
+      case ACCEPTED:
+        return Ample.ConditionalResult.Status.ACCEPTED;
+      default:
+        throw new IllegalStateException(
+            "Unexpected conditional mutation status : " + extent + " " + status);
+    }
+  }
+
   @Override
   public Map<KeyExtent,Ample.ConditionalResult> process() {
     Preconditions.checkState(active);
     if (dataLevel != null) {
       try (ConditionalWriter conditionalWriter = createConditionalWriter(dataLevel)) {
-        var results = conditionalWriter.write(mutations.iterator());
+        var results = writeMutations(conditionalWriter);
 
         var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
 
@@ -138,22 +214,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
         }
 
         var extentsSet = Set.copyOf(extents.values());
-        if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
-          // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
-          // that
-
-          Sets.difference(resultsMap.keySet(), extentsSet)
-              .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent));
-
-          Sets.difference(extentsSet, resultsMap.keySet())
-              .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
-
-          resultsMap.forEach((keyExtent, result) -> {
-            log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
-          });
-
-          throw new AssertionError("Not all extents were seen, this is unexpected");
-        }
+        ensureAllExtentsSeen(resultsMap, extentsSet);
 
         // only fetch the metadata for failures when requested and when it is requested fetch all
         // of the failed extents at once to avoid fetching them one by one.
@@ -161,20 +222,16 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
 
         return Maps.transformEntries(resultsMap, (extent, result) -> new Ample.ConditionalResult() {
 
-          private Status _getStatus() {
-            try {
-              return result.getStatus();
-            } catch (AccumuloException | AccumuloSecurityException e) {
-              throw new RuntimeException(e);
-            }
+          private Ample.ConditionalResult.Status _getStatus() {
+            return mapStatus(extent, result);
           }
 
           @Override
           public Status getStatus() {
             var status = _getStatus();
-            if (status == Status.UNKNOWN && unknownValidators.containsKey(extent)) {
+            if (status == Status.REJECTED && rejectedHandlers.containsKey(extent)) {
               var tabletMetadata = readMetadata();
-              if (tabletMetadata != null && unknownValidators.get(extent).test(tabletMetadata)) {
+              if (tabletMetadata != null && rejectedHandlers.get(extent).test(tabletMetadata)) {
                 return Status.ACCEPTED;
               }
             }
@@ -211,6 +268,26 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
     }
   }
 
+  private void ensureAllExtentsSeen(HashMap<KeyExtent,ConditionalWriter.Result> resultsMap,
+      Set<KeyExtent> extentsSet) {
+    if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
+      // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
+      // that
+
+      Sets.difference(resultsMap.keySet(), extentsSet)
+          .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent));
+
+      Sets.difference(extentsSet, resultsMap.keySet())
+          .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
+
+      resultsMap.forEach((keyExtent, result) -> {
+        log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
+      });
+
+      throw new AssertionError("Not all extents were seen, this is unexpected");
+    }
+  }
+
   @Override
   public void close() {}
 }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
index 569b6190a4..e4d6bd51ff 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
@@ -28,7 +28,6 @@ import java.util.function.Function;
 import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.TableId;
@@ -45,9 +44,11 @@ public class ConditionalTabletsMutatorImplTest {
   static class TestConditionalTabletsMutator extends ConditionalTabletsMutatorImpl {
 
     private final Map<KeyExtent,TabletMetadata> failedExtents;
-    private final Function<Text,Status> statuses;
+    private final List<Function<Text,ConditionalWriter.Status>> statuses;
 
-    public TestConditionalTabletsMutator(Function<Text,Status> statuses,
+    private int attempt = 0;
+
+    public TestConditionalTabletsMutator(List<Function<Text,ConditionalWriter.Status>> statuses,
         Map<KeyExtent,TabletMetadata> failedExtents) {
       super(null);
       this.statuses = statuses;
@@ -64,8 +65,10 @@ public class ConditionalTabletsMutatorImplTest {
         @Override
         public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
           Iterable<ConditionalMutation> iterable = () -> mutations;
+          var localAttempt = attempt++;
           return StreamSupport.stream(iterable.spliterator(), false)
-              .map(cm -> new Result(statuses.apply(new Text(cm.getRow())), cm, "server"))
+              .map(cm -> new Result(statuses.get(localAttempt).apply(new Text(cm.getRow())), cm,
+                  "server"))
               .iterator();
         }
 
@@ -83,9 +86,10 @@ public class ConditionalTabletsMutatorImplTest {
   }
 
   @Test
-  public void testUnknownValidation() {
+  public void testRejectionHandler() {
 
-    // this test checks the handling of conditional mutations that return a status of unknown
+    // this test checks the handling of conditional mutations that return a status of unknown and
+    // rejected
 
     var ke1 = new KeyExtent(TableId.of("1"), null, null);
 
@@ -100,41 +104,51 @@ public class ConditionalTabletsMutatorImplTest {
     EasyMock.replay(tm2);
 
     var ke3 = new KeyExtent(TableId.of("b"), null, null);
+
+    TabletMetadata tm3 = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm3.getDirName()).andReturn("dir4").anyTimes();
+    EasyMock.replay(tm3);
+
     var ke4 = new KeyExtent(TableId.of("c"), null, null);
 
-    var failedExtents = Map.of(ke1, tm1, ke2, tm2);
-    var statuses = Map.of(ke1.toMetaRow(), Status.UNKNOWN, ke2.toMetaRow(), Status.UNKNOWN,
-        ke3.toMetaRow(), Status.REJECTED, ke4.toMetaRow(), Status.ACCEPTED);
+    TabletMetadata tm4 = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm4.getDirName()).andReturn("dir5").anyTimes();
+    EasyMock.replay(tm4);
+
+    var failedExtents = Map.of(ke1, tm1, ke2, tm2, ke3, tm3, ke4, tm4);
+
+    // expect retry on unknown
+    var statuses1 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.UNKNOWN, ke2.toMetaRow(),
+        ConditionalWriter.Status.UNKNOWN, ke3.toMetaRow(), ConditionalWriter.Status.REJECTED,
+        ke4.toMetaRow(), ConditionalWriter.Status.ACCEPTED);
+
+    // on the 2nd attempt (the first retry), return rejected
+    var statuses2 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.REJECTED, ke2.toMetaRow(),
+        ConditionalWriter.Status.REJECTED);
+
+    try (var mutator =
+        new TestConditionalTabletsMutator(List.of(statuses1::get, statuses2::get), failedExtents)) {
 
-    try (var mutator = new TestConditionalTabletsMutator(statuses::get, failedExtents)) {
-      // passed in unknown handler should determine the mutations status should be accepted
       mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
           .submit(tmeta -> tmeta.getDirName().equals("dir1"));
 
-      // passed in unknown handler should determine the mutations status should continue to be
-      // UNKNOWN
       mutator.mutateTablet(ke2).requireAbsentOperation().putDirName("dir3")
           .submit(tmeta -> tmeta.getDirName().equals("dir3"));
 
-      // ensure the unknown handler is only called when the status is unknown, this mutation will
-      // have a status of REJECTED
-      mutator.mutateTablet(ke3).requireAbsentOperation().putDirName("dir3").submit(tmeta -> {
-        throw new IllegalStateException();
-      });
+      mutator.mutateTablet(ke3).requireAbsentOperation().putDirName("dir4")
+          .submit(tmeta -> tmeta.getDirName().equals("dir4"));
 
-      // ensure the unknown handler is only called when the status is unknown, this mutations will
-      // have a status of ACCEPTED
-      mutator.mutateTablet(ke4).requireAbsentOperation().putDirName("dir3").submit(tmeta -> {
+      mutator.mutateTablet(ke4).requireAbsentOperation().putDirName("dir5").submit(tmeta -> {
         throw new IllegalStateException();
       });
 
       Map<KeyExtent,Ample.ConditionalResult> results = mutator.process();
 
       assertEquals(Set.of(ke1, ke2, ke3, ke4), results.keySet());
-      assertEquals(Status.ACCEPTED, results.get(ke1).getStatus());
-      assertEquals(Status.UNKNOWN, results.get(ke2).getStatus());
-      assertEquals(Status.REJECTED, results.get(ke3).getStatus());
-      assertEquals(Status.ACCEPTED, results.get(ke4).getStatus());
+      assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke1).getStatus());
+      assertEquals(Ample.ConditionalResult.Status.REJECTED, results.get(ke2).getStatus());
+      assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke3).getStatus());
+      assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke4).getStatus());
     }
   }
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 2760abd8bd..139762d557 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -27,13 +27,13 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.manager.thrift.BulkImportState;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.manager.Manager;
@@ -111,7 +111,7 @@ public class CleanUpBulkImport extends ManagerRepo {
                 tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
             tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue() == tid)
                 .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
-            tabletMutator.submit();
+            tabletMutator.submit(tm -> false);
           }
         }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index f739fa1b03..1efd621a0e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.clientImpl.bulk.Bulk;
 import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
 import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.manager.thrift.BulkImportState;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -139,7 +139,7 @@ class LoadFiles extends ManagerRepo {
             tabletMutator.putFile(f, v);
           });
 
-          tabletMutator.submit();
+          tabletMutator.submit(tm -> false);
         }
       }
     }
@@ -147,15 +147,15 @@ class LoadFiles extends ManagerRepo {
     long finish() {
       var results = conditionalMutator.process();
 
-      boolean allDone = results.values().stream()
-          .allMatch(result -> result.getStatus() == ConditionalWriter.Status.ACCEPTED);
+      boolean allDone =
+          results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED);
 
       long sleepTime = 0;
       if (!allDone) {
         sleepTime = 1000;
 
         results.forEach((extent, condResult) -> {
-          if (condResult.getStatus() != ConditionalWriter.Status.ACCEPTED) {
+          if (condResult.getStatus() != Status.ACCEPTED) {
             var metadata = condResult.readMetadata();
             log.debug("Tablet update failed {} {} {} {} {} {}", FateTxId.formatTid(tid), extent,
                 condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(),
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
index 181bc71e62..6528b7e21c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.manager.tableOps.split;
 
 import java.util.stream.Collectors;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.manager.Manager;
@@ -44,25 +44,22 @@ public class DeleteOperationIds extends ManagerRepo {
     try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
 
       // As long as the operation is not our operation id, then this step can be considered
-      // successful in the case of unknown. If this repo is running for a second time and has
+      // successful in the case of rejection. If this repo is running for a second time and has
       // already deleted the operation id, then it could be absent or set by another fate operation.
-      Ample.UnknownValidator unknownValidator =
-          tabletMetadata -> tabletMetadata.getOperationId() == null
-              || !tabletMetadata.getOperationId().equals(opid);
+      Ample.RejectionHandler rejectionHandler =
+          tabletMetadata -> !opid.equals(tabletMetadata.getOperationId());
 
       splitInfo.getTablets().forEach(extent -> {
         tabletsMutator.mutateTablet(extent).requireOperation(opid).deleteOperation()
-            .submit(unknownValidator);
+            .submit(rejectionHandler);
       });
 
       var results = tabletsMutator.process();
 
-      boolean allAccepted = results.values().stream()
-          .allMatch(result -> result.getStatus() == ConditionalWriter.Status.ACCEPTED);
+      boolean allAccepted =
+          results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED);
 
       if (!allAccepted) {
-        // ELASTICITY_TODO not handling the case where running a 2nd time in the case of failures.
-        // Fix this when improving unknown handling.
         throw new IllegalStateException(
             "Failed to delete operation ids " + splitInfo.getOriginal() + " " + results.values()
                 .stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet()));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index 9d8b853fba..ae0bfae78a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -25,11 +25,11 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedSet;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -77,12 +77,11 @@ public class PreSplit extends ManagerRepo {
 
       tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
           .requireAbsentLocation().requirePrevEndRow(splitInfo.getOriginal().prevEndRow())
-          .putOperation(opid)
-          .submit(tmeta -> tmeta.getOperationId() != null && tmeta.getOperationId().equals(opid));
+          .putOperation(opid).submit(tmeta -> opid.equals(tmeta.getOperationId()));
 
       Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
 
-      if (results.get(splitInfo.getOriginal()).getStatus() == ConditionalWriter.Status.ACCEPTED) {
+      if (results.get(splitInfo.getOriginal()).getStatus() == Status.ACCEPTED) {
         log.trace("{} reserved {} for split", FateTxId.formatTid(tid), splitInfo.getOriginal());
         return 0;
       } else {
@@ -91,7 +90,7 @@ public class PreSplit extends ManagerRepo {
         // its possible the tablet no longer exists
         var optMeta = Optional.ofNullable(tabletMetadata);
 
-        log.trace("{} Failed to set operation id. extent:{} location:{} opid:{}",
+        log.debug("{} Failed to set operation id. extent:{} location:{} opid:{}",
             FateTxId.formatTid(tid), splitInfo.getOriginal(),
             optMeta.map(TabletMetadata::getLocation).orElse(null),
             optMeta.map(TabletMetadata::getOperationId).orElse(null));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
index 09db1e11f8..1ed5de53b9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -26,12 +26,12 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 
-import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -169,7 +169,7 @@ public class UpdateTablets extends ManagerRepo {
     try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
       for (var newExtent : newTablets) {
         if (newExtent.equals(newTablets.last())) {
-          // Skip the last tablet, its done in the next fate step.
+          // Skip the last tablet, it's done after successfully adding all new tablets
           continue;
         }
 
@@ -188,28 +188,15 @@ public class UpdateTablets extends ManagerRepo {
 
         newTabletsFiles.get(newExtent).forEach(mutator::putFile);
 
-        mutator.submit(afterMeta -> afterMeta.getOperationId() != null
-            && afterMeta.getOperationId().equals(opid));
+        mutator.submit(afterMeta -> opid.equals(afterMeta.getOperationId()));
       }
 
       var results = tabletsMutator.process();
       results.values().forEach(result -> {
         var status = result.getStatus();
-        if (status == ConditionalWriter.Status.REJECTED) {
-          // lets see if this was rejected because this operation is running again in the case of
-          // failure
-          var newTabletMetadata = result.readMetadata();
-          if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) {
-            log.trace(
-                "{} {} creating new tablet was rejected because it existed, operation probably failed before.",
-                FateTxId.formatTid(tid), result.getExtent());
-            return;
-          }
-        }
 
-        Preconditions.checkState(status == ConditionalWriter.Status.ACCEPTED,
-            "Failed to add new tablet %s %s %s", status, splitInfo.getOriginal(),
-            result.getExtent());
+        Preconditions.checkState(status == Status.ACCEPTED, "Failed to add new tablet %s %s %s",
+            status, splitInfo.getOriginal(), result.getExtent());
       });
     }
   }
@@ -234,24 +221,21 @@ public class UpdateTablets extends ManagerRepo {
         }
       });
 
-      mutator.submit();
+      mutator.submit(tm -> false);
 
       var result = tabletsMutator.process().get(splitInfo.getOriginal());
 
-      if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
-        // Can not use Ample's built in code for checking unknown because we are changing the prev
-        // end row, so much check it manually
+      if (result.getStatus() == Status.REJECTED) {
+        // Cannot use Ample's built-in code for checking rejected because we are changing the prev
+        // end row and Ample would try to read the old tablet, so must check it manually.
 
         var tabletMeta = manager.getContext().getAmple().readTablet(newExtent);
 
         if (tabletMeta == null || !tabletMeta.getOperationId().equals(opid)) {
-          // ELASTICITY_TODO need to retry when an UNKNOWN condition is seen, its possible the
-          // mutation never made it to the tserver. May want ample to always retry on unknown and
-          // change the unknown handler to a rejected handler.
           throw new IllegalStateException("Failed to update existing tablet in split "
               + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
         }
-      } else if (result.getStatus() != ConditionalWriter.Status.ACCEPTED) {
+      } else if (result.getStatus() != Status.ACCEPTED) {
         // maybe this step is being run again and the update was already made
         throw new IllegalStateException("Failed to update existing tablet in split "
             + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ed99f59686..ee18a6133a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -30,7 +30,6 @@ import java.util.TreeSet;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -38,6 +37,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
@@ -78,7 +78,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       var ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts1)).submit();
+          .putLocation(Location.future(ts1)).submit(tm -> false);
       var results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -86,7 +86,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts2)).submit();
+          .putLocation(Location.future(ts2)).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
 
@@ -94,7 +94,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
-          .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)).submit();
+          .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
+          .submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -102,7 +103,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
-          .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)).submit();
+          .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
+          .submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
 
@@ -110,7 +112,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2))
-          .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2)).submit();
+          .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2))
+          .submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
 
@@ -118,7 +121,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
-          .deleteLocation(Location.current(ts1)).submit();
+          .deleteLocation(Location.current(ts1)).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -162,7 +165,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       // simulate a compaction where the tablet location is not set
       var ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2)
-          .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).submit();
+          .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).submit(tm -> false);
       var results = ctmi.process();
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
 
@@ -172,7 +175,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
         ctmi = new ConditionalTabletsMutatorImpl(context);
         ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
-            .putFile(file, new DataFileValue(0, 0)).submit();
+            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         results = ctmi.process();
         assertEquals(Status.REJECTED, results.get(e1).getStatus());
       }
@@ -182,7 +185,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       // set the location
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.current(ts1)).submit();
+          .putLocation(Location.current(ts1)).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -190,7 +193,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
         ctmi = new ConditionalTabletsMutatorImpl(context);
         ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts2))
-            .putFile(file, new DataFileValue(0, 0)).submit();
+            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         results = ctmi.process();
         assertEquals(Status.REJECTED, results.get(e1).getStatus());
       }
@@ -201,7 +204,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
         ctmi = new ConditionalTabletsMutatorImpl(context);
         ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
-            .putFile(file, new DataFileValue(0, 0)).submit();
+            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         results = ctmi.process();
         assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
       }
@@ -212,7 +215,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2)
           .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1)
-          .deleteFile(stf2).deleteFile(stf3).submit();
+          .deleteFile(stf2).deleteFile(stf3).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -227,7 +230,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5)
           .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L)
-          .putFile(stf5, new DataFileValue(0, 0)).submit();
+          .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -238,7 +241,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
           "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf");
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf4).requireFile(stf5)
-          .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5).submit();
+          .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5)
+          .submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 
@@ -248,7 +252,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5)
           .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L)
-          .putFile(stf5, new DataFileValue(0, 0)).submit();
+          .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
       results = ctmi.process();
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
 
@@ -282,9 +286,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       var ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts1)).submit();
+          .putLocation(Location.future(ts1)).submit(tm -> false);
       ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts2)).submit();
+          .putLocation(Location.future(ts2)).submit(tm -> false);
       var results = ctmi.process();
 
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -299,13 +303,13 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts2)).submit();
+          .putLocation(Location.future(ts2)).submit(tm -> false);
       ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts1)).submit();
+          .putLocation(Location.future(ts1)).submit(tm -> false);
       ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts1)).submit();
+          .putLocation(Location.future(ts1)).submit(tm -> false);
       ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation()
-          .putLocation(Location.future(ts2)).submit();
+          .putLocation(Location.future(ts2)).submit(tm -> false);
       results = ctmi.process();
 
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -347,9 +351,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       var opid2 = TabletOperationId.from("MERGING:FATE[5678]");
 
       var ctmi = new ConditionalTabletsMutatorImpl(context);
-      ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit();
-      ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit();
-      ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit();
+      ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false);
+      ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> false);
+      ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> false);
       var results = ctmi.process();
 
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -361,11 +365,11 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       assertEquals(TabletOperationType.MERGING,
           context.getAmple().readTablet(e2).getOperationId().getType());
       assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId());
-      assertEquals(null, context.getAmple().readTablet(e3).getOperationId());
+      assertNull(context.getAmple().readTablet(e3).getOperationId());
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
-      ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit();
-      ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit();
+      ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> false);
+      ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> false);
       results = ctmi.process();
 
       assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -376,14 +380,14 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
           context.getAmple().readTablet(e2).getOperationId().getType());
 
       ctmi = new ConditionalTabletsMutatorImpl(context);
-      ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit();
-      ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit();
+      ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> false);
+      ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> false);
       results = ctmi.process();
 
       assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
       assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
-      assertEquals(null, context.getAmple().readTablet(e1).getOperationId());
-      assertEquals(null, context.getAmple().readTablet(e2).getOperationId());
+      assertNull(context.getAmple().readTablet(e1).getOperationId());
+      assertNull(context.getAmple().readTablet(e2).getOperationId());
     }
   }
 
@@ -399,21 +403,23 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
     var ctmi = new ConditionalTabletsMutatorImpl(context);
     ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation()
-        .putCompactionId(7).submit();
+        .putCompactionId(7).submit(tm -> false);
     var results = ctmi.process();
     assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
     assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent());
 
     ctmi = new ConditionalTabletsMutatorImpl(context);
     ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
-        .requireLocation(Location.future(loc.getServerInstance())).putCompactionId(7).submit();
+        .requireLocation(Location.future(loc.getServerInstance())).putCompactionId(7)
+        .submit(tm -> false);
     results = ctmi.process();
     assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
     assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent());
 
     ctmi = new ConditionalTabletsMutatorImpl(context);
     ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
-        .requireLocation(Location.current(loc.getServerInstance())).putCompactionId(7).submit();
+        .requireLocation(Location.current(loc.getServerInstance())).putCompactionId(7)
+        .submit(tm -> false);
     results = ctmi.process();
     assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus());
     assertEquals(7L, context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong());