You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2024/01/20 20:11:30 UTC

(accumulo) 01/02: Exposes status when listing fate operations (#4169)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ec3050e813ec428236c58dde8788256ac7f1e2cd
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed Jan 17 15:57:00 2024 -0500

    Exposes status when listing fate operations (#4169)
    
    The Accumulo store can very efficiently gather status the status at the
    same time as the id for a FATE transaction.  This is already being
    gathered internally in implementation of the fate storage layer.
    This commit exposes this information further as it will be useful in
    refactoring the ageoff store.
---
 .../accumulo/core/fate/AbstractFateStore.java       |  8 ++++----
 .../org/apache/accumulo/core/fate/AdminUtil.java    |  3 ++-
 .../org/apache/accumulo/core/fate/AgeOffStore.java  |  4 ++--
 .../accumulo/core/fate/ReadOnlyFateStore.java       |  8 +++++++-
 .../org/apache/accumulo/core/fate/ZooStore.java     |  2 +-
 .../accumulo/core/fate/accumulo/AccumuloStore.java  |  2 +-
 .../apache/accumulo/core/logging/FateLogger.java    |  2 +-
 .../apache/accumulo/core/fate/AgeOffStoreTest.java  | 21 +++++++++++++--------
 .../org/apache/accumulo/core/fate/TestStore.java    | 14 ++++++++++++--
 9 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 7125f692fe..d6cbf2780f 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -171,8 +171,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
   }
 
   @Override
-  public Stream<Long> list() {
-    return getTransactions().map(fateIdStatus -> fateIdStatus.txid);
+  public Stream<FateIdStatus> list() {
+    return getTransactions();
   }
 
   @Override
@@ -189,10 +189,10 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
     return Long.parseLong(txdir.split("_")[1], 16);
   }
 
-  public static abstract class FateIdStatus {
+  public static abstract class FateIdStatusBase implements FateIdStatus {
     private final long txid;
 
-    public FateIdStatus(long txid) {
+    public FateIdStatusBase(long txid) {
       this.txid = txid;
     }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index bbb7f42572..85bc34141c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.zookeeper.FateLock;
@@ -368,7 +369,7 @@ public class AdminUtil<T> {
     final List<TransactionStatus> statuses = new ArrayList<>();
 
     fateStores.forEach((type, store) -> {
-      try (Stream<Long> tids = store.list()) {
+      try (Stream<Long> tids = store.list().map(FateIdStatus::getTxid)) {
         tids.forEach(tid -> {
 
           ReadOnlyFateTxStore<T> txStore = store.read(tid);
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index c18b13f583..9470de9e18 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@ -129,7 +129,7 @@ public class AgeOffStore<T> implements FateStore<T> {
 
     // ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in
     // memory.
-    List<Long> txids = store.list().collect(Collectors.toList());
+    List<Long> txids = store.list().map(FateIdStatus::getTxid).collect(Collectors.toList());
     for (Long txid : txids) {
       FateTxStore<T> txStore = store.reserve(txid);
       try {
@@ -203,7 +203,7 @@ public class AgeOffStore<T> implements FateStore<T> {
   }
 
   @Override
-  public Stream<Long> list() {
+  public Stream<FateIdStatus> list() {
     return store.list();
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index e525c89ff9..c5f7a9027c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -118,12 +118,18 @@ public interface ReadOnlyFateStore<T> {
     long getID();
   }
 
+  interface FateIdStatus {
+    long getTxid();
+
+    TStatus getStatus();
+  }
+
   /**
    * list all transaction ids in store.
    *
    * @return all outstanding transactions, including those reserved by others.
    */
-  Stream<Long> list();
+  Stream<FateIdStatus> list();
 
   /**
    * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index b4ad365dfa..31c299cf68 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -308,7 +308,7 @@ public class ZooStore<T> extends AbstractFateStore<T> {
         // Memoizing for two reasons. First the status may never be requested, so in that case avoid
         // the lookup. Second, if its requested multiple times the result will always be consistent.
         Supplier<TStatus> statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid)));
-        return new FateIdStatus(parseTid(strTxid)) {
+        return new FateIdStatusBase(parseTid(strTxid)) {
           @Override
           public TStatus getStatus() {
             return statusSupplier.get();
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 9b870537fa..4e81065f0c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -82,7 +82,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
       scanner.setRange(new Range());
       TxColumnFamily.STATUS_COLUMN.fetch(scanner);
       return scanner.stream().onClose(scanner::close).map(e -> {
-        return new FateIdStatus(parseTid(e.getKey().getRow().toString())) {
+        return new FateIdStatusBase(parseTid(e.getKey().getRow().toString())) {
           @Override
           public TStatus getStatus() {
             return TStatus.valueOf(e.getValue().toString());
diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index bac25c321b..ffd854bad4 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -115,7 +115,7 @@ public class FateLogger {
       }
 
       @Override
-      public Stream<Long> list() {
+      public Stream<FateIdStatus> list() {
         return store.list();
       }
 
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
index 88447771ec..5cc2671615 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.Test;
@@ -75,19 +76,21 @@ public class AgeOffStoreTest {
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1, txid2, txid3, txid4),
+        aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     tts.time = 15;
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1, txid3, txid4), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1, txid3, txid4),
+        aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     tts.time = 30;
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
   }
 
   @Test
@@ -117,17 +120,19 @@ public class AgeOffStoreTest {
 
     AgeOffStore<String> aoStore = new AgeOffStore<>(testStore, 10, tts);
 
-    assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1, txid2, txid3, txid4),
+        aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1, txid2, txid3, txid4),
+        aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     tts.time = 15;
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     txStore1 = aoStore.reserve(txid1);
     txStore1.setStatus(TStatus.FAILED_IN_PROGRESS);
@@ -137,7 +142,7 @@ public class AgeOffStoreTest {
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     txStore1 = aoStore.reserve(txid1);
     txStore1.setStatus(TStatus.FAILED);
@@ -145,7 +150,7 @@ public class AgeOffStoreTest {
 
     aoStore.ageOff();
 
-    assertEquals(Set.of(txid1), aoStore.list().collect(toSet()));
+    assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet()));
 
     tts.time = 42;
 
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 6dd6368d52..df1d711bae 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -167,8 +167,18 @@ public class TestStore implements FateStore<String> {
   }
 
   @Override
-  public Stream<Long> list() {
-    return new ArrayList<>(statuses.keySet()).stream();
+  public Stream<FateIdStatus> list() {
+    return new ArrayList<>(statuses.entrySet()).stream().map(e -> new FateIdStatus() {
+      @Override
+      public long getTxid() {
+        return e.getKey();
+      }
+
+      @Override
+      public TStatus getStatus() {
+        return e.getValue();
+      }
+    });
   }
 
   @Override