You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2024/01/30 15:03:10 UTC

(accumulo) branch elasticity updated: Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)

This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 11949de209 Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)
11949de209 is described below

commit 11949de209a1eba129a488970c946b9dc7ef1e7d
Author: Dom G <do...@apache.org>
AuthorDate: Tue Jan 30 10:03:05 2024 -0500

    Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)
    
    * Use conditional mutations for AccumuloStore
    * Add requireStatus method
    * Create new Condition instead of re-use
    * Add retry to create method and add test
    
    ---------
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../accumulo/core/fate/accumulo/AccumuloStore.java |  39 ++++-
 .../accumulo/core/fate/accumulo/FateMutator.java   |   8 +
 .../core/fate/accumulo/FateMutatorImpl.java        |  75 +++++++--
 .../core/fate/accumulo/StatusMappingIterator.java  | 151 ++++++++++++++++++
 .../test/fate/accumulo/AccumuloStoreIT.java        |  94 +++++++++++
 .../test/fate/accumulo/FateMutatorImplIT.java      | 171 +++++++++++++++++++++
 6 files changed, 523 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index b5dc999d42..e4c36fd63a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -44,12 +44,17 @@ import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
 public class AccumuloStore<T> extends AbstractFateStore<T> {
 
+  private static final Logger log = LoggerFactory.getLogger(AccumuloStore.class);
+
   private final ClientContext context;
   private final String tableName;
 
@@ -73,11 +78,35 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
 
   @Override
   public long create() {
-    long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+    final int maxAttempts = 5;
+    long tid = 0L;
 
-    newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      if (attempt >= 1) {
+        log.debug("Failed to create new id: {}, trying again", tid);
+        UtilWaitThread.sleep(100);
+      }
+      tid = getTid();
+
+      var status = newMutator(tid).requireStatus().putStatus(TStatus.NEW)
+          .putCreateTime(System.currentTimeMillis()).tryMutate();
+
+      switch (status) {
+        case ACCEPTED:
+          return tid;
+        case UNKNOWN:
+        case REJECTED:
+          continue;
+        default:
+          throw new IllegalStateException("Unknown status " + status);
+      }
+    }
 
-    return tid;
+    throw new IllegalStateException("Failed to create new id after " + maxAttempts + " attempts");
+  }
+
+  public long getTid() {
+    return RANDOM.get().nextLong() & 0x7fffffffffffffffL;
   }
 
   @Override
@@ -249,11 +278,9 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     public void setTransactionInfo(TxInfo txInfo, Serializable so) {
       verifyReserved(true);
 
-      FateMutator<T> fateMutator = newMutator(tid);
       final byte[] serialized = serializeTxInfo(so);
-      fateMutator.putTxInfo(txInfo, serialized);
 
-      fateMutator.mutate();
+      newMutator(tid).putTxInfo(txInfo, serialized).mutate();
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
index 4caf5985bd..22497006db 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
@@ -46,4 +46,12 @@ public interface FateMutator<T> {
 
   void mutate();
 
+  // This exists to represent the subset of statuses from ConditionalWriter.Status that are expected
+  // and need to be handled.
+  enum Status {
+    ACCEPTED, REJECTED, UNKNOWN
+  }
+
+  Status tryMutate();
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
index 90d22008d5..7056438d21 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
@@ -24,11 +24,16 @@ import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.invertRepo;
 
 import java.util.Objects;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
@@ -45,13 +50,13 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
   private final ClientContext context;
   private final String tableName;
   private final long tid;
-  private final Mutation mutation;
+  private final ConditionalMutation mutation;
 
-  FateMutatorImpl(ClientContext context, String tableName, long tid) {
+  public FateMutatorImpl(ClientContext context, String tableName, long tid) {
     this.context = Objects.requireNonNull(context);
     this.tableName = Objects.requireNonNull(tableName);
     this.tid = tid;
-    this.mutation = new Mutation(new Text("tx_" + FastFormat.toHexString(tid)));
+    this.mutation = new ConditionalMutation(new Text("tx_" + FastFormat.toHexString(tid)));
   }
 
   @Override
@@ -122,7 +127,10 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
 
   @Override
   public FateMutator<T> putRepo(int position, Repo<T> repo) {
-    mutation.put(RepoColumnFamily.NAME, invertRepo(position), new Value(serialize(repo)));
+    final Text cq = invertRepo(position);
+    // ensure this repo is not already set
+    mutation.addCondition(new Condition(RepoColumnFamily.NAME, cq));
+    mutation.put(RepoColumnFamily.NAME, cq, new Value(serialize(repo)));
     return this;
   }
 
@@ -143,12 +151,61 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
     return this;
   }
 
+  /**
+   * Require that the transaction status is one of the given statuses. If no statuses are provided,
+   * require that the status column is absent.
+   *
+   * @param statuses The statuses to check against.
+   */
+  public FateMutator<T> requireStatus(TStatus... statuses) {
+    Condition condition = StatusMappingIterator.createCondition(statuses);
+    mutation.addCondition(condition);
+    return this;
+  }
+
   @Override
   public void mutate() {
-    try (BatchWriter writer = context.createBatchWriter(tableName)) {
-      writer.addMutation(mutation);
-    } catch (Exception e) {
-      throw new IllegalStateException(e);
+    var status = tryMutate();
+    if (status != Status.ACCEPTED) {
+      throw new IllegalStateException("Failed to write mutation " + status + " " + mutation);
+    }
+  }
+
+  @Override
+  public Status tryMutate() {
+    try {
+      // if there are no conditions attached, then we can use a batch writer
+      if (mutation.getConditions().isEmpty()) {
+        try (BatchWriter writer = context.createBatchWriter(tableName)) {
+          writer.addMutation(mutation);
+        } catch (MutationsRejectedException e) {
+          throw new RuntimeException(e);
+        }
+
+        return Status.ACCEPTED;
+      } else {
+        try (ConditionalWriter writer = context.createConditionalWriter(tableName)) {
+          ConditionalWriter.Result result = writer.write(mutation);
+
+          switch (result.getStatus()) {
+            case ACCEPTED:
+              return Status.ACCEPTED;
+            case REJECTED:
+              return Status.REJECTED;
+            case UNKNOWN:
+              return Status.UNKNOWN;
+            default:
+              // do not expect other statuses
+              throw new IllegalStateException(
+                  "Unhandled status for mutation " + result.getStatus());
+          }
+
+        } catch (AccumuloException | AccumuloSecurityException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
new file mode 100644
index 0000000000..d7dc4fa22c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily.STATUS_COLUMN;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A specialized iterator that maps the value of the status column to "present" or "absent". This
+ * iterator allows for checking of the status column's value against a set of acceptable statuses
+ * within a conditional mutation.
+ */
+public class StatusMappingIterator implements SortedKeyValueIterator<Key,Value> {
+
+  private static final String PRESENT = "present";
+  private static final String ABSENT = "absent";
+  private static final String STATUS_SET_KEY = "statusSet";
+
+  private SortedKeyValueIterator<Key,Value> source;
+  private final Set<String> acceptableStatuses = new HashSet<>();
+  private Value mappedValue;
+
+  /**
+   * The set of acceptable statuses must be provided as an option to the iterator using the
+   * {@link #STATUS_SET_KEY} key.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = source;
+    if (options.containsKey(STATUS_SET_KEY)) {
+      String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY));
+      acceptableStatuses.addAll(Arrays.asList(statuses));
+    }
+  }
+
+  @Override
+  public boolean hasTop() {
+    return source.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    source.next();
+    mapValue();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    source.seek(range, columnFamilies, inclusive);
+    mapValue();
+  }
+
+  /**
+   * Maps the value of the status column to "present" or "absent" based on its presence within the
+   * set of statuses.
+   */
+  private void mapValue() {
+    if (source.hasTop()) {
+      String currentValue = source.getTopValue().toString();
+      mappedValue =
+          acceptableStatuses.contains(currentValue) ? new Value(PRESENT) : new Value(ABSENT);
+    } else {
+      mappedValue = null;
+    }
+  }
+
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return Objects.requireNonNull(mappedValue);
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Creates a condition that checks if the status column's value is one of the given acceptable
+   * statuses.
+   *
+   * @param statuses The acceptable statuses.
+   * @return A condition configured to use this iterator.
+   */
+  public static Condition createCondition(ReadOnlyFateStore.TStatus... statuses) {
+    Condition condition =
+        new Condition(STATUS_COLUMN.getColumnFamily(), STATUS_COLUMN.getColumnQualifier());
+
+    if (statuses.length == 0) {
+      // If no statuses are provided, require the status column to be absent. Return the condition
+      // with no value set so that the mutation will be rejected if the status column is present.
+      return condition;
+    } else {
+      IteratorSetting is = new IteratorSetting(100, StatusMappingIterator.class);
+      is.addOption(STATUS_SET_KEY, encodeStatuses(statuses));
+
+      // If the value of the status column is in the set, it will be mapped to "present", so set the
+      // value of the condition to "present".
+      return condition.setValue(PRESENT).setIterators(is);
+    }
+  }
+
+  private static String encodeStatuses(ReadOnlyFateStore.TStatus[] statuses) {
+    return Arrays.stream(statuses).map(Enum::name).collect(Collectors.joining(","));
+  }
+
+  private static String[] decodeStatuses(String statuses) {
+    return statuses.split(",");
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
new file mode 100644
index 0000000000..88c2ac4884
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link AccumuloStore#create()}: checks that a colliding transaction id is
+ * retried with a new id and that create() gives up once every attempt collides.
+ */
+public class AccumuloStoreIT extends SharedMiniClusterBase {
+
+  private static final Logger log = LoggerFactory.getLogger(AccumuloStoreIT.class);
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  /**
+   * Store whose transaction ids come from a fixed list, so a test can script id collisions. Once
+   * the list is exhausted, {@link #getTid()} returns -1.
+   */
+  private static class TestAccumuloStore extends AccumuloStore<String> {
+    private final Iterator<Long> tidIterator;
+
+    // use the list of txids to simulate collisions on txids
+    public TestAccumuloStore(ClientContext context, String tableName, List<Long> txids) {
+      super(context, tableName);
+      this.tidIterator = txids.iterator();
+    }
+
+    @Override
+    public long getTid() {
+      if (tidIterator.hasNext()) {
+        return tidIterator.next();
+      } else {
+        return -1L;
+      }
+    }
+  }
+
+  /**
+   * Feeds create() a scripted id sequence containing duplicates. Each call must skip ids that
+   * were already created and return the next unused one; the final call sees only the already
+   * used id 5 on every attempt and must throw.
+   */
+  @Test
+  public void testCreateWithCollisionAndExceedRetryLimit() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      List<Long> txids = List.of(1L, 1L, 1L, 2L, 3L, 3L, 3L, 3L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 5L);
+      Set<Long> expectedTids = new TreeSet<>(txids);
+      TestAccumuloStore store = new TestAccumuloStore(client, table, txids);
+
+      // call create and expect we get the unique txids
+      for (Long expectedTid : expectedTids) {
+        long tid = store.create();
+        log.info("Created tid: {}", tid);
+        assertEquals(expectedTid, tid, "Expected " + expectedTid + " but got " + tid);
+      }
+
+      // Calling create again on 5L should throw an exception since we've exceeded the max retries
+      assertThrows(IllegalStateException.class, store::create);
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java
new file mode 100644
index 0000000000..27e6dd650b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.ACCEPTED;
+import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.REJECTED;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Duration;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.accumulo.FateMutatorImpl;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FateMutatorImplIT extends SharedMiniClusterBase {
+
+  Logger log = LoggerFactory.getLogger(FateMutatorImplIT.class);
+  final NewTableConfiguration ntc =
+      new NewTableConfiguration().withInitialHostingGoal(TabletHostingGoal.ALWAYS);
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(5);
+  }
+
+  @Test
+  public void putRepo() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table, ntc);
+
+      ClientContext context = (ClientContext) client;
+
+      final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+      // add some repos in order
+      FateMutatorImpl<FateIT.TestEnv> fateMutator = new FateMutatorImpl<>(context, table, tid);
+      fateMutator.putRepo(100, new FateIT.TestRepo("test")).mutate();
+      FateMutatorImpl<FateIT.TestEnv> fateMutator1 = new FateMutatorImpl<>(context, table, tid);
+      fateMutator1.putRepo(99, new FateIT.TestRepo("test")).mutate();
+      FateMutatorImpl<FateIT.TestEnv> fateMutator2 = new FateMutatorImpl<>(context, table, tid);
+      fateMutator2.putRepo(98, new FateIT.TestRepo("test")).mutate();
+
+      // make sure we can't add a repo that has already been added
+      FateMutatorImpl<FateIT.TestEnv> fateMutator3 = new FateMutatorImpl<>(context, table, tid);
+      assertThrows(IllegalStateException.class,
+          () -> fateMutator3.putRepo(98, new FateIT.TestRepo("test")).mutate(),
+          "Repo in position 98 already exists. Expected to not be able to add it again.");
+      FateMutatorImpl<FateIT.TestEnv> fateMutator4 = new FateMutatorImpl<>(context, table, tid);
+      assertThrows(IllegalStateException.class,
+          () -> fateMutator4.putRepo(99, new FateIT.TestRepo("test")).mutate(),
+          "Repo in position 99 already exists. Expected to not be able to add it again.");
+    }
+  }
+
+  /**
+   * Exercises {@code requireStatus} against a single transaction row as its status column goes
+   * from absent through NEW, SUBMITTED and IN_PROGRESS, checking that a conditional mutation is
+   * accepted only when the current status satisfies the requirement.
+   */
+  @Test
+  public void requireStatus() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table, ntc);
+
+      ClientContext context = (ClientContext) client;
+
+      final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+      // use require status passing all statuses. without the status column present this should fail
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid)
+              .requireStatus(ReadOnlyFateStore.TStatus.values())
+              .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate());
+      assertEquals(0, countEntries(client, table));
+      var status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.values())
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(REJECTED, status);
+      assertEquals(0, countEntries(client, table));
+
+      // use require status without passing any statuses to require that the status column is absent
+      status = new FateMutatorImpl<>(context, table, tid).requireStatus()
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // try again with requiring an absent status column. this time it should fail because we just
+      // put status NEW
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid).requireStatus()
+              .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(),
+          "Expected to not be able to use requireStatus() without passing any statuses");
+      status = new FateMutatorImpl<>(context, table, tid).requireStatus()
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(REJECTED, status,
+          "Expected to not be able to use requireStatus() without passing any statuses");
+
+      // now use require same with the current status, NEW passed in
+      status =
+          new FateMutatorImpl<>(context, table, tid).requireStatus(ReadOnlyFateStore.TStatus.NEW)
+              .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // use require same with an array of statuses, none of which are the current status
+      // (SUBMITTED)
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid)
+              .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN)
+              .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(),
+          "Expected to not be able to use requireStatus() with statuses that do not match the current status");
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN)
+          .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
+      assertEquals(REJECTED, status,
+          "Expected to not be able to use requireStatus() with statuses that do not match the current status");
+
+      // use require same with an array of statuses, one of which is the current status (SUBMITTED)
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN, ReadOnlyFateStore.TStatus.SUBMITTED)
+          .putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // one more time check that we can use require same with the current status (IN_PROGRESS)
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS)
+          .putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+    }
+
+  }
+
+  /**
+   * Counts the entries in the given table, closing the scanner once the count has been taken.
+   */
+  private static long countEntries(AccumuloClient client, String table) throws Exception {
+    try (var scanner = client.createScanner(table)) {
+      return scanner.stream().count();
+    }
+  }
+
+  /**
+   * Logs every key/value entry in the given table at info level. The scanner is closed when the
+   * scan finishes.
+   */
+  void logAllEntriesInTable(String tableName, AccumuloClient client) throws Exception {
+    try (var scanner = client.createScanner(tableName)) {
+      scanner.forEach(e -> log.info("{} {}", e.getKey(), e.getValue()));
+    }
+  }
+}