You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2024/01/04 19:37:50 UTC
(accumulo) branch elasticity updated: Small improvements to AccumuloStore related code (#4121)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new e7d789dc79 Small improvements to AccumuloStore related code (#4121)
e7d789dc79 is described below
commit e7d789dc7903131ea8682b95ac55a6b954866433
Author: Dom G <do...@apache.org>
AuthorDate: Thu Jan 4 14:37:45 2024 -0500
Small improvements to AccumuloStore related code (#4121)
---
.../accumulo/core/fate/AbstractFateStore.java | 41 +++++++++-------------
.../accumulo/core/fate/accumulo/AccumuloStore.java | 39 +++++++++-----------
2 files changed, 33 insertions(+), 47 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 5e840d3247..874b58d8c6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -52,7 +52,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
protected final Set<Long> reserved;
- protected final Map<Long,Long> defered;
+ protected final Map<Long,Long> deferred;
// This is incremented each time a transaction was unreserved that was non new
protected final SignalCount unreservedNonNewCount = new SignalCount();
@@ -62,16 +62,13 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
public AbstractFateStore() {
this.reserved = new HashSet<>();
- this.defered = new HashMap<>();
+ this.deferred = new HashMap<>();
}
public static byte[] serialize(Object o) {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(o);
- oos.close();
-
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -82,9 +79,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
justification = "unsafe to store arbitrary serialized objects like this, but needed for now"
+ " for backwards compatibility")
public static Object deserialize(byte[] ser) {
- try {
- ByteArrayInputStream bais = new ByteArrayInputStream(ser);
- ObjectInputStream ois = new ObjectInputStream(bais);
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
return ois.readObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -97,7 +93,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
* Attempt to reserve transaction
*
* @param tid transaction id
- * @return true if reserved by this call, false if already reserved
+ * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
+ * an empty Optional if the transaction was already reserved.
*/
@Override
public Optional<FateTxStore<T>> tryReserve(long tid) {
@@ -144,28 +141,24 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
synchronized (this) {
runnableTids.removeIf(txid -> {
- var deferedTime = defered.get(txid);
- if (deferedTime != null) {
- if (deferedTime >= System.currentTimeMillis()) {
+ var deferredTime = deferred.get(txid);
+ if (deferredTime != null) {
+ if (deferredTime >= System.currentTimeMillis()) {
return true;
} else {
- defered.remove(txid);
+ deferred.remove(txid);
}
}
- if (reserved.contains(txid)) {
- return true;
- }
-
- return false;
+ return reserved.contains(txid);
});
}
if (runnableTids.isEmpty()) {
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
- if (!defered.isEmpty()) {
- Long minTime = Collections.min(defered.values());
+ if (!deferred.isEmpty()) {
+ Long minTime = Collections.min(deferred.values());
waitTime = minTime - System.currentTimeMillis();
}
@@ -180,7 +173,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
}
- return List.<Long>of().iterator();
+ return Collections.emptyIterator();
}
@Override
@@ -258,7 +251,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
AbstractFateStore.this.notifyAll();
if (deferTime > 0) {
- defered.put(tid, System.currentTimeMillis() + deferTime);
+ deferred.put(tid, System.currentTimeMillis() + deferTime);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index aa5883a6d8..1c1284696e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -26,7 +26,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -75,8 +74,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
return scanTx(scanner -> {
scanner.setRange(new Range());
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
- return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList());
+ return scanner.stream().map(e -> e.getKey().getRow().toString()).collect(Collectors.toList());
});
}
@@ -85,8 +83,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
return scanTx(scanner -> {
scanner.setRange(getRow(tid));
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
- return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+ return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst()
+ .orElse(TStatus.UNKNOWN);
});
}
@@ -125,7 +123,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
scanner.setRange(getRow(tid));
scanner.setBatchSize(1);
scanner.fetchColumnFamily(RepoColumnFamily.NAME);
- return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+ return scanner.stream().map(e -> {
@SuppressWarnings("unchecked")
var repo = (Repo<T>) deserialize(e.getValue().get());
return repo;
@@ -140,7 +138,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
return scanTx(scanner -> {
scanner.setRange(getRow(tid));
scanner.fetchColumnFamily(RepoColumnFamily.NAME);
- return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+ return scanner.stream().map(e -> {
@SuppressWarnings("unchecked")
var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
return repo;
@@ -174,8 +172,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
}
scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());
- return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst().orElse(null);
+ return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst()
+ .orElse(null);
} catch (TableNotFoundException e) {
throw new IllegalStateException(tableName + " not found!", e);
}
@@ -188,8 +186,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
return scanTx(scanner -> {
scanner.setRange(getRow(tid));
TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner);
- return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> Long.parseLong(e.getValue().toString())).findFirst().orElse(0L);
+ return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst()
+ .orElse(0L);
});
}
@@ -197,18 +195,14 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
public void push(Repo<T> repo) throws StackOverflowException {
verifyReserved(true);
- try {
- Optional<Integer> top = findTop();
-
- if (top.filter(t -> t >= maxRepos).isPresent()) {
- throw new StackOverflowException("Repo stack size too large");
- }
+ Optional<Integer> top = findTop();
- FateMutator<T> fateMutator = newMutator(tid);
- fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
- } catch (StackOverflowException soe) {
- throw soe;
+ if (top.filter(t -> t >= maxRepos).isPresent()) {
+ throw new StackOverflowException("Repo stack size too large");
}
+
+ FateMutator<T> fateMutator = newMutator(tid);
+ fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
}
@Override
@@ -266,8 +260,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
scanner.setRange(getRow(tid));
scanner.setBatchSize(1);
scanner.fetchColumnFamily(RepoColumnFamily.NAME);
- return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst();
+ return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst();
});
}
}