You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by cs...@apache.org on 2024/01/27 00:18:41 UTC

(accumulo) branch elasticity updated: Refactor AccumuloStoreReadWriteIT to also run against ZooStore (#4202)

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new c8a5b11c4f Refactor AccumuloStoreReadWriteIT to also run against ZooStore (#4202)
c8a5b11c4f is described below

commit c8a5b11c4fbb46d70fcdd235249a6f0495eafe30
Author: Christopher L. Shannon <cs...@apache.org>
AuthorDate: Fri Jan 26 19:18:35 2024 -0500

    Refactor AccumuloStoreReadWriteIT to also run against ZooStore (#4202)
    
    This renames AccumuloStoreReadWriteIT to FateStoreIT and adds an
    implementation for both the Accumulo and Zookeeper Fate stores. The
    tests in this class apply to both so this change allows all the tests to
    run against both implementations.
    
    The helper executor methods that were part of FateIT were moved into
    FateTestRunner to be re-used between the FateIT and FateStoreIT tests.
---
 .../java/org/apache/accumulo/test/fate/FateIT.java |  11 +-
 .../apache/accumulo/test/fate/FateTestRunner.java  |  37 +++
 .../test/fate/accumulo/AccumuloFateIT.java         |  12 +-
 .../test/fate/accumulo/AccumuloStoreFateIT.java    |  50 +++++
 .../fate/accumulo/AccumuloStoreReadWriteIT.java    | 248 ---------------------
 .../accumulo/test/fate/accumulo/FateStoreIT.java   | 232 +++++++++++++++++++
 .../{ZookeeperFateIT.java => ZooStoreFateIT.java}  |  43 +---
 .../test/fate/zookeeper/ZookeeperFateIT.java       |  11 +-
 8 files changed, 329 insertions(+), 315 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index d1797a42f9..380876aee0 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -49,7 +49,7 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class FateIT extends SharedMiniClusterBase {
+public abstract class FateIT extends SharedMiniClusterBase implements FateTestRunner {
 
   private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
 
@@ -373,15 +373,6 @@ public abstract class FateIT extends SharedMiniClusterBase {
 
   protected abstract TStatus getTxStatus(ServerContext sctx, long txid);
 
-  protected abstract void executeTest(FateTestExecutor testMethod) throws Exception;
-
-  protected abstract void executeTest(FateTestExecutor testMethod, int maxDeferred)
-      throws Exception;
-
-  protected interface FateTestExecutor {
-    void execute(FateStore<TestEnv> store, ServerContext sctx) throws Exception;
-  }
-
   private static void inCall() throws InterruptedException {
     // signal that call started
     callStarted.countDown();
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java
new file mode 100644
index 0000000000..b87702df91
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FateIT.TestEnv;
+
+public interface FateTestRunner {
+
+  void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception;
+
+  default void executeTest(FateTestExecutor testMethod) throws Exception {
+    executeTest(testMethod, 100_000);
+  }
+
+  interface FateTestExecutor {
+    void execute(FateStore<TestEnv> store, ServerContext sctx) throws Exception;
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
index c71c1d1229..0dec7e442b 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
@@ -51,19 +51,13 @@ public class AccumuloFateIT extends FateIT {
   }
 
   @Override
-  protected void executeTest(FateTestExecutor testMethod) throws Exception {
-    executeTest(testMethod, 1000);
-  }
-
-  @Override
-  protected void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
     table = getUniqueNames(1)[0];
     try (ClientContext client =
         (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
       client.tableOperations().create(table);
-
-      final AccumuloStore<TestEnv> accumuloStore = new AccumuloStore<>(client, table, maxDeferred);
-      testMethod.execute(accumuloStore, getCluster().getServerContext());
+      testMethod.execute(new AccumuloStore<>(client, table, maxDeferred),
+          getCluster().getServerContext());
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java
new file mode 100644
index 0000000000..7ef3e575de
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public class AccumuloStoreFateIT extends FateStoreIT {
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+      testMethod.execute(new AccumuloStore<>(client, table, maxDeferred),
+          getCluster().getServerContext());
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
deleted file mode 100644
index 236cb9624c..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.fate.accumulo;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.fate.Fate.TxInfo;
-import org.apache.accumulo.core.fate.FateStore.FateTxStore;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
-import org.apache.accumulo.core.fate.ReadOnlyRepo;
-import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
-import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.test.fate.FateIT.TestEnv;
-import org.apache.accumulo.test.fate.FateIT.TestRepo;
-import org.apache.accumulo.test.util.Wait;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-public class AccumuloStoreReadWriteIT extends SharedMiniClusterBase {
-
-  @BeforeAll
-  public static void setup() throws Exception {
-    SharedMiniClusterBase.startMiniCluster();
-  }
-
-  @AfterAll
-  public static void teardown() {
-    SharedMiniClusterBase.stopMiniCluster();
-  }
-
-  @Override
-  protected Duration defaultTimeout() {
-    return Duration.ofMinutes(1);
-  }
-
-  @Test
-  public void testReadWrite() throws Exception {
-    final String table = getUniqueNames(1)[0];
-    try (ClientContext client =
-        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
-      client.tableOperations().create(table);
-
-      AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table);
-      // Verify no transactions
-      assertEquals(0, store.list().count());
-
-      // Create a new transaction and get the store for it
-      long tid = store.create();
-      FateTxStore<TestEnv> txStore = store.reserve(tid);
-      assertTrue(txStore.timeCreated() > 0);
-      assertEquals(1, store.list().count());
-
-      // Push a test FATE op and verify we can read it back
-      txStore.push(new TestRepo("testOp"));
-      TestRepo op = (TestRepo) txStore.top();
-      assertNotNull(op);
-
-      // Test status
-      txStore.setStatus(TStatus.SUBMITTED);
-      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
-
-      // Set a name to test setTransactionInfo()
-      txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
-      assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
-
-      // Try setting a second test op to test getStack()
-      // when listing or popping TestOperation2 should be first
-      assertEquals(1, txStore.getStack().size());
-      txStore.push(new TestOperation2());
-      // test top returns TestOperation2
-      ReadOnlyRepo<TestEnv> top = txStore.top();
-      assertInstanceOf(TestOperation2.class, top);
-
-      // test get stack
-      List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack();
-      assertEquals(2, ops.size());
-      assertInstanceOf(TestOperation2.class, ops.get(0));
-      assertEquals(TestRepo.class, ops.get(1).getClass());
-
-      // test pop, TestOperation should be left
-      txStore.pop();
-      ops = txStore.getStack();
-      assertEquals(1, ops.size());
-      assertEquals(TestRepo.class, ops.get(0).getClass());
-
-      // create second
-      FateTxStore<TestEnv> txStore2 = store.reserve(store.create());
-      assertEquals(2, store.list().count());
-
-      // test delete
-      txStore.delete();
-      assertEquals(1, store.list().count());
-      txStore2.delete();
-      assertEquals(0, store.list().count());
-    }
-  }
-
-  @Test
-  public void testReadWriteTxInfo() throws Exception {
-    final String table = getUniqueNames(1)[0];
-    try (ClientContext client =
-        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
-      client.tableOperations().create(table);
-
-      AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table);
-
-      long tid = store.create();
-      FateTxStore<TestEnv> txStore = store.reserve(tid);
-
-      try {
-        // Go through all enum values to verify each TxInfo type will be properly
-        // written and read from the store
-        for (TxInfo txInfo : TxInfo.values()) {
-          assertNull(txStore.getTransactionInfo(txInfo));
-          txStore.setTransactionInfo(txInfo, "value: " + txInfo.name());
-          assertEquals("value: " + txInfo.name(), txStore.getTransactionInfo(txInfo));
-        }
-      } finally {
-        txStore.delete();
-      }
-    }
-  }
-
-  @Test
-  public void testDeferredOverflow() throws Exception {
-    final String table = getUniqueNames(1)[0];
-    try (ClientContext client =
-        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
-      client.tableOperations().create(table);
-
-      AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table, 10);
-      // Verify no transactions
-      assertEquals(0, store.list().count());
-      assertFalse(store.isDeferredOverflow());
-
-      // Store 10 transactions that are all deferred
-      final Set<Long> transactions = new HashSet<>();
-      for (int i = 0; i < 10; i++) {
-        long tid = store.create();
-        transactions.add(tid);
-        FateTxStore<TestEnv> txStore = store.reserve(tid);
-        txStore.setStatus(TStatus.SUBMITTED);
-        assertTrue(txStore.timeCreated() > 0);
-        txStore.unreserve(10, TimeUnit.SECONDS);
-      }
-
-      // Verify we have 10 transactions and all are deferred
-      assertEquals(10, store.list().count());
-      assertEquals(10, store.getDeferredCount());
-
-      // Should still be false as we are at thet max but not over yet
-      assertFalse(store.isDeferredOverflow());
-
-      var executor = Executors.newCachedThreadPool();
-      Future<?> future;
-      AtomicBoolean keepRunning = new AtomicBoolean(true);
-      try {
-        // Run and verify all 10 transactions still exist and were not
-        // run because of the deferral time of all the transactions
-        future = executor.submit(() -> store.runnable(keepRunning, transactions::remove));
-        Thread.sleep(2000);
-        assertEquals(10, transactions.size());
-        // Setting this flag to false should terminate the task if sleeping
-        keepRunning.set(false);
-        // wait for the future to finish to verify the task finished
-        future.get();
-
-        // Store one more that should go over the max deferred of 10
-        // and should clear the map and set the overflow flag
-        long tid = store.create();
-        transactions.add(tid);
-        FateTxStore<TestEnv> txStore = store.reserve(tid);
-        txStore.setStatus(TStatus.SUBMITTED);
-        txStore.unreserve(30, TimeUnit.SECONDS);
-
-        // Verify we have 11 transactions stored and none
-        // deferred anymore because of the overflow
-        assertEquals(11, store.list().count());
-        assertEquals(0, store.getDeferredCount());
-        assertTrue(store.isDeferredOverflow());
-
-        // Run and verify all 11 transactions were processed
-        // and removed from the store
-        keepRunning.set(true);
-        future = executor.submit(() -> store.runnable(keepRunning, transactions::remove));
-        Wait.waitFor(transactions::isEmpty);
-        // Setting this flag to false should terminate the task if sleeping
-        keepRunning.set(false);
-        // wait for the future to finish to verify the task finished
-        future.get();
-
-        // Overflow should now be reset to false so adding another deferred
-        // transaction should now go back into the deferral map and flag should
-        // still be false as we are under the limit
-        assertFalse(store.isDeferredOverflow());
-        txStore = store.reserve(store.create());
-        txStore.unreserve(30, TimeUnit.SECONDS);
-        assertEquals(1, store.getDeferredCount());
-        assertFalse(store.isDeferredOverflow());
-      } finally {
-        executor.shutdownNow();
-      }
-    }
-  }
-
-  private static class TestOperation2 extends TestRepo {
-
-    private static final long serialVersionUID = 1L;
-
-    public TestOperation2() {
-      super("testOperation2");
-    }
-  }
-
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
new file mode 100644
index 0000000000..8ddd3b81b0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.StackOverflowException;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FateIT.TestEnv;
+import org.apache.accumulo.test.fate.FateIT.TestRepo;
+import org.apache.accumulo.test.fate.FateTestRunner;
+import org.apache.accumulo.test.util.Wait;
+import org.junit.jupiter.api.Test;
+
+public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner {
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(1);
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    executeTest(this::testReadWrite);
+  }
+
+  protected void testReadWrite(FateStore<TestEnv> store, ServerContext sctx)
+      throws StackOverflowException {
+    // Verify no transactions
+    assertEquals(0, store.list().count());
+
+    // Create a new transaction and get the store for it
+    long tid = store.create();
+    FateTxStore<TestEnv> txStore = store.reserve(tid);
+    assertTrue(txStore.timeCreated() > 0);
+    assertEquals(1, store.list().count());
+
+    // Push a test FATE op and verify we can read it back
+    txStore.push(new TestRepo("testOp"));
+    TestRepo op = (TestRepo) txStore.top();
+    assertNotNull(op);
+
+    // Test status
+    txStore.setStatus(TStatus.SUBMITTED);
+    assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+
+    // Set a name to test setTransactionInfo()
+    txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
+    assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
+
+    // Try setting a second test op to test getStack()
+    // when listing or popping TestOperation2 should be first
+    assertEquals(1, txStore.getStack().size());
+    txStore.push(new TestOperation2());
+    // test top returns TestOperation2
+    ReadOnlyRepo<TestEnv> top = txStore.top();
+    assertInstanceOf(TestOperation2.class, top);
+
+    // test get stack
+    List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack();
+    assertEquals(2, ops.size());
+    assertInstanceOf(TestOperation2.class, ops.get(0));
+    assertEquals(TestRepo.class, ops.get(1).getClass());
+
+    // test pop, TestOperation should be left
+    txStore.pop();
+    ops = txStore.getStack();
+    assertEquals(1, ops.size());
+    assertEquals(TestRepo.class, ops.get(0).getClass());
+
+    // create second
+    FateTxStore<TestEnv> txStore2 = store.reserve(store.create());
+    assertEquals(2, store.list().count());
+
+    // test delete
+    txStore.delete();
+    assertEquals(1, store.list().count());
+    txStore2.delete();
+    assertEquals(0, store.list().count());
+  }
+
+  @Test
+  public void testReadWriteTxInfo() throws Exception {
+    executeTest(this::testReadWriteTxInfo);
+  }
+
+  protected void testReadWriteTxInfo(FateStore<TestEnv> store, ServerContext sctx) {
+    long tid = store.create();
+    FateTxStore<TestEnv> txStore = store.reserve(tid);
+
+    try {
+      // Go through all enum values to verify each TxInfo type will be properly
+      // written and read from the store
+      for (TxInfo txInfo : TxInfo.values()) {
+        assertNull(txStore.getTransactionInfo(txInfo));
+        txStore.setTransactionInfo(txInfo, "value: " + txInfo.name());
+        assertEquals("value: " + txInfo.name(), txStore.getTransactionInfo(txInfo));
+      }
+    } finally {
+      txStore.delete();
+    }
+
+  }
+
+  @Test
+  public void testDeferredOverflow() throws Exception {
+    executeTest(this::testDeferredOverflow, 10);
+  }
+
+  protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx)
+      throws Exception {
+    // Verify no transactions
+    assertEquals(0, store.list().count());
+    assertFalse(store.isDeferredOverflow());
+
+    // Store 10 transactions that are all deferred
+    final Set<Long> transactions = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      long tid = store.create();
+      transactions.add(tid);
+      FateTxStore<TestEnv> txStore = store.reserve(tid);
+      txStore.setStatus(TStatus.SUBMITTED);
+      assertTrue(txStore.timeCreated() > 0);
+      txStore.unreserve(10, TimeUnit.SECONDS);
+    }
+
+    // Verify we have 10 transactions and all are deferred
+    assertEquals(10, store.list().count());
+    assertEquals(10, store.getDeferredCount());
+
+    // Should still be false as we are at the max but not over yet
+    assertFalse(store.isDeferredOverflow());
+
+    var executor = Executors.newCachedThreadPool();
+    Future<?> future;
+    AtomicBoolean keepRunning = new AtomicBoolean(true);
+    try {
+      // Run and verify all 10 transactions still exist and were not
+      // run because of the deferral time of all the transactions
+      future = executor.submit(() -> store.runnable(keepRunning, transactions::remove));
+      Thread.sleep(2000);
+      assertEquals(10, transactions.size());
+      // Setting this flag to false should terminate the task if sleeping
+      keepRunning.set(false);
+      // wait for the future to finish to verify the task finished
+      future.get();
+
+      // Store one more that should go over the max deferred of 10
+      // and should clear the map and set the overflow flag
+      long tid = store.create();
+      transactions.add(tid);
+      FateTxStore<TestEnv> txStore = store.reserve(tid);
+      txStore.setStatus(TStatus.SUBMITTED);
+      txStore.unreserve(30, TimeUnit.SECONDS);
+
+      // Verify we have 11 transactions stored and none
+      // deferred anymore because of the overflow
+      assertEquals(11, store.list().count());
+      assertEquals(0, store.getDeferredCount());
+      assertTrue(store.isDeferredOverflow());
+
+      // Run and verify all 11 transactions were processed
+      // and removed from the store
+      keepRunning.set(true);
+      future = executor.submit(() -> store.runnable(keepRunning, transactions::remove));
+      Wait.waitFor(transactions::isEmpty);
+      // Setting this flag to false should terminate the task if sleeping
+      keepRunning.set(false);
+      // wait for the future to finish to verify the task finished
+      future.get();
+
+      // Overflow should now be reset to false so adding another deferred
+      // transaction should now go back into the deferral map and flag should
+      // still be false as we are under the limit
+      assertFalse(store.isDeferredOverflow());
+      txStore = store.reserve(store.create());
+      txStore.unreserve(30, TimeUnit.SECONDS);
+      assertEquals(1, store.getDeferredCount());
+      assertFalse(store.isDeferredOverflow());
+    } finally {
+      executor.shutdownNow();
+      // Cleanup so we don't interfere with other tests
+      store.list().forEach(fateIdStatus -> store.reserve(fateIdStatus.getTxid()).delete());
+    }
+  }
+
+  private static class TestOperation2 extends TestRepo {
+
+    private static final long serialVersionUID = 1L;
+
+    public TestOperation2() {
+      super("testOperation2");
+    }
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java
similarity index 61%
copy from test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
copy to test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java
index 175785270d..04530e317f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.test.fate.zookeeper;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
@@ -28,20 +27,18 @@ import java.io.File;
 import java.util.UUID;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.ZooStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.fate.FateIT;
+import org.apache.accumulo.test.fate.accumulo.FateStoreIT;
 import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
-import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.io.TempDir;
 
 @Tag(ZOOKEEPER_TESTING_SERVER)
-public class ZookeeperFateIT extends FateIT {
+public class ZooStoreFateIT extends FateStoreIT {
 
   private static ZooKeeperTestingServer szk = null;
   private static ZooReaderWriter zk = null;
@@ -64,44 +61,12 @@ public class ZookeeperFateIT extends FateIT {
   }
 
   @Override
-  protected void executeTest(FateTestExecutor testMethod) throws Exception {
-    executeTest(testMethod, 1000);
-  }
-
-  @Override
-  protected void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
-    final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred);
-
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
     ServerContext sctx = createMock(ServerContext.class);
     expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(sctx);
 
-    testMethod.execute(zooStore, sctx);
-  }
-
-  @Override
-  protected TStatus getTxStatus(ServerContext sctx, long txid) {
-    try {
-      return getTxStatus(sctx.getZooReaderWriter(), txid);
-    } catch (KeeperException | InterruptedException e) {
-      throw new IllegalStateException(e);
-    }
+    testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred), sctx);
   }
-
-  /*
-   * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test
-   * thread does not have the reservation (the FaTE thread does)
-   */
-  private static TStatus getTxStatus(ZooReaderWriter zrw, long txid)
-      throws KeeperException, InterruptedException {
-    zrw.sync(ZK_ROOT);
-    String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid);
-    try {
-      return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8));
-    } catch (KeeperException.NoNodeException e) {
-      return TStatus.UNKNOWN;
-    }
-  }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
index 175785270d..64a18d38a1 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
@@ -64,20 +64,13 @@ public class ZookeeperFateIT extends FateIT {
   }
 
   @Override
-  protected void executeTest(FateTestExecutor testMethod) throws Exception {
-    executeTest(testMethod, 1000);
-  }
-
-  @Override
-  protected void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
-    final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred);
-
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception {
     ServerContext sctx = createMock(ServerContext.class);
     expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(sctx);
 
-    testMethod.execute(zooStore, sctx);
+    testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred), sctx);
   }
 
   @Override