You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2024/01/04 19:37:50 UTC

(accumulo) branch elasticity updated: Small improvements to AccumuloStore related code (#4121)

This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new e7d789dc79 Small improvements to AccumuloStore related code (#4121)
e7d789dc79 is described below

commit e7d789dc7903131ea8682b95ac55a6b954866433
Author: Dom G <do...@apache.org>
AuthorDate: Thu Jan 4 14:37:45 2024 -0500

    Small improvements to AccumuloStore related code (#4121)
---
 .../accumulo/core/fate/AbstractFateStore.java      | 41 +++++++++-------------
 .../accumulo/core/fate/accumulo/AccumuloStore.java | 39 +++++++++-----------
 2 files changed, 33 insertions(+), 47 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 5e840d3247..874b58d8c6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -52,7 +52,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
   private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
 
   protected final Set<Long> reserved;
-  protected final Map<Long,Long> defered;
+  protected final Map<Long,Long> deferred;
 
   // This is incremented each time a transaction was unreserved that was non new
   protected final SignalCount unreservedNonNewCount = new SignalCount();
@@ -62,16 +62,13 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
 
   public AbstractFateStore() {
     this.reserved = new HashSet<>();
-    this.defered = new HashMap<>();
+    this.deferred = new HashMap<>();
   }
 
   public static byte[] serialize(Object o) {
-    try {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
       oos.writeObject(o);
-      oos.close();
-
       return baos.toByteArray();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -82,9 +79,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
       justification = "unsafe to store arbitrary serialized objects like this, but needed for now"
           + " for backwards compatibility")
   public static Object deserialize(byte[] ser) {
-    try {
-      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
-      ObjectInputStream ois = new ObjectInputStream(bais);
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+        ObjectInputStream ois = new ObjectInputStream(bais)) {
       return ois.readObject();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -97,7 +93,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
    * Attempt to reserve transaction
    *
    * @param tid transaction id
-   * @return true if reserved by this call, false if already reserved
+   * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
+   *         an empty Optional if the transaction was already reserved.
    */
   @Override
   public Optional<FateTxStore<T>> tryReserve(long tid) {
@@ -144,28 +141,24 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
 
       synchronized (this) {
         runnableTids.removeIf(txid -> {
-          var deferedTime = defered.get(txid);
-          if (deferedTime != null) {
-            if (deferedTime >= System.currentTimeMillis()) {
+          var deferredTime = deferred.get(txid);
+          if (deferredTime != null) {
+            if (deferredTime >= System.currentTimeMillis()) {
               return true;
             } else {
-              defered.remove(txid);
+              deferred.remove(txid);
             }
           }
 
-          if (reserved.contains(txid)) {
-            return true;
-          }
-
-          return false;
+          return reserved.contains(txid);
         });
       }
 
       if (runnableTids.isEmpty()) {
         if (beforeCount == unreservedRunnableCount.getCount()) {
           long waitTime = 5000;
-          if (!defered.isEmpty()) {
-            Long minTime = Collections.min(defered.values());
+          if (!deferred.isEmpty()) {
+            Long minTime = Collections.min(deferred.values());
             waitTime = minTime - System.currentTimeMillis();
           }
 
@@ -180,7 +173,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
 
     }
 
-    return List.<Long>of().iterator();
+    return Collections.emptyIterator();
   }
 
   @Override
@@ -258,7 +251,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
         AbstractFateStore.this.notifyAll();
 
         if (deferTime > 0) {
-          defered.put(tid, System.currentTimeMillis() + deferTime);
+          deferred.put(tid, System.currentTimeMillis() + deferTime);
         }
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index aa5883a6d8..1c1284696e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -26,7 +26,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -75,8 +74,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     return scanTx(scanner -> {
       scanner.setRange(new Range());
       TxColumnFamily.STATUS_COLUMN.fetch(scanner);
-      return StreamSupport.stream(scanner.spliterator(), false)
-          .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList());
+      return scanner.stream().map(e -> e.getKey().getRow().toString()).collect(Collectors.toList());
     });
   }
 
@@ -85,8 +83,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     return scanTx(scanner -> {
       scanner.setRange(getRow(tid));
       TxColumnFamily.STATUS_COLUMN.fetch(scanner);
-      return StreamSupport.stream(scanner.spliterator(), false)
-          .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+      return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst()
+          .orElse(TStatus.UNKNOWN);
     });
   }
 
@@ -125,7 +123,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
         scanner.setRange(getRow(tid));
         scanner.setBatchSize(1);
         scanner.fetchColumnFamily(RepoColumnFamily.NAME);
-        return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+        return scanner.stream().map(e -> {
           @SuppressWarnings("unchecked")
           var repo = (Repo<T>) deserialize(e.getValue().get());
           return repo;
@@ -140,7 +138,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
       return scanTx(scanner -> {
         scanner.setRange(getRow(tid));
         scanner.fetchColumnFamily(RepoColumnFamily.NAME);
-        return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+        return scanner.stream().map(e -> {
           @SuppressWarnings("unchecked")
           var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
           return repo;
@@ -174,8 +172,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
         }
         scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());
 
-        return StreamSupport.stream(scanner.spliterator(), false)
-            .map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst().orElse(null);
+        return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst()
+            .orElse(null);
       } catch (TableNotFoundException e) {
         throw new IllegalStateException(tableName + " not found!", e);
       }
@@ -188,8 +186,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
       return scanTx(scanner -> {
         scanner.setRange(getRow(tid));
         TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner);
-        return StreamSupport.stream(scanner.spliterator(), false)
-            .map(e -> Long.parseLong(e.getValue().toString())).findFirst().orElse(0L);
+        return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst()
+            .orElse(0L);
       });
     }
 
@@ -197,18 +195,14 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     public void push(Repo<T> repo) throws StackOverflowException {
       verifyReserved(true);
 
-      try {
-        Optional<Integer> top = findTop();
-
-        if (top.filter(t -> t >= maxRepos).isPresent()) {
-          throw new StackOverflowException("Repo stack size too large");
-        }
+      Optional<Integer> top = findTop();
 
-        FateMutator<T> fateMutator = newMutator(tid);
-        fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
-      } catch (StackOverflowException soe) {
-        throw soe;
+      if (top.filter(t -> t >= maxRepos).isPresent()) {
+        throw new StackOverflowException("Repo stack size too large");
       }
+
+      FateMutator<T> fateMutator = newMutator(tid);
+      fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
     }
 
     @Override
@@ -266,8 +260,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
         scanner.setRange(getRow(tid));
         scanner.setBatchSize(1);
         scanner.fetchColumnFamily(RepoColumnFamily.NAME);
-        return StreamSupport.stream(scanner.spliterator(), false)
-            .map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst();
+        return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst();
       });
     }
   }