You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2024/02/05 23:11:08 UTC
(accumulo) branch elasticity updated: Globally Unique FATE Transaction Ids - Part 2 (#4228)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new b1cf91cc5c Globally Unique FATE Transaction Ids - Part 2 (#4228)
b1cf91cc5c is described below
commit b1cf91cc5cf02795ab720c9e47fbc5ab83aadc25
Author: Kevin Rathbun <43...@users.noreply.github.com>
AuthorDate: Mon Feb 5 18:11:01 2024 -0500
Globally Unique FATE Transaction Ids - Part 2 (#4228)
Change Repo to use FateId
---
.../java/org/apache/accumulo/core/fate/Fate.java | 6 +-
.../java/org/apache/accumulo/core/fate/FateId.java | 3 +-
.../apache/accumulo/core/fate/ReadOnlyRepo.java | 3 +-
.../java/org/apache/accumulo/core/fate/Repo.java | 4 +-
.../coordinator/commit/CommitCompaction.java | 3 +-
.../coordinator/commit/PutGcCandidates.java | 3 +-
.../coordinator/commit/RefreshTablet.java | 3 +-
.../coordinator/commit/RenameCompactionFile.java | 3 +-
.../manager/tableOps/ChangeTableState.java | 19 +++--
.../accumulo/manager/tableOps/ManagerRepo.java | 7 +-
.../accumulo/manager/tableOps/TraceRepo.java | 13 +--
.../apache/accumulo/manager/tableOps/Utils.java | 52 ++++++------
.../availability/SetTabletAvailability.java | 19 +++--
.../manager/tableOps/bulkVer2/BulkImportMove.java | 19 ++---
.../tableOps/bulkVer2/CleanUpBulkImport.java | 41 ++++-----
.../manager/tableOps/bulkVer2/LoadFiles.java | 41 +++++----
.../manager/tableOps/bulkVer2/PrepBulkImport.java | 27 +++---
.../manager/tableOps/bulkVer2/RefreshTablets.java | 10 ++-
.../manager/tableOps/clone/CloneMetadata.java | 7 +-
.../manager/tableOps/clone/ClonePermissions.java | 7 +-
.../manager/tableOps/clone/CloneTable.java | 15 ++--
.../manager/tableOps/clone/CloneZookeeper.java | 17 ++--
.../manager/tableOps/clone/FinishCloneTable.java | 15 ++--
.../accumulo/manager/tableOps/compact/CleanUp.java | 26 +++---
.../manager/tableOps/compact/CompactRange.java | 21 +++--
.../manager/tableOps/compact/CompactionDriver.java | 96 ++++++++++++----------
.../manager/tableOps/compact/RefreshTablets.java | 8 +-
.../tableOps/compact/cancel/CancelCompactions.java | 21 ++---
.../compact/cancel/FinishCancelCompaction.java | 9 +-
.../manager/tableOps/create/ChooseDir.java | 7 +-
.../manager/tableOps/create/CreateTable.java | 11 +--
.../manager/tableOps/create/FinishCreateTable.java | 11 +--
.../manager/tableOps/create/PopulateMetadata.java | 7 +-
.../manager/tableOps/create/PopulateZookeeper.java | 11 +--
.../manager/tableOps/create/SetupPermissions.java | 5 +-
.../accumulo/manager/tableOps/delete/CleanUp.java | 11 +--
.../manager/tableOps/delete/DeleteTable.java | 15 ++--
.../manager/tableOps/delete/PreDeleteTable.java | 19 +++--
.../manager/tableOps/delete/ReserveTablets.java | 14 ++--
.../manager/tableOps/merge/CountFiles.java | 7 +-
.../manager/tableOps/merge/DeleteRows.java | 34 ++++----
.../manager/tableOps/merge/DeleteTablets.java | 18 ++--
.../manager/tableOps/merge/FinishTableRangeOp.java | 27 +++---
.../manager/tableOps/merge/MergeTablets.java | 30 +++----
.../manager/tableOps/merge/ReserveTablets.java | 15 ++--
.../manager/tableOps/merge/TableRangeOp.java | 15 ++--
.../manager/tableOps/merge/UnreserveAndError.java | 5 +-
.../tableOps/namespace/create/CreateNamespace.java | 7 +-
.../namespace/create/FinishCreateNamespace.java | 9 +-
.../create/PopulateZookeeperWithNamespace.java | 11 +--
.../create/SetupNamespacePermissions.java | 3 +-
.../tableOps/namespace/delete/DeleteNamespace.java | 12 +--
.../namespace/delete/NamespaceCleanUp.java | 9 +-
.../tableOps/namespace/rename/RenameNamespace.java | 14 ++--
.../manager/tableOps/rename/RenameTable.java | 19 +++--
.../manager/tableOps/split/DeleteOperationIds.java | 6 +-
.../accumulo/manager/tableOps/split/PreSplit.java | 33 ++++----
.../manager/tableOps/split/UpdateTablets.java | 28 ++++---
.../manager/tableOps/tableExport/ExportTable.java | 12 +--
.../tableOps/tableExport/WriteExportFiles.java | 22 ++---
.../tableOps/tableImport/CreateImportDir.java | 3 +-
.../tableOps/tableImport/FinishImportTable.java | 13 +--
.../tableImport/ImportPopulateZookeeper.java | 11 +--
.../tableImport/ImportSetupPermissions.java | 7 +-
.../manager/tableOps/tableImport/ImportTable.java | 15 ++--
.../tableOps/tableImport/MapImportFileNames.java | 5 +-
.../tableOps/tableImport/MoveExportedFiles.java | 13 ++-
.../tableImport/PopulateMetadataTable.java | 5 +-
.../manager/tserverOps/ShutdownTServer.java | 7 +-
.../manager/tableOps/ShutdownTServerTest.java | 10 ++-
.../tableOps/bulkVer2/PrepBulkImportTest.java | 3 +-
.../java/org/apache/accumulo/test/fate/FateIT.java | 22 +++--
72 files changed, 571 insertions(+), 508 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 938b76ef4c..35807ee0fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -157,7 +157,7 @@ public class Fate<T> {
} else if (status == SUBMITTED || status == IN_PROGRESS) {
Repo<T> prevOp = null;
try {
- deferTime = op.isReady(txStore.getID().getTid(), environment);
+ deferTime = op.isReady(txStore.getID(), environment);
// Here, deferTime is only used to determine success (zero) or failure (non-zero),
// proceeding on success and returning to the while loop on failure.
@@ -167,7 +167,7 @@ public class Fate<T> {
if (status == SUBMITTED) {
txStore.setStatus(IN_PROGRESS);
}
- op = op.call(txStore.getID().getTid(), environment);
+ op = op.call(txStore.getID(), environment);
} else {
continue;
}
@@ -278,7 +278,7 @@ public class Fate<T> {
private void undo(FateId fateId, Repo<T> op) {
try {
- op.undo(fateId.getTid(), environment);
+ op.undo(fateId, environment);
} catch (Exception e) {
log.warn("Failed to undo Repo, " + fateId, e);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
index 90e87c67d5..2e0214782a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
@@ -105,10 +105,9 @@ public class FateId extends AbstractId<FateId> {
}
/**
- * Formats transaction ids in a consistent way that is useful for logging and persisting.
+ * Returns the hex string equivalent of the tid
*/
public static String formatTid(long tid) {
- // do not change how this formats without considering implications for persistence
return FastFormat.toHexString(tid);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyRepo.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyRepo.java
index 836a42d675..79d7b70fd0 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyRepo.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyRepo.java
@@ -25,8 +25,7 @@ package org.apache.accumulo.core.fate;
* also be safe to call without impacting the state of system components.
*/
public interface ReadOnlyRepo<T> {
-
- long isReady(long tid, T environment) throws Exception;
+ long isReady(FateId fateId, T environment) throws Exception;
String getName();
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Repo.java b/core/src/main/java/org/apache/accumulo/core/fate/Repo.java
index d191df9e21..9f00519d6a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Repo.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Repo.java
@@ -25,9 +25,9 @@ import java.io.Serializable;
*/
public interface Repo<T> extends ReadOnlyRepo<T>, Serializable {
- Repo<T> call(long tid, T environment) throws Exception;
+ Repo<T> call(FateId fateId, T environment) throws Exception;
- void undo(long tid, T environment) throws Exception;
+ void undo(FateId fateId, T environment) throws Exception;
// this allows the last fate op to return something to the user
String getReturn();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 6063a3b38f..ca80bb8d2e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.AbstractTabletFile;
@@ -69,7 +70,7 @@ public class CommitCompaction extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
var ecid = ExternalCompactionId.of(commitData.ecid);
var newFile = Optional.ofNullable(newDatafile).map(f -> ReferencedTabletFile.of(new Path(f)));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
index 6ce5e37ea7..b0cefe3dfe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.manager.Manager;
@@ -34,7 +35,7 @@ public class PutGcCandidates extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// add the GC candidates
manager.getContext().getAmple().putGcCandidates(commitData.getTableId(),
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
index d2044990a0..9cc486411b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -44,7 +45,7 @@ public class RefreshTablet extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
TServerInstance tsi = new TServerInstance(tserverInstance);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
index 2261ae7bf3..af271a219a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.coordinator.commit;
import java.io.IOException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.manager.Manager;
@@ -39,7 +40,7 @@ public class RenameCompactionFile extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
ReferencedTabletFile newDatafile = null;
var ctx = manager.getContext();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
index 12039d0c22..17fb9b40c0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
@@ -49,31 +50,31 @@ public class ChangeTableState extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
+ public long isReady(FateId fateId, Manager env) throws Exception {
// reserve the table so that this op does not run concurrently with create, clone, or delete
// table
- return Utils.reserveNamespace(env, namespaceId, tid, false, true, top)
- + Utils.reserveTable(env, tableId, tid, true, true, top);
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, true, top)
+ + Utils.reserveTable(env, tableId, fateId, true, true, top);
}
@Override
- public Repo<Manager> call(long tid, Manager env) {
+ public Repo<Manager> call(FateId fateId, Manager env) {
TableState ts = TableState.ONLINE;
if (top == TableOperation.OFFLINE) {
ts = TableState.OFFLINE;
}
env.getTableManager().transitionTableState(tableId, ts, expectedCurrStates);
- Utils.unreserveNamespace(env, namespaceId, tid, false);
- Utils.unreserveTable(env, tableId, tid, true);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
+ Utils.unreserveTable(env, tableId, fateId, true);
LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state {} {}", tableId, ts);
env.getEventCoordinator().event(tableId, "Set table state of %s to %s", tableId, ts);
return null;
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveNamespace(env, namespaceId, tid, false);
- Utils.unreserveTable(env, tableId, tid, true);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
+ Utils.unreserveTable(env, tableId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ManagerRepo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ManagerRepo.java
index e2910c8a0f..3bcbac9959 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ManagerRepo.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ManagerRepo.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.tableOps;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
@@ -26,12 +27,12 @@ public abstract class ManagerRepo implements Repo<Manager> {
private static final long serialVersionUID = 1L;
@Override
- public long isReady(long tid, Manager environment) throws Exception {
+ public long isReady(FateId fateId, Manager environment) throws Exception {
return 0;
}
@Override
- public void undo(long tid, Manager environment) throws Exception {}
+ public void undo(FateId fateId, Manager environment) throws Exception {}
@Override
public String getName() {
@@ -44,6 +45,6 @@ public abstract class ManagerRepo implements Repo<Manager> {
}
@Override
- public abstract Repo<Manager> call(long tid, Manager environment) throws Exception;
+ public abstract Repo<Manager> call(FateId fateId, Manager environment) throws Exception;
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java
index 752bc90d44..1c6227862e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps;
import static org.apache.accumulo.core.util.LazySingletons.GSON;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.manager.Manager;
@@ -41,10 +42,10 @@ public class TraceRepo<T> implements Repo<T> {
}
@Override
- public long isReady(long tid, T environment) throws Exception {
+ public long isReady(FateId fateId, T environment) throws Exception {
Span span = TraceUtil.startFateSpan(repo.getClass(), repo.getName(), tinfo);
try (Scope scope = span.makeCurrent()) {
- return repo.isReady(tid, environment);
+ return repo.isReady(fateId, environment);
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
@@ -54,10 +55,10 @@ public class TraceRepo<T> implements Repo<T> {
}
@Override
- public Repo<T> call(long tid, T environment) throws Exception {
+ public Repo<T> call(FateId fateId, T environment) throws Exception {
Span span = TraceUtil.startFateSpan(repo.getClass(), repo.getName(), tinfo);
try (Scope scope = span.makeCurrent()) {
- Repo<T> result = repo.call(tid, environment);
+ Repo<T> result = repo.call(fateId, environment);
if (result == null) {
return null;
}
@@ -71,10 +72,10 @@ public class TraceRepo<T> implements Repo<T> {
}
@Override
- public void undo(long tid, T environment) throws Exception {
+ public void undo(FateId fateId, T environment) throws Exception {
Span span = TraceUtil.startFateSpan(repo.getClass(), repo.getName(), tinfo);
try (Scope scope = span.makeCurrent()) {
- repo.undo(tid, environment);
+ repo.undo(fateId, environment);
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index b4d92b3a9d..e5ffc8039d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -37,14 +37,13 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.DistributedLock;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooReservation;
-import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.FileSystem;
@@ -90,9 +89,9 @@ public class Utils {
static final Lock tableNameLock = new ReentrantLock();
static final Lock idLock = new ReentrantLock();
- public static long reserveTable(Manager env, TableId tableId, long tid, boolean writeLock,
+ public static long reserveTable(Manager env, TableId tableId, FateId fateId, boolean writeLock,
boolean tableMustExist, TableOperation op) throws Exception {
- if (getLock(env.getContext(), tableId, tid, writeLock).tryLock()) {
+ if (getLock(env.getContext(), tableId, fateId, writeLock).tryLock()) {
if (tableMustExist) {
ZooReaderWriter zk = env.getContext().getZooReaderWriter();
if (!zk.exists(env.getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId)) {
@@ -100,7 +99,7 @@ public class Utils {
TableOperationExceptionType.NOTFOUND, "Table does not exist");
}
}
- log.info("table {} {} locked for {} operation: {}", tableId, FateTxId.formatTid(tid),
+ log.info("table {} {} locked for {} operation: {}", tableId, fateId,
(writeLock ? "write" : "read"), op);
return 0;
} else {
@@ -108,22 +107,22 @@ public class Utils {
}
}
- public static void unreserveTable(Manager env, TableId tableId, long tid, boolean writeLock) {
- getLock(env.getContext(), tableId, tid, writeLock).unlock();
- log.info("table {} {} unlocked for {}", tableId, FateTxId.formatTid(tid),
- (writeLock ? "write" : "read"));
+ public static void unreserveTable(Manager env, TableId tableId, FateId fateId,
+ boolean writeLock) {
+ getLock(env.getContext(), tableId, fateId, writeLock).unlock();
+ log.info("table {} {} unlocked for {}", tableId, fateId, (writeLock ? "write" : "read"));
}
- public static void unreserveNamespace(Manager env, NamespaceId namespaceId, long id,
+ public static void unreserveNamespace(Manager env, NamespaceId namespaceId, FateId fateId,
boolean writeLock) {
- getLock(env.getContext(), namespaceId, id, writeLock).unlock();
- log.info("namespace {} {} unlocked for {}", namespaceId, FateTxId.formatTid(id),
+ getLock(env.getContext(), namespaceId, fateId, writeLock).unlock();
+ log.info("namespace {} {} unlocked for {}", namespaceId, fateId,
(writeLock ? "write" : "read"));
}
- public static long reserveNamespace(Manager env, NamespaceId namespaceId, long id,
+ public static long reserveNamespace(Manager env, NamespaceId namespaceId, FateId fateId,
boolean writeLock, boolean mustExist, TableOperation op) throws Exception {
- if (getLock(env.getContext(), namespaceId, id, writeLock).tryLock()) {
+ if (getLock(env.getContext(), namespaceId, fateId, writeLock).tryLock()) {
if (mustExist) {
ZooReaderWriter zk = env.getContext().getZooReaderWriter();
if (!zk.exists(
@@ -132,7 +131,7 @@ public class Utils {
TableOperationExceptionType.NAMESPACE_NOTFOUND, "Namespace does not exist");
}
}
- log.info("namespace {} {} locked for {} operation: {}", namespaceId, FateTxId.formatTid(id),
+ log.info("namespace {} {} locked for {} operation: {}", namespaceId, fateId,
(writeLock ? "write" : "read"), op);
return 0;
} else {
@@ -140,31 +139,32 @@ public class Utils {
}
}
- public static long reserveHdfsDirectory(Manager env, String directory, long tid)
+ public static long reserveHdfsDirectory(Manager env, String directory, FateId fateId)
throws KeeperException, InterruptedException {
String resvPath = env.getContext().getZooKeeperRoot() + Constants.ZHDFS_RESERVATIONS + "/"
+ Base64.getEncoder().encodeToString(directory.getBytes(UTF_8));
ZooReaderWriter zk = env.getContext().getZooReaderWriter();
- if (ZooReservation.attempt(zk, resvPath, FastFormat.toHexString(tid), "")) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044 .. should the full FateId be passed below?
+ if (ZooReservation.attempt(zk, resvPath, fateId.getHexTid(), "")) {
return 0;
} else {
return 50;
}
}
- public static void unreserveHdfsDirectory(Manager env, String directory, long tid)
+ public static void unreserveHdfsDirectory(Manager env, String directory, FateId fateId)
throws KeeperException, InterruptedException {
String resvPath = env.getContext().getZooKeeperRoot() + Constants.ZHDFS_RESERVATIONS + "/"
+ Base64.getEncoder().encodeToString(directory.getBytes(UTF_8));
- ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath,
- FastFormat.toHexString(tid));
+ ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId.getHexTid());
}
- private static Lock getLock(ServerContext context, AbstractId<?> id, long tid,
+ private static Lock getLock(ServerContext context, AbstractId<?> id, FateId fateId,
boolean writeLock) {
- byte[] lockData = FastFormat.toZeroPaddedHex(tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044 ... should lock data use full FateId?
+ byte[] lockData = fateId.getHexTid().getBytes(UTF_8);
var fLockPath =
FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical());
FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath);
@@ -175,8 +175,8 @@ public class Utils {
boolean isWriteLock = lock.getType() == LockType.WRITE;
if (writeLock != isWriteLock) {
throw new IllegalStateException("Unexpected lock type " + lock.getType()
- + " recovered for transaction " + FateTxId.formatTid(tid) + " on object " + id
- + ". Expected " + (writeLock ? LockType.WRITE : LockType.READ) + " lock instead.");
+ + " recovered for transaction " + fateId + " on object " + id + ". Expected "
+ + (writeLock ? LockType.WRITE : LockType.READ) + " lock instead.");
}
} else {
DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData);
@@ -197,8 +197,8 @@ public class Utils {
return tableNameLock;
}
- public static Lock getReadLock(Manager env, AbstractId<?> id, long tid) {
- return Utils.getLock(env.getContext(), id, tid, false);
+ public static Lock getReadLock(Manager env, AbstractId<?> id, FateId fateId) {
+ return Utils.getLock(env.getContext(), id, fateId, false);
}
public static void checkNamespaceDoesNotExist(ServerContext context, String namespace,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
index 5514f6295e..30cfeb9d8d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -57,15 +58,15 @@ public class SetTabletAvailability extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
- return Utils.reserveNamespace(manager, namespaceId, tid, false, true,
+ public long isReady(FateId fateId, Manager manager) throws Exception {
+ return Utils.reserveNamespace(manager, namespaceId, fateId, false, true,
TableOperation.SET_TABLET_AVAILABILITY)
- + Utils.reserveTable(manager, tableId, tid, true, true,
+ + Utils.reserveTable(manager, tableId, fateId, true, true,
TableOperation.SET_TABLET_AVAILABILITY);
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
final Range range = new Range(tRange);
LOG.debug("Finding tablets in Range: {} for table:{}", range, tableId);
@@ -115,15 +116,15 @@ public class SetTabletAvailability extends ManagerRepo {
mutator.mutateTablet(tabletExtent).putTabletAvailability(tabletAvailability).mutate();
}
}
- Utils.unreserveNamespace(manager, namespaceId, tid, false);
- Utils.unreserveTable(manager, tableId, tid, true);
+ Utils.unreserveNamespace(manager, namespaceId, fateId, false);
+ Utils.unreserveTable(manager, tableId, fateId, true);
return null;
}
@Override
- public void undo(long tid, Manager manager) throws Exception {
- Utils.unreserveNamespace(manager, namespaceId, tid, false);
- Utils.unreserveTable(manager, tableId, tid, true);
+ public void undo(FateId fateId, Manager manager) throws Exception {
+ Utils.unreserveNamespace(manager, namespaceId, fateId, false);
+ Utils.unreserveTable(manager, tableId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index ad293bc666..eb78e48ce8 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
import org.apache.accumulo.manager.Manager;
@@ -66,13 +66,11 @@ class BulkImportMove extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
final Path bulkDir = new Path(bulkInfo.bulkDir);
final Path sourceDir = new Path(bulkInfo.sourceDir);
- String fmtTid = FateTxId.formatTid(tid);
-
- log.debug("{} sourceDir {}", fmtTid, sourceDir);
+ log.debug("{} sourceDir {}", fateId, sourceDir);
VolumeManager fs = manager.getVolumeManager();
@@ -80,7 +78,7 @@ class BulkImportMove extends ManagerRepo {
manager.updateBulkImportStatus(sourceDir.toString(), BulkImportState.MOVING);
Map<String,String> oldToNewNameMap =
BulkSerialize.readRenameMap(bulkDir.toString(), fs::open);
- moveFiles(tid, sourceDir, bulkDir, manager, fs, oldToNewNameMap);
+ moveFiles(fateId, sourceDir, bulkDir, manager, fs, oldToNewNameMap);
return new LoadFiles(bulkInfo);
} catch (Exception ex) {
@@ -93,14 +91,14 @@ class BulkImportMove extends ManagerRepo {
/**
* For every entry in renames, move the file from the key path to the value path
*/
- private void moveFiles(long tid, Path sourceDir, Path bulkDir, Manager manager,
+ private void moveFiles(FateId fateId, Path sourceDir, Path bulkDir, Manager manager,
final VolumeManager fs, Map<String,String> renames) throws Exception {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
manager.getContext().getAmple().addBulkLoadInProgressFlag(
- "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
+ "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), fateId.getTid());
AccumuloConfiguration aConf = manager.getConfiguration();
int workerCount = aConf.getCount(Property.MANAGER_RENAME_THREADS);
Map<Path,Path> oldToNewMap = new HashMap<>();
- String fmtTid = FateTxId.formatTid(tid);
for (Map.Entry<String,String> renameEntry : renames.entrySet()) {
final Path originalPath = new Path(sourceDir, renameEntry.getKey());
@@ -108,7 +106,8 @@ class BulkImportMove extends ManagerRepo {
oldToNewMap.put(originalPath, newPath);
}
try {
- fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fmtTid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId.getHexTid());
} catch (IOException ioe) {
throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index ab78cb3ffb..88ca476ee3 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -30,7 +30,7 @@ import java.util.Optional;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
@@ -60,10 +60,9 @@ public class CleanUpBulkImport extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
manager.updateBulkImportStatus(info.sourceDir, BulkImportState.CLEANUP);
- log.debug("{} removing the bulkDir processing flag file in {}", FateTxId.formatTid(tid),
- info.bulkDir);
+ log.debug("{} removing the bulkDir processing flag file in {}", fateId, info.bulkDir);
Ample ample = manager.getContext().getAmple();
Path bulkDir = new Path(info.bulkDir);
ample.removeBulkLoadInProgressFlag(
@@ -74,12 +73,12 @@ public class CleanUpBulkImport extends ManagerRepo {
Text firstSplit = info.firstSplit == null ? null : new Text(info.firstSplit);
Text lastSplit = info.lastSplit == null ? null : new Text(info.lastSplit);
- log.debug("{} removing the metadata table markers for loaded files in range {} {}",
- FateTxId.formatTid(tid), firstSplit, lastSplit);
- removeBulkLoadEntries(ample, info.tableId, tid, firstSplit, lastSplit);
+ log.debug("{} removing the metadata table markers for loaded files in range {} {}", fateId,
+ firstSplit, lastSplit);
+ removeBulkLoadEntries(ample, info.tableId, fateId, firstSplit, lastSplit);
- Utils.unreserveHdfsDirectory(manager, info.sourceDir, tid);
- Utils.getReadLock(manager, info.tableId, tid).unlock();
+ Utils.unreserveHdfsDirectory(manager, info.sourceDir, fateId);
+ Utils.getReadLock(manager, info.tableId, fateId).unlock();
// delete json renames and mapping files
Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
@@ -87,16 +86,16 @@ public class CleanUpBulkImport extends ManagerRepo {
manager.getVolumeManager().delete(renamingFile);
manager.getVolumeManager().delete(mappingFile);
} catch (IOException ioe) {
- log.debug("{} Failed to delete renames and/or loadmap", FateTxId.formatTid(tid), ioe);
+ log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe);
}
- log.debug("completing bulkDir import transaction " + FateTxId.formatTid(tid));
+ log.debug("completing bulkDir import transaction " + fateId);
manager.removeBulkImportStatus(info.sourceDir);
return null;
}
- private static void removeBulkLoadEntries(Ample ample, TableId tableId, long tid, Text firstSplit,
- Text lastSplit) {
+ private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId,
+ Text firstSplit, Text lastSplit) {
Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
.incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
@@ -109,11 +108,13 @@ public class CleanUpBulkImport extends ManagerRepo {
var tabletsMutator = ample.conditionallyMutateTablets()) {
for (var tablet : tablets) {
- if (tablet.getLoaded().values().stream().anyMatch(l -> l == tid)) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ if (tablet.getLoaded().values().stream().anyMatch(l -> l == fateId.getTid())) {
var tabletMutator =
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
- tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue() == tid)
- .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
+ tablet.getLoaded().entrySet().stream()
+ .filter(entry -> entry.getValue() == fateId.getTid()).map(Map.Entry::getKey)
+ .forEach(tabletMutator::deleteBulkFile);
tabletMutator.submit(tm -> false);
}
}
@@ -126,16 +127,16 @@ public class CleanUpBulkImport extends ManagerRepo {
results.forEach((extent, condResult) -> {
if (condResult.getStatus() != Status.ACCEPTED) {
var metadata = Optional.ofNullable(condResult.readMetadata());
- log.debug("Tablet update failed {} {} {} {} ", FateTxId.formatTid(tid), extent,
- condResult.getStatus(), metadata.map(TabletMetadata::getOperationId)
- .map(AbstractId::toString).orElse("tablet is gone"));
+ log.debug("Tablet update failed {} {} {} {} ", fateId, extent, condResult.getStatus(),
+ metadata.map(TabletMetadata::getOperationId).map(AbstractId::toString)
+ .orElse("tablet is gone"));
}
});
try {
retry.waitForNextAttempt(log,
String.format("%s tableId:%s conditional mutations to delete load markers failed.",
- FateTxId.formatTid(tid), tableId));
+ fateId, tableId));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 86b8c2b71e..682bbbb354 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -42,7 +42,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
@@ -86,10 +86,10 @@ class LoadFiles extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
if (manager.onlineTabletServers().isEmpty()) {
- log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
- + FateTxId.formatTid(tid) + ")");
+ log.warn("There are no tablet server to process bulkDir import, waiting (fateId = " + fateId
+ + ")");
return 100;
}
VolumeManager fs = manager.getVolumeManager();
@@ -97,30 +97,28 @@ class LoadFiles extends ManagerRepo {
manager.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
try (LoadMappingIterator lmi =
BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
- return loadFiles(bulkInfo.tableId, bulkDir, lmi, manager, tid);
+ return loadFiles(bulkInfo.tableId, bulkDir, lmi, manager, fateId);
}
}
@Override
- public Repo<Manager> call(final long tid, final Manager manager) {
+ public Repo<Manager> call(final FateId fateId, final Manager manager) {
return new RefreshTablets(bulkInfo);
}
private static class Loader {
protected Path bulkDir;
protected Manager manager;
- protected long tid;
- private String logId;
+ protected FateId fateId;
protected boolean setTime;
Ample.ConditionalTabletsMutator conditionalMutator;
private long skipped = 0;
- void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
+ void start(Path bulkDir, Manager manager, FateId fateId, boolean setTime) throws Exception {
this.bulkDir = bulkDir;
this.manager = manager;
- this.tid = tid;
- this.logId = FateTxId.formatTid(tid);
+ this.fateId = fateId;
this.setTime = setTime;
conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets();
this.skipped = 0;
@@ -145,7 +143,7 @@ class LoadFiles extends ManagerRepo {
if (setTime) {
hostedTimestamps = allocateTimestamps(tablets, toLoad.size());
hostedTimestamps.forEach((e, t) -> {
- log.trace("{} allocated timestamp {} {}", logId, e, t);
+ log.trace("{} allocated timestamp {} {}", fateId, e, t);
});
} else {
hostedTimestamps = Map.of();
@@ -155,7 +153,7 @@ class LoadFiles extends ManagerRepo {
if (setTime && tablet.getLocation() != null
&& !hostedTimestamps.containsKey(tablet.getExtent())) {
skipped++;
- log.debug("{} tablet {} did not have a timestamp allocated, will retry later", logId,
+ log.debug("{} tablet {} did not have a timestamp allocated, will retry later", fateId,
tablet.getExtent());
continue;
}
@@ -199,7 +197,8 @@ class LoadFiles extends ManagerRepo {
.requireAbsentOperation().requireSame(tablet, LOADED, TIME, LOCATION);
filesToLoad.forEach((f, v) -> {
- tabletMutator.putBulkFile(f, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ tabletMutator.putBulkFile(f, fateId.getTid());
tabletMutator.putFile(f, v);
});
@@ -244,7 +243,7 @@ class LoadFiles extends ManagerRepo {
var context = manager.getContext();
try {
- log.trace("{} sending allocate timestamps request to {} for {} extents", logId, server,
+ log.trace("{} sending allocate timestamps request to {} for {} extents", fateId, server,
extents.size());
var timeInMillis =
context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
@@ -254,14 +253,14 @@ class LoadFiles extends ManagerRepo {
var timestamps = client.allocateTimestamps(TraceUtil.traceInfo(), context.rpcCreds(),
extents, numStamps);
- log.trace("{} allocate timestamps request to {} returned {} timestamps", logId, server,
+ log.trace("{} allocate timestamps request to {} returned {} timestamps", fateId, server,
timestamps.size());
var converted = new HashMap<KeyExtent,Long>();
timestamps.forEach((k, v) -> converted.put(KeyExtent.fromThrift(k), v));
return converted;
} catch (TException ex) {
- log.debug("rpc failed server: " + server + ", " + logId + " " + ex.getMessage(), ex);
+ log.debug("rpc failed server: " + server + ", " + fateId + " " + ex.getMessage(), ex);
// return an empty map, should retry later
return Map.of();
} finally {
@@ -285,10 +284,10 @@ class LoadFiles extends ManagerRepo {
if (condResult.getStatus() != Status.ACCEPTED) {
var metadata = condResult.readMetadata();
if (metadata == null) {
- log.debug("Tablet update failed, tablet is gone {} {} {}", logId, extent,
+ log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, extent,
condResult.getStatus());
} else {
- log.debug("Tablet update failed {} {} {} {} {} {}", logId, extent,
+ log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent,
condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(),
metadata.getLoaded());
}
@@ -307,7 +306,7 @@ class LoadFiles extends ManagerRepo {
* all files have been loaded.
*/
private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMapIter,
- Manager manager, long tid) throws Exception {
+ Manager manager, FateId fateId) throws Exception {
PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek();
@@ -315,7 +314,7 @@ class LoadFiles extends ManagerRepo {
Loader loader = new Loader();
long t1;
- loader.start(bulkDir, manager, tid, bulkInfo.setTime);
+ loader.start(bulkDir, manager, fateId, bulkInfo.setTime);
try (TabletsMetadata tabletsMetadata =
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
.checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build()) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 26ff1681e0..c2ae6f9cd7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -43,7 +43,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.file.FilePrefix;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -89,8 +89,8 @@ public class PrepBulkImport extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
- if (!Utils.getReadLock(manager, bulkInfo.tableId, tid).tryLock()) {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
+ if (!Utils.getReadLock(manager, bulkInfo.tableId, fateId).tryLock()) {
return 100;
}
@@ -98,7 +98,7 @@ public class PrepBulkImport extends ManagerRepo {
return 500;
}
- return Utils.reserveHdfsDirectory(manager, bulkInfo.sourceDir, tid);
+ return Utils.reserveHdfsDirectory(manager, bulkInfo.sourceDir, fateId);
}
@VisibleForTesting
@@ -116,7 +116,7 @@ public class PrepBulkImport extends ManagerRepo {
*/
@VisibleForTesting
static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,
- TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
+ TabletIterFactory tabletIterFactory, int maxNumTablets) throws Exception {
var currRange = lmi.next();
Text startRow = currRange.getKey().prevEndRow();
@@ -221,7 +221,7 @@ public class PrepBulkImport extends ManagerRepo {
}
}
- private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception {
+ private KeyExtent checkForMerge(final Manager manager) throws Exception {
VolumeManager fs = manager.getVolumeManager();
final Path bulkDir = new Path(bulkInfo.sourceDir);
@@ -233,22 +233,21 @@ public class PrepBulkImport extends ManagerRepo {
LoadMappingIterator lmi =
BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open);
TabletIterFactory tabletIterFactory = new TabletIterFactoryImpl(manager, bulkInfo)) {
- return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
- tid);
+ return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets);
}
}
@Override
- public Repo<Manager> call(final long tid, final Manager manager) throws Exception {
+ public Repo<Manager> call(final FateId fateId, final Manager manager) throws Exception {
// now that table lock is acquired check that all splits in load mapping exists in table
- KeyExtent tabletsRange = checkForMerge(tid, manager);
+ KeyExtent tabletsRange = checkForMerge(manager);
bulkInfo.firstSplit =
Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);
bulkInfo.lastSplit =
Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null);
- log.trace("{} first split:{} last split:{}", FateTxId.formatTid(tid), tabletsRange.prevEndRow(),
+ log.trace("{} first split:{} last split:{}", fateId, tabletsRange.prevEndRow(),
tabletsRange.endRow());
VolumeManager fs = manager.getVolumeManager();
@@ -302,9 +301,9 @@ public class PrepBulkImport extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
// unreserve sourceDir/error directories
- Utils.unreserveHdfsDirectory(environment, bulkInfo.sourceDir, tid);
- Utils.getReadLock(environment, bulkInfo.tableId, tid).unlock();
+ Utils.unreserveHdfsDirectory(environment, bulkInfo.sourceDir, fateId);
+ Utils.getReadLock(environment, bulkInfo.tableId, fateId).unlock();
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
index de7c296188..c01e50114f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.tableOps.bulkVer2;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -44,16 +45,17 @@ public class RefreshTablets extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, tid,
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(),
bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit,
- tabletMetadata -> tabletMetadata.getLoaded().containsValue(tid));
+ tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId.getTid()));
return new CleanUpBulkImport(bulkInfo);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java
index bd3a836e27..4e10c777b5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.tableOps.clone;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -34,12 +35,12 @@ class CloneMetadata extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
LoggerFactory.getLogger(CloneMetadata.class)
.info(String.format("Cloning %s with tableId %s from srcTableId %s", cloneInfo.tableName,
cloneInfo.tableId, cloneInfo.srcTableId));
@@ -52,7 +53,7 @@ class CloneMetadata extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment.getContext(),
environment.getManagerLock());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/ClonePermissions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/ClonePermissions.java
index 7ac8d6fd0b..3b3a9bbded 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/ClonePermissions.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/ClonePermissions.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationExcepti
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.manager.Manager;
@@ -40,12 +41,12 @@ class ClonePermissions extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
// give all table permissions to the creator
for (TablePermission permission : TablePermission.values()) {
try {
@@ -71,7 +72,7 @@ class ClonePermissions extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
environment.getContext().getSecurityOperation().deleteTable(environment.getContext().rpcCreds(),
cloneInfo.tableId, cloneInfo.namespaceId);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneTable.java
index 352f3d1b89..d7bad0955c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneTable.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -47,16 +48,16 @@ public class CloneTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
- long val = Utils.reserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false, true,
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ long val = Utils.reserveNamespace(environment, cloneInfo.srcNamespaceId, fateId, false, true,
TableOperation.CLONE);
- val += Utils.reserveTable(environment, cloneInfo.srcTableId, tid, false, true,
+ val += Utils.reserveTable(environment, cloneInfo.srcTableId, fateId, false, true,
TableOperation.CLONE);
return val;
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
Utils.getIdLock().lock();
try {
@@ -70,9 +71,9 @@ public class CloneTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) {
- Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false);
- Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false);
+ public void undo(FateId fateId, Manager environment) {
+ Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, fateId, false);
+ Utils.unreserveTable(environment, cloneInfo.srcTableId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
index cfc10d015c..9ce4e76243 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
@@ -22,6 +22,7 @@ import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.util.tables.TableNameUtil;
import org.apache.accumulo.manager.Manager;
@@ -42,19 +43,19 @@ class CloneZookeeper extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
+ public long isReady(FateId fateId, Manager environment) throws Exception {
long val = 0;
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
- val += Utils.reserveNamespace(environment, cloneInfo.namespaceId, tid, false, true,
+ val += Utils.reserveNamespace(environment, cloneInfo.namespaceId, fateId, false, true,
TableOperation.CLONE);
}
- val +=
- Utils.reserveTable(environment, cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
+ val += Utils.reserveTable(environment, cloneInfo.tableId, fateId, true, false,
+ TableOperation.CLONE);
return val;
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
Utils.getTableNameLock().lock();
try {
// write tableName & tableId to zookeeper
@@ -74,12 +75,12 @@ class CloneZookeeper extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
environment.getTableManager().removeTable(cloneInfo.tableId);
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
- Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, false);
+ Utils.unreserveNamespace(environment, cloneInfo.namespaceId, fateId, false);
}
- Utils.unreserveTable(environment, cloneInfo.tableId, tid, true);
+ Utils.unreserveTable(environment, cloneInfo.tableId, fateId, true);
environment.getContext().clearTableListCache();
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
index b406d43243..fc3b4706cf 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.clone;
import java.util.EnumSet;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
@@ -37,12 +38,12 @@ class FinishCloneTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager environment) {
+ public Repo<Manager> call(FateId fateId, Manager environment) {
// directories are intentionally not created.... this is done because directories should be
// unique
// because they occupy a different namespace than normal tablet directories... also some clones
@@ -59,12 +60,12 @@ class FinishCloneTable extends ManagerRepo {
environment.getTableManager().transitionTableState(cloneInfo.tableId, ts, expectedCurrStates);
}
- Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false);
+ Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, fateId, false);
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
- Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, false);
+ Utils.unreserveNamespace(environment, cloneInfo.namespaceId, fateId, false);
}
- Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false);
- Utils.unreserveTable(environment, cloneInfo.tableId, tid, true);
+ Utils.unreserveTable(environment, cloneInfo.srcTableId, fateId, false);
+ Utils.unreserveTable(environment, cloneInfo.tableId, fateId, true);
environment.getEventCoordinator().event(cloneInfo.tableId, "Cloned table %s from %s",
cloneInfo.tableName, cloneInfo.srcTableId);
@@ -76,6 +77,6 @@ class FinishCloneTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) {}
+ public void undo(FateId fateId, Manager environment) {}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
index e1c6e1f504..0c263605b2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java
@@ -27,7 +27,7 @@ import java.util.function.Consumer;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
@@ -58,16 +58,14 @@ public class CleanUp extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
var ample = manager.getContext().getAmple();
- var fateStr = FateTxId.formatTid(tid);
-
AtomicLong rejectedCount = new AtomicLong(0);
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
if (result.getStatus() == Status.REJECTED) {
- log.debug("{} update for {} was rejected ", fateStr, result.getExtent());
+ log.debug("{} update for {} was rejected ", fateId, result.getExtent());
rejectedCount.incrementAndGet();
}
};
@@ -82,10 +80,11 @@ public class CleanUp extends ManagerRepo {
t1 = System.nanoTime();
for (TabletMetadata tablet : tablets) {
total++;
- if (tablet.getCompacted().contains(tid)) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ if (tablet.getCompacted().contains(fateId.getTid())) {
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
- .requireSame(tablet, COMPACTED).deleteCompacted(tid)
- .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid));
+ .requireSame(tablet, COMPACTED).deleteCompacted(fateId.getTid())
+ .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(fateId.getTid()));
submitted++;
}
}
@@ -95,7 +94,7 @@ public class CleanUp extends ManagerRepo {
long scanTime = Duration.ofNanos(t2 - t1).toMillis();
- log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateStr,
+ log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateId,
submitted - rejectedCount.get(), submitted, total, scanTime);
if (rejectedCount.get() > 0) {
@@ -108,10 +107,11 @@ public class CleanUp extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
- CompactionConfigStorage.deleteConfig(manager.getContext(), tid);
- Utils.getReadLock(manager, tableId, tid).unlock();
- Utils.getReadLock(manager, namespaceId, tid).unlock();
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ CompactionConfigStorage.deleteConfig(manager.getContext(), fateId.getTid());
+ Utils.getReadLock(manager, tableId, fateId).unlock();
+ Utils.getReadLock(manager, namespaceId, fateId).unlock();
return null;
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java
index 5edeadab79..f4cb4dbe42 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.manager.Manager;
@@ -72,24 +73,26 @@ public class CompactRange extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.COMPACT)
- + Utils.reserveTable(env, tableId, tid, false, true, TableOperation.COMPACT);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, true, TableOperation.COMPACT)
+ + Utils.reserveTable(env, tableId, fateId, false, true, TableOperation.COMPACT);
}
@Override
- public Repo<Manager> call(final long tid, Manager env) throws Exception {
- CompactionConfigStorage.setConfig(env.getContext(), tid, config);
+ public Repo<Manager> call(final FateId fateId, Manager env) throws Exception {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ CompactionConfigStorage.setConfig(env.getContext(), fateId.getTid(), config);
return new CompactionDriver(namespaceId, tableId, startRow, endRow);
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
try {
- CompactionConfigStorage.deleteConfig(env.getContext(), tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ CompactionConfigStorage.deleteConfig(env.getContext(), fateId.getTid());
} finally {
- Utils.unreserveNamespace(env, namespaceId, tid, false);
- Utils.unreserveTable(env, tableId, tid, false);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
+ Utils.unreserveTable(env, tableId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 9e0e627392..fa7e4d31e0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
@@ -84,7 +85,7 @@ class CompactionDriver extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
if (tableId.equals(AccumuloTable.ROOT.tableId())) {
// this code does not properly handle the root table. See #798
@@ -93,7 +94,7 @@ class CompactionDriver extends ManagerRepo {
ZooReaderWriter zoo = manager.getContext().getZooReaderWriter();
- if (isCancelled(tid, manager.getContext())) {
+ if (isCancelled(fateId, manager.getContext())) {
// compaction was canceled
throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
TableOperation.COMPACT, TableOperationExceptionType.OTHER,
@@ -111,7 +112,7 @@ class CompactionDriver extends ManagerRepo {
long t1 = System.currentTimeMillis();
- int tabletsToWaitFor = updateAndCheckTablets(manager, tid);
+ int tabletsToWaitFor = updateAndCheckTablets(manager, fateId);
long scanTime = System.currentTimeMillis() - t1;
@@ -128,23 +129,22 @@ class CompactionDriver extends ManagerRepo {
return sleepTime;
}
- private boolean isCancelled(long tid, ServerContext context)
+ private boolean isCancelled(FateId fateId, ServerContext context)
throws InterruptedException, KeeperException {
- return CompactionConfigStorage.getConfig(context, tid) == null;
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ return CompactionConfigStorage.getConfig(context, fateId.getTid()) == null;
}
- public int updateAndCheckTablets(Manager manager, long tid)
+ public int updateAndCheckTablets(Manager manager, FateId fateId)
throws AcceptableThriftTableOperationException {
var ample = manager.getContext().getAmple();
// ELASTICITY_TODO use existing compaction logging
- var fateStr = FateTxId.formatTid(tid);
-
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
if (result.getStatus() == Status.REJECTED) {
- log.debug("{} update for {} was rejected ", fateStr, result.getExtent());
+ log.debug("{} update for {} was rejected ", fateId, result.getExtent());
}
};
@@ -168,31 +168,35 @@ class CompactionDriver extends ManagerRepo {
.fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build();
var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {
- CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ CompactionConfig config =
+ CompactionConfigStorage.getConfig(manager.getContext(), fateId.getTid());
for (TabletMetadata tablet : tablets) {
total++;
- if (tablet.getCompacted().contains(tid)) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ if (tablet.getCompacted().contains(fateId.getTid())) {
// this tablet is already considered done
- log.trace("{} compaction for {} is complete", fateStr, tablet.getExtent());
+ log.trace("{} compaction for {} is complete", fateId, tablet.getExtent());
complete++;
} else if (tablet.getOperationId() != null) {
- log.trace("{} ignoring tablet {} with active operation {} ", fateStr, tablet.getExtent(),
+ log.trace("{} ignoring tablet {} with active operation {} ", fateId, tablet.getExtent(),
tablet.getOperationId());
opidsSeen++;
} else if (tablet.getFiles().isEmpty()) {
- log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateStr,
+ log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateId,
tablet.getExtent());
// this tablet has no files, so try to mark it as done
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
- .requireSame(tablet, FILES, COMPACTED).putCompacted(tid)
- .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid));
+ .requireSame(tablet, FILES, COMPACTED).putCompacted(fateId.getTid())
+ .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId.getTid()));
noFiles++;
} else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) {
// there are no selected files
- log.trace("{} selecting {} files compaction for {}", fateStr, tablet.getFiles().size(),
+ log.trace("{} selecting {} files compaction for {}", fateId, tablet.getFiles().size(),
tablet.getExtent());
Set<StoredTabletFile> filesToCompact;
@@ -200,7 +204,7 @@ class CompactionDriver extends ManagerRepo {
filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(),
tablet.getExtent(), config, tablet.getFilesMap());
} catch (Exception e) {
- log.warn("{} failed to select files for {} using {}", fateStr, tablet.getExtent(),
+ log.warn("{} failed to select files for {} using {}", fateId, tablet.getExtent(),
config.getSelector(), e);
throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
TableOperation.COMPACT, TableOperationExceptionType.OTHER,
@@ -208,7 +212,7 @@ class CompactionDriver extends ManagerRepo {
}
if (log.isTraceEnabled()) {
- log.trace("{} selected {} of {} files for {}", fateStr,
+ log.trace("{} selected {} of {} files for {}", fateId,
filesToCompact.stream().map(AbstractTabletFile::getFileName)
.collect(Collectors.toList()),
tablet.getFiles().stream().map(AbstractTabletFile::getFileName)
@@ -217,16 +221,19 @@ class CompactionDriver extends ManagerRepo {
}
if (filesToCompact.isEmpty()) {
// no files were selected so mark the tablet as compacted
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
- .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid)
- .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid));
+ .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED)
+ .putCompacted(fateId.getTid())
+ .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId.getTid()));
noneSelected++;
} else {
var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED);
- var selectedFiles =
- new SelectedFiles(filesToCompact, tablet.getFiles().equals(filesToCompact), tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var selectedFiles = new SelectedFiles(filesToCompact,
+ tablet.getFiles().equals(filesToCompact), fateId.getTid());
mutator.putSelectedFiles(selectedFiles);
@@ -246,15 +253,16 @@ class CompactionDriver extends ManagerRepo {
}
} else if (tablet.getSelectedFiles() != null) {
- if (tablet.getSelectedFiles().getFateTxId() == tid) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ if (tablet.getSelectedFiles().getFateTxId() == fateId.getTid()) {
log.trace(
"{} tablet {} already has {} selected files for this compaction, waiting for them be processed",
- fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size());
+ fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size());
alreadySelected++;
} else {
log.trace(
"{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed",
- fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(),
+ fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(),
FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId()));
otherSelected++;
}
@@ -272,13 +280,13 @@ class CompactionDriver extends ManagerRepo {
log.debug("{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{}"
+ " selected_by_other:{} no_files:{} none_selected:{} other_compaction:{} opids:{} scan_update_time:{}ms",
- fateStr, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected,
+ fateId, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected,
otherCompaction, opidsSeen, t2 - t1);
if (selected > 0) {
manager.getEventCoordinator().event(
new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()),
- "%s selected files for compaction for %d tablets", fateStr, selected);
+ "%s selected files for compaction for %d tablets", fateId, selected);
}
return total - complete;
@@ -287,24 +295,25 @@ class CompactionDriver extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
return new RefreshTablets(tableId, namespaceId, startRow, endRow);
}
@Override
- public void undo(long tid, Manager env) throws Exception {
- cleanupTabletMetadata(tid, env);
+ public void undo(FateId fateId, Manager env) throws Exception {
+ cleanupTabletMetadata(fateId, env);
// For any compactions that may have happened before this operation failed, attempt to refresh
// tablets.
- TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, tid, tableId, startRow,
- endRow, tabletMetadata -> true);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId.getTid(), tableId,
+ startRow, endRow, tabletMetadata -> true);
}
/**
* Cleans up any tablet metadata that may have been added as part of this compaction operation.
*/
- private void cleanupTabletMetadata(long tid, Manager manager) throws Exception {
+ private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception {
var ample = manager.getContext().getAmple();
// ELASTICITY_TODO use existing compaction logging
@@ -315,14 +324,12 @@ class CompactionDriver extends ManagerRepo {
.incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
.logInterval(3, MINUTES).createRetry();
- var fateStr = FateTxId.formatTid(tid);
-
while (!allCleanedUp) {
AtomicLong rejectedCount = new AtomicLong(0);
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
if (result.getStatus() == Status.REJECTED) {
- log.debug("{} update for {} was rejected ", fateStr, result.getExtent());
+ log.debug("{} update for {} was rejected ", fateId, result.getExtent());
rejectedCount.incrementAndGet();
}
};
@@ -331,10 +338,11 @@ class CompactionDriver extends ManagerRepo {
var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow)
.fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build();
var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
Predicate<TabletMetadata> needsUpdate =
tabletMetadata -> (tabletMetadata.getSelectedFiles() != null
- && tabletMetadata.getSelectedFiles().getFateTxId() == tid)
- || tabletMetadata.getCompacted().contains(tid);
+ && tabletMetadata.getSelectedFiles().getFateTxId() == fateId.getTid())
+ || tabletMetadata.getCompacted().contains(fateId.getTid());
Predicate<TabletMetadata> needsNoUpdate = needsUpdate.negate();
for (TabletMetadata tablet : tablets) {
@@ -342,13 +350,15 @@ class CompactionDriver extends ManagerRepo {
if (needsUpdate.test(tablet)) {
var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, COMPACTED, SELECTED);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
if (tablet.getSelectedFiles() != null
- && tablet.getSelectedFiles().getFateTxId() == tid) {
+ && tablet.getSelectedFiles().getFateTxId() == fateId.getTid()) {
mutator.deleteSelectedFiles();
}
- if (tablet.getCompacted().contains(tid)) {
- mutator.deleteCompacted(tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ if (tablet.getCompacted().contains(fateId.getTid())) {
+ mutator.deleteCompacted(fateId.getTid());
}
mutator.submit(needsNoUpdate::test);
@@ -359,7 +369,7 @@ class CompactionDriver extends ManagerRepo {
allCleanedUp = rejectedCount.get() == 0;
if (!allCleanedUp) {
- retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateStr);
+ retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateId);
}
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
index cccfad771c..aca1242276 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.compact;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -43,9 +44,10 @@ public class RefreshTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
- TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, tid, tableId,
- startRow, endRow, tabletMetadata -> true);
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(),
+ tableId, startRow, endRow, tabletMetadata -> true);
return new CleanUp(tableId, namespaceId, startRow, endRow);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java
index 2249b9c511..d156545b8a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.compact.cancel;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
@@ -44,29 +45,29 @@ public class CancelCompactions extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL)
- + Utils.reserveTable(env, tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, true,
+ TableOperation.COMPACT_CANCEL)
+ + Utils.reserveTable(env, tableId, fateId, false, true, TableOperation.COMPACT_CANCEL);
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
-
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
var idsToCancel =
CompactionConfigStorage.getAllConfig(environment.getContext(), tableId::equals).keySet();
for (var idToCancel : idsToCancel) {
- log.debug("{} deleting compaction config {}", FateTxId.formatTid(tid),
- FateTxId.formatTid(idToCancel));
+ log.debug("{} deleting compaction config {}", fateId, FateTxId.formatTid(idToCancel));
CompactionConfigStorage.deleteConfig(environment.getContext(), idToCancel);
}
return new FinishCancelCompaction(namespaceId, tableId);
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveTable(env, tableId, tid, false);
- Utils.unreserveNamespace(env, namespaceId, tid, false);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveTable(env, tableId, fateId, false);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/FinishCancelCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/FinishCancelCompaction.java
index 70fa526221..3b25add6ae 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/FinishCancelCompaction.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/FinishCancelCompaction.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.compact.cancel;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -36,14 +37,14 @@ class FinishCancelCompaction extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager environment) {
- Utils.unreserveTable(environment, tableId, tid, false);
- Utils.unreserveNamespace(environment, namespaceId, tid, false);
+ public Repo<Manager> call(FateId fateId, Manager environment) {
+ Utils.unreserveTable(environment, tableId, fateId, false);
+ Utils.unreserveNamespace(environment, namespaceId, fateId, false);
return null;
}
@Override
- public void undo(long tid, Manager environment) {
+ public void undo(FateId fateId, Manager environment) {
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
index 4f40ab2c6a..6532cb30fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
@@ -25,6 +25,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -49,12 +50,12 @@ class ChooseDir extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
if (tableInfo.getInitialSplitSize() > 0) {
createTableDirectoriesInfo(manager);
}
@@ -62,7 +63,7 @@ class ChooseDir extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager manager) throws Exception {
+ public void undo(FateId fateId, Manager manager) throws Exception {
// Clean up split files if ChooseDir operation fails
Path p = null;
try {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java
index 2b224b7ad2..6d13d40f68 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -60,14 +61,14 @@ public class CreateTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
+ public long isReady(FateId fateId, Manager environment) throws Exception {
// reserve the table's namespace to make sure it doesn't change while the table is created
- return Utils.reserveNamespace(environment, tableInfo.getNamespaceId(), tid, false, true,
+ return Utils.reserveNamespace(environment, tableInfo.getNamespaceId(), fateId, false, true,
TableOperation.CREATE);
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// first step is to reserve a table id.. if the machine fails during this step
// it is ok to retry... the only side effect is that a table id may not be used
// or skipped
@@ -85,7 +86,7 @@ public class CreateTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws IOException {
+ public void undo(FateId fateId, Manager env) throws IOException {
// Clean up split files if create table operation fails
Path p = null;
try {
@@ -97,7 +98,7 @@ public class CreateTable extends ManagerRepo {
} catch (IOException e) {
log.error("Table failed to be created and failed to clean up split files at {}", p, e);
} finally {
- Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
+ Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
index eddd8bad6c..f0bff38e5f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.EnumSet;
import org.apache.accumulo.core.client.admin.InitialTableState;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
@@ -45,12 +46,12 @@ class FinishCreateTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
final EnumSet<TableState> expectedCurrStates = EnumSet.of(TableState.NEW);
if (tableInfo.getInitialTableState() == InitialTableState.OFFLINE) {
@@ -61,8 +62,8 @@ class FinishCreateTable extends ManagerRepo {
TableState.ONLINE, expectedCurrStates);
}
- Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
- Utils.unreserveTable(env, tableInfo.getTableId(), tid, true);
+ Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), fateId, false);
+ Utils.unreserveTable(env, tableInfo.getTableId(), fateId, true);
env.getEventCoordinator().event(tableInfo.getTableId(), "Created table %s ",
tableInfo.getTableName());
@@ -92,6 +93,6 @@ class FinishCreateTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {}
+ public void undo(FateId fateId, Manager env) {}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
index 957df6cb17..dd6a0986f6 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
@@ -26,6 +26,7 @@ import java.util.TreeSet;
import java.util.stream.Stream;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
@@ -51,12 +52,12 @@ class PopulateMetadata extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
SortedSet<Text> splits;
Map<Text,Text> splitDirMap;
@@ -100,7 +101,7 @@ class PopulateMetadata extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
MetadataTableUtil.deleteTable(tableInfo.getTableId(), false, environment.getContext(),
environment.getManagerLock());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateZookeeper.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateZookeeper.java
index 40ad2d276d..a9a0048817 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateZookeeper.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateZookeeper.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.create;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -40,13 +41,13 @@ class PopulateZookeeper extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
- return Utils.reserveTable(environment, tableInfo.getTableId(), tid, true, false,
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveTable(environment, tableInfo.getTableId(), fateId, true, false,
TableOperation.CREATE);
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// reserve the table name in zookeeper or fail
Utils.getTableNameLock().lock();
@@ -77,9 +78,9 @@ class PopulateZookeeper extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager manager) throws Exception {
+ public void undo(FateId fateId, Manager manager) throws Exception {
manager.getTableManager().removeTable(tableInfo.getTableId());
- Utils.unreserveTable(manager, tableInfo.getTableId(), tid, true);
+ Utils.unreserveTable(manager, tableInfo.getTableId(), fateId, true);
manager.getContext().clearTableListCache();
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/SetupPermissions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/SetupPermissions.java
index 5b840c16ed..80571367a4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/SetupPermissions.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/SetupPermissions.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.create;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.manager.Manager;
@@ -38,7 +39,7 @@ class SetupPermissions extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// give all table permissions to the creator
SecurityOperation security = env.getContext().getSecurityOperation();
if (!tableInfo.getUser().equals(env.getContext().getCredentials().getPrincipal())) {
@@ -60,7 +61,7 @@ class SetupPermissions extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
env.getContext().getSecurityOperation().deleteTable(env.getContext().rpcCreds(),
tableInfo.getTableId(), tableInfo.getNamespaceId());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index d8c722d953..be197d7519 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -78,7 +79,7 @@ class CleanUp extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
// ELASTICITY_TODO investigate this, what is it for and is it still needed?
if (!manager.hasCycled(creationTime)) {
return 50;
@@ -88,7 +89,7 @@ class CleanUp extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) {
+ public Repo<Manager> call(FateId fateId, Manager manager) {
manager.clearMigrations(tableId);
@@ -177,8 +178,8 @@ class CleanUp extends ManagerRepo {
log.error("{}", e.getMessage(), e);
}
- Utils.unreserveTable(manager, tableId, tid, true);
- Utils.unreserveNamespace(manager, namespaceId, tid, false);
+ Utils.unreserveTable(manager, tableId, fateId, true);
+ Utils.unreserveNamespace(manager, namespaceId, fateId, false);
LoggerFactory.getLogger(CleanUp.class).debug("Deleted table " + tableId);
@@ -186,7 +187,7 @@ class CleanUp extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) {
+ public void undo(FateId fateId, Manager environment) {
// nothing to do
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
index 2e523483c1..e7006d56b0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
@@ -42,13 +43,13 @@ public class DeleteTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, namespaceId, tid, false, false, TableOperation.DELETE)
- + Utils.reserveTable(env, tableId, tid, true, true, TableOperation.DELETE);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, false, TableOperation.DELETE)
+ + Utils.reserveTable(env, tableId, fateId, true, true, TableOperation.DELETE);
}
@Override
- public Repo<Manager> call(long tid, Manager env) {
+ public Repo<Manager> call(FateId fateId, Manager env) {
final EnumSet<TableState> expectedCurrStates =
EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
env.getTableManager().transitionTableState(tableId, TableState.DELETING, expectedCurrStates);
@@ -57,8 +58,8 @@ public class DeleteTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveTable(env, tableId, tid, true);
- Utils.unreserveNamespace(env, namespaceId, tid, false);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveTable(env, tableId, fateId, true);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
index 41ce26dcbb..be7cef2ee6 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -50,9 +51,9 @@ public class PreDeleteTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.DELETE)
- + Utils.reserveTable(env, tableId, tid, false, true, TableOperation.DELETE);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, true, TableOperation.DELETE)
+ + Utils.reserveTable(env, tableId, fateId, false, true, TableOperation.DELETE);
}
private void preventFutureCompactions(Manager environment)
@@ -64,7 +65,7 @@ public class PreDeleteTable extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
try {
preventFutureCompactions(environment);
@@ -76,15 +77,15 @@ public class PreDeleteTable extends ManagerRepo {
}
return new DeleteTable(namespaceId, tableId);
} finally {
- Utils.unreserveTable(environment, tableId, tid, false);
- Utils.unreserveNamespace(environment, namespaceId, tid, false);
+ Utils.unreserveTable(environment, tableId, fateId, false);
+ Utils.unreserveNamespace(environment, namespaceId, fateId, false);
}
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveTable(env, tableId, tid, false);
- Utils.unreserveNamespace(env, namespaceId, tid, false);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveTable(env, tableId, fateId, false);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
index f98e851f50..91d62752e7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
@@ -27,7 +27,7 @@ import java.util.function.Consumer;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -52,9 +52,10 @@ public class ReserveTablets extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
- var opid = TabletOperationId.from(TabletOperationType.DELETING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId.getTid());
// The consumer may be called in another thread so use an AtomicLong
AtomicLong accepted = new AtomicLong(0);
@@ -62,8 +63,7 @@ public class ReserveTablets extends ManagerRepo {
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
accepted.incrementAndGet();
} else {
- log.debug("{} Failed to set operation id {} {}", FateTxId.formatTid(tid), opid,
- result.getExtent());
+ log.debug("{} Failed to set operation id {} {}", fateId, opid, result.getExtent());
}
};
@@ -98,7 +98,7 @@ public class ReserveTablets extends ManagerRepo {
if (locations > 0 || otherOps > 0 || submitted != accepted.get()) {
log.debug("{} Waiting to delete table locations:{} operations:{} submitted:{} accepted:{}",
- FateTxId.formatTid(tid), locations, otherOps, submitted, accepted.get());
+ fateId, locations, otherOps, submitted, accepted.get());
return Math.min(Math.max(100, tabletsSeen), 30000);
}
@@ -106,7 +106,7 @@ public class ReserveTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
return new CleanUp(tableId, namespaceId);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
index a6f09dcbe0..4083eecc86 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
@@ -21,7 +21,7 @@ package org.apache.accumulo.manager.tableOps.merge;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -38,7 +38,7 @@ public class CountFiles extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
var range = data.getReserveExtent();
@@ -70,8 +70,7 @@ public class CountFiles extends ManagerRepo {
long maxFiles = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
.getCount(Property.TABLE_MERGE_FILE_MAX);
- log.debug("{} found {} files in the merge range, maxFiles is {}", FateTxId.formatTid(tid),
- totalFiles, maxFiles);
+ log.debug("{} found {} files in the merge range, maxFiles is {}", fateId, totalFiles, maxFiles);
if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
index 1c01b82306..7df3561e1a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
@@ -39,7 +39,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -73,20 +73,20 @@ public class DeleteRows extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// delete or fence files within the deletion range
- var mergeRange = deleteTabletFiles(manager, tid);
+ var mergeRange = deleteTabletFiles(manager, fateId);
// merge away empty tablets in the deletion range
return new MergeTablets(mergeRange.map(mre -> data.useMergeRange(mre)).orElse(data));
}
- private Optional<KeyExtent> deleteTabletFiles(Manager manager, long tid) {
+ private Optional<KeyExtent> deleteTabletFiles(Manager manager, FateId fateId) {
// Only delete data within the original extent specified by the user
KeyExtent range = data.getOriginalExtent();
- var fateStr = FateTxId.formatTid(tid);
- log.debug("{} deleting tablet files in range {}", fateStr, range);
- var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+ log.debug("{} deleting tablet files in range {}", fateId, range);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid());
try (
var tabletsMetadata = manager.getContext().getAmple().readTablets()
@@ -98,7 +98,7 @@ public class DeleteRows extends ManagerRepo {
KeyExtent lastCompletelyContained = null;
for (var tabletMetadata : tabletsMetadata) {
- validateTablet(tabletMetadata, fateStr, opid, data.tableId);
+ validateTablet(tabletMetadata, fateId, opid, data.tableId);
var tabletMutator = tabletsMutator.mutateTablet(tabletMetadata.getExtent())
.requireOperation(opid).requireAbsentLocation();
@@ -114,7 +114,7 @@ public class DeleteRows extends ManagerRepo {
filesToDelete.addAll(tabletMetadata.getFiles());
} else {
Preconditions.checkState(range.overlaps(tabletMetadata.getExtent()),
- "%s tablet %s does not overlap delete range %s", fateStr, tabletMetadata.getExtent(),
+ "%s tablet %s does not overlap delete range %s", fateId, tabletMetadata.getExtent(),
range);
// Create the ranges for fencing the files, this takes the place of
@@ -144,7 +144,7 @@ public class DeleteRows extends ManagerRepo {
// of the newFiles set which means it is disjoint with all ranges
if (fenced != null) {
final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced);
- log.trace("{} Adding new file {} with range {}", fateStr, newFile.getMetadataPath(),
+ log.trace("{} Adding new file {} with range {}", fateId, newFile.getMetadataPath(),
newFile.getRange());
// Add the new file to the newFiles set, it will be added later if it doesn't match
@@ -155,7 +155,7 @@ public class DeleteRows extends ManagerRepo {
// with all ranges.
newFiles.add(newFile);
} else {
- log.trace("{} Found a disjoint file {} with range {} on delete", fateStr,
+ log.trace("{} Found a disjoint file {} with range {} on delete", fateId,
existing.getMetadataPath(), existing.getRange());
}
}
@@ -184,9 +184,9 @@ public class DeleteRows extends ManagerRepo {
}
}
- filesToDelete.forEach(file -> log.debug("{} deleting file {} for {}", fateStr, file,
+ filesToDelete.forEach(file -> log.debug("{} deleting file {} for {}", fateId, file,
tabletMetadata.getExtent()));
- filesToAddMap.forEach((file, dfv) -> log.debug("{} adding file {} {} for {}", fateStr, file,
+ filesToAddMap.forEach((file, dfv) -> log.debug("{} adding file {} {} for {}", fateId, file,
dfv, tabletMetadata.getExtent()));
filesToDelete.forEach(tabletMutator::deleteFile);
@@ -197,7 +197,7 @@ public class DeleteRows extends ManagerRepo {
}
var results = tabletsMutator.process();
- verifyAccepted(results, fateStr);
+ verifyAccepted(results, fateId);
return computeMergeRange(range, firstCompleteContained, lastCompletelyContained);
}
@@ -228,16 +228,16 @@ public class DeleteRows extends ManagerRepo {
.of(new KeyExtent(deleteRange.tableId(), end, firstCompleteContained.prevEndRow()));
}
- static void verifyAccepted(Map<KeyExtent,Ample.ConditionalResult> results, String fateStr) {
+ static void verifyAccepted(Map<KeyExtent,Ample.ConditionalResult> results, FateId fateId) {
if (results.values().stream()
.anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED)) {
results.forEach(((extent, conditionalResult) -> {
if (conditionalResult.getStatus() != Status.ACCEPTED) {
- log.error("{} failed to update {}", fateStr, extent);
+ log.error("{} failed to update {}", fateId, extent);
}
}));
- throw new IllegalStateException(fateStr + " failed to update tablet files");
+ throw new IllegalStateException(fateId + " failed to update tablet files");
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java
index 49190e9e0f..53c24959ae 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -56,12 +56,12 @@ public class DeleteTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- var fateStr = FateTxId.formatTid(tid);
KeyExtent range = data.getMergeExtent();
- log.debug("{} Deleting tablets for {}", fateStr, range);
- var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+ log.debug("{} Deleting tablets for {}", fateId, range);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid());
AtomicLong acceptedCount = new AtomicLong();
AtomicLong rejectedCount = new AtomicLong();
@@ -70,7 +70,7 @@ public class DeleteTablets extends ManagerRepo {
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
acceptedCount.incrementAndGet();
} else {
- log.error("{} failed to update {}", fateStr, result.getExtent());
+ log.error("{} failed to update {}", fateId, result.getExtent());
rejectedCount.incrementAndGet();
}
};
@@ -87,7 +87,7 @@ public class DeleteTablets extends ManagerRepo {
var lastEndRow = lastTabletEndRow == null ? null : new Text(lastTabletEndRow);
for (var tabletMeta : tabletsMetadata) {
- MergeTablets.validateTablet(tabletMeta, fateStr, opid, data.tableId);
+ MergeTablets.validateTablet(tabletMeta, fateId, opid, data.tableId);
var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
.requireOperation(opid).requireAbsentLocation();
@@ -102,7 +102,7 @@ public class DeleteTablets extends ManagerRepo {
}
tabletMeta.getKeyValues().keySet().forEach(key -> {
- log.trace("{} deleting {}", fateStr, key);
+ log.trace("{} deleting {}", fateId, key);
});
tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet());
@@ -116,7 +116,7 @@ public class DeleteTablets extends ManagerRepo {
"Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted,
rejectedCount.get());
- log.debug("{} deleted {} tablets", fateStr, submitted);
+ log.debug("{} deleted {} tablets", fateId, submitted);
return new FinishTableRangeOp(data);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
index c58efca885..643a3428a9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -51,19 +51,19 @@ class FinishTableRangeOp extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
- removeOperationIds(log, data, tid, manager);
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+ removeOperationIds(log, data, fateId, manager);
- Utils.unreserveTable(manager, data.tableId, tid, true);
- Utils.unreserveNamespace(manager, data.namespaceId, tid, false);
+ Utils.unreserveTable(manager, data.tableId, fateId, true);
+ Utils.unreserveNamespace(manager, data.namespaceId, fateId, false);
return null;
}
- static void removeOperationIds(Logger log, MergeInfo data, long tid, Manager manager) {
+ static void removeOperationIds(Logger log, MergeInfo data, FateId fateId, Manager manager) {
KeyExtent range = data.getReserveExtent();
- var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
- log.debug("{} unreserving tablet in range {}", FateTxId.formatTid(tid), range);
- var fateStr = FateTxId.formatTid(tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid());
+ log.debug("{} unreserving tablet in range {}", fateId, range);
AtomicLong acceptedCount = new AtomicLong();
AtomicLong rejectedCount = new AtomicLong();
@@ -72,7 +72,7 @@ class FinishTableRangeOp extends ManagerRepo {
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
acceptedCount.incrementAndGet();
} else {
- log.error("{} failed to update {}", fateStr, result.getExtent());
+ log.error("{} failed to update {}", fateId, result.getExtent());
rejectedCount.incrementAndGet();
}
};
@@ -97,11 +97,10 @@ class FinishTableRangeOp extends ManagerRepo {
Preconditions.checkState(count > 0);
}
- log.debug("{} deleted {}/{} opids out of {} tablets", FateTxId.formatTid(tid),
- acceptedCount.get(), submitted, count);
+ log.debug("{} deleted {}/{} opids out of {} tablets", fateId, acceptedCount.get(), submitted,
+ count);
- manager.getEventCoordinator().event(range, "Merge or deleterows completed %s",
- FateTxId.formatTid(tid));
+ manager.getEventCoordinator().event(range, "Merge or deleterows completed %s", fateId);
Preconditions.checkState(acceptedCount.get() == submitted && rejectedCount.get() == 0,
"Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
index 6428dcdbf1..b46f7bd2ef 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
@@ -42,7 +42,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -74,12 +74,12 @@ public class MergeTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
- var fateStr = FateTxId.formatTid(tid);
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
KeyExtent range = data.getMergeExtent();
- log.debug("{} Merging metadata for {}", fateStr, range);
+ log.debug("{} Merging metadata for {}", fateId, range);
- var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid());
Set<TabletAvailability> tabletAvailabilities = new HashSet<>();
MetadataTime maxLogicalTime = null;
List<ReferenceFile> dirs = new ArrayList<>();
@@ -96,9 +96,9 @@ public class MergeTablets extends ManagerRepo {
for (var tabletMeta : tabletsMetadata) {
Preconditions.checkState(lastTabletMeta == null,
- "%s unexpectedly saw multiple last tablets %s %s", fateStr, tabletMeta.getExtent(),
+ "%s unexpectedly saw multiple last tablets %s %s", fateId, tabletMeta.getExtent(),
range);
- validateTablet(tabletMeta, fateStr, opid, data.tableId);
+ validateTablet(tabletMeta, fateId, opid, data.tableId);
if (firstTabletMeta == null) {
firstTabletMeta = Objects.requireNonNull(tabletMeta);
@@ -143,8 +143,8 @@ public class MergeTablets extends ManagerRepo {
lastTabletMeta);
}
- log.info("{} merge low tablet {}", fateStr, firstTabletMeta.getExtent());
- log.info("{} merge high tablet {}", fateStr, lastTabletMeta.getExtent());
+ log.info("{} merge low tablet {}", fateId, firstTabletMeta.getExtent());
+ log.info("{} merge high tablet {}", fateId, lastTabletMeta.getExtent());
// Check if the last tablet was already updated, this could happen if a process died and this
// code is running a 2nd time. If running a 2nd time it possible the last tablet was updated and
@@ -184,7 +184,7 @@ public class MergeTablets extends ManagerRepo {
// successful.
tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
- verifyAccepted(tabletsMutator.process(), fateStr);
+ verifyAccepted(tabletsMutator.process(), fateId);
}
}
@@ -195,21 +195,21 @@ public class MergeTablets extends ManagerRepo {
return new DeleteTablets(data, lastTabletMeta.getEndRow());
}
- static void validateTablet(TabletMetadata tabletMeta, String fateStr, TabletOperationId opid,
+ static void validateTablet(TabletMetadata tabletMeta, FateId fateId, TabletOperationId opid,
TableId expectedTableId) {
// its expected at this point that tablets have our operation id and no location, so lets
// check that
Preconditions.checkState(tabletMeta.getLocation() == null,
- "%s merging tablet %s had location %s", fateStr, tabletMeta.getExtent(),
+ "%s merging tablet %s had location %s", fateId, tabletMeta.getExtent(),
tabletMeta.getLocation());
Preconditions.checkState(opid.equals(tabletMeta.getOperationId()),
- "%s merging tablet %s had unexpected opid %s", fateStr, tabletMeta.getExtent(),
+ "%s merging tablet %s had unexpected opid %s", fateId, tabletMeta.getExtent(),
tabletMeta.getOperationId());
Preconditions.checkState(expectedTableId.equals(tabletMeta.getTableId()),
- "%s tablet %s has unexpected table id %s expected %s", fateStr, tabletMeta.getExtent(),
+ "%s tablet %s has unexpected table id %s expected %s", fateId, tabletMeta.getExtent(),
tabletMeta.getTableId(), expectedTableId);
Preconditions.checkState(tabletMeta.getLogs().isEmpty(),
- "%s merging tablet %s has unexpected walogs %s", fateStr, tabletMeta.getExtent(),
+ "%s merging tablet %s has unexpected walogs %s", fateId, tabletMeta.getExtent(),
tabletMeta.getLogs().size());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
index e2fd71e65b..df8bd977ba 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
@@ -26,7 +26,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
@@ -52,10 +52,11 @@ public class ReserveTablets extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager env) throws Exception {
+ public long isReady(FateId fateId, Manager env) throws Exception {
var range = data.getReserveExtent();
- log.debug("{} reserving tablets in range {}", FateTxId.formatTid(tid), range);
- var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+ log.debug("{} reserving tablets in range {}", fateId, range);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid());
AtomicLong opsAccepted = new AtomicLong(0);
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
@@ -98,7 +99,7 @@ public class ReserveTablets extends ManagerRepo {
log.debug(
"{} reserve tablets op:{} count:{} other opids:{} opids set:{} locations:{} accepted:{} wals:{}",
- FateTxId.formatTid(tid), data.op, count, otherOps, opsSet, locations, opsAccepted, wals);
+ fateId, data.op, count, otherOps, opsSet, locations, opsAccepted, wals);
// while there are table lock a tablet can be concurrently deleted, so should always see
// tablets
@@ -108,7 +109,7 @@ public class ReserveTablets extends ManagerRepo {
// operation ids were set and tablets have locations, so lets send a signal to get them
// unassigned
env.getEventCoordinator().event(range, "Tablets %d were reserved for merge %s",
- opsAccepted.get(), FateTxId.formatTid(tid));
+ opsAccepted.get(), fateId);
}
if (locations > 0 || otherOps > 0 || wals > 0) {
@@ -126,7 +127,7 @@ public class ReserveTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
return new CountFiles(data);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
index f2b93315a7..2165629244 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.merge;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.util.TextUtil;
@@ -39,9 +40,9 @@ public class TableRangeOp extends ManagerRepo {
private final MergeInfo data;
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, data.namespaceId, tid, false, true, TableOperation.MERGE)
- + Utils.reserveTable(env, data.tableId, tid, true, true, TableOperation.MERGE);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, data.namespaceId, fateId, false, true, TableOperation.MERGE)
+ + Utils.reserveTable(env, data.tableId, fateId, true, true, TableOperation.MERGE);
}
public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tableId,
@@ -52,7 +53,7 @@ public class TableRangeOp extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (AccumuloTable.ROOT.tableId().equals(data.tableId)
&& MergeInfo.Operation.MERGE.equals(data.op)) {
@@ -68,8 +69,8 @@ public class TableRangeOp extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
- Utils.unreserveNamespace(env, data.namespaceId, tid, false);
- Utils.unreserveTable(env, data.tableId, tid, true);
+ public void undo(FateId fateId, Manager env) throws Exception {
+ Utils.unreserveNamespace(env, data.namespaceId, fateId, false);
+ Utils.unreserveTable(env, data.tableId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java
index 6d6e7c9e78..926e190c88 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.merge;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -41,8 +42,8 @@ public class UnreserveAndError extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
- FinishTableRangeOp.removeOperationIds(log, mergeInfo, tid, environment);
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
+ FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE
: TableOperation.DELETE_RANGE,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/CreateNamespace.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/CreateNamespace.java
index eb2e07559e..854bc28aa6 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/CreateNamespace.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/CreateNamespace.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.namespace.create;
import java.util.Map;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -39,12 +40,12 @@ public class CreateNamespace extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
Utils.getIdLock().lock();
try {
namespaceInfo.namespaceId =
@@ -57,7 +58,7 @@ public class CreateNamespace extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {
+ public void undo(FateId fateId, Manager env) {
// nothing to do, the namespace id was allocated!
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/FinishCreateNamespace.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/FinishCreateNamespace.java
index bb462f94fe..e22e1f30c3 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/FinishCreateNamespace.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/FinishCreateNamespace.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.tableOps.namespace.create;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -35,14 +36,14 @@ class FinishCreateNamespace extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long id, Manager env) {
+ public Repo<Manager> call(FateId fateId, Manager env) {
- Utils.unreserveNamespace(env, namespaceInfo.namespaceId, id, true);
+ Utils.unreserveNamespace(env, namespaceInfo.namespaceId, fateId, true);
env.getEventCoordinator().event("Created namespace %s ", namespaceInfo.namespaceName);
@@ -58,6 +59,6 @@ class FinishCreateNamespace extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {}
+ public void undo(FateId fateId, Manager env) {}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
index 1b054b228e..58fd48b958 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.namespace.create;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.manager.Manager;
@@ -39,13 +40,13 @@ class PopulateZookeeperWithNamespace extends ManagerRepo {
}
@Override
- public long isReady(long id, Manager environment) throws Exception {
- return Utils.reserveNamespace(environment, namespaceInfo.namespaceId, id, true, false,
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveNamespace(environment, namespaceInfo.namespaceId, fateId, true, false,
TableOperation.CREATE);
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
Utils.getTableNameLock().lock();
try {
@@ -68,10 +69,10 @@ class PopulateZookeeperWithNamespace extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager manager) throws Exception {
+ public void undo(FateId fateId, Manager manager) throws Exception {
manager.getTableManager().removeNamespace(namespaceInfo.namespaceId);
manager.getContext().clearTableListCache();
- Utils.unreserveNamespace(manager, namespaceInfo.namespaceId, tid, true);
+ Utils.unreserveNamespace(manager, namespaceInfo.namespaceId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/SetupNamespacePermissions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/SetupNamespacePermissions.java
index 5efa183a1c..bee3a21a51 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/SetupNamespacePermissions.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/SetupNamespacePermissions.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.namespace.create;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.manager.Manager;
@@ -37,7 +38,7 @@ class SetupNamespacePermissions extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fate, Manager env) throws Exception {
// give all namespace permissions to the creator
SecurityOperation security = env.getContext().getSecurityOperation();
for (var permission : NamespacePermission.values()) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/DeleteNamespace.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/DeleteNamespace.java
index 6dd61c52d3..4f2c5a5119 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/DeleteNamespace.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/DeleteNamespace.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.namespace.delete;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -36,19 +37,20 @@ public class DeleteNamespace extends ManagerRepo {
}
@Override
- public long isReady(long id, Manager environment) throws Exception {
- return Utils.reserveNamespace(environment, namespaceId, id, true, true, TableOperation.DELETE);
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveNamespace(environment, namespaceId, fateId, true, true,
+ TableOperation.DELETE);
}
@Override
- public Repo<Manager> call(long tid, Manager environment) {
+ public Repo<Manager> call(FateId fateId, Manager environment) {
environment.getEventCoordinator().event("deleting namespace %s ", namespaceId);
return new NamespaceCleanUp(namespaceId);
}
@Override
- public void undo(long id, Manager environment) {
- Utils.unreserveNamespace(environment, namespaceId, id, true);
+ public void undo(FateId fateId, Manager environment) {
+ Utils.unreserveNamespace(environment, namespaceId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/NamespaceCleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/NamespaceCleanUp.java
index 27fc850d0f..1123753388 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/NamespaceCleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/delete/NamespaceCleanUp.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.namespace.delete;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -40,12 +41,12 @@ class NamespaceCleanUp extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) {
+ public long isReady(FateId fateId, Manager manager) {
return 0;
}
@Override
- public Repo<Manager> call(long id, Manager manager) {
+ public Repo<Manager> call(FateId fateId, Manager manager) {
// remove from zookeeper
try {
@@ -63,7 +64,7 @@ class NamespaceCleanUp extends ManagerRepo {
log.error("{}", e.getMessage(), e);
}
- Utils.unreserveNamespace(manager, namespaceId, id, true);
+ Utils.unreserveNamespace(manager, namespaceId, fateId, true);
log.debug("Deleted namespace " + namespaceId);
@@ -71,7 +72,7 @@ class NamespaceCleanUp extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) {
+ public void undo(FateId fateId, Manager environment) {
// nothing to do
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
index 4eabcde2a3..1a0f290635 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationExcepti
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.manager.Manager;
@@ -40,8 +41,9 @@ public class RenameNamespace extends ManagerRepo {
private String newName;
@Override
- public long isReady(long id, Manager environment) throws Exception {
- return Utils.reserveNamespace(environment, namespaceId, id, true, true, TableOperation.RENAME);
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveNamespace(environment, namespaceId, fateId, true, true,
+ TableOperation.RENAME);
}
public RenameNamespace(NamespaceId namespaceId, String oldName, String newName) {
@@ -51,7 +53,7 @@ public class RenameNamespace extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long id, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
ZooReaderWriter zoo = manager.getContext().getZooReaderWriter();
@@ -77,7 +79,7 @@ public class RenameNamespace extends ManagerRepo {
manager.getContext().clearTableListCache();
} finally {
Utils.getTableNameLock().unlock();
- Utils.unreserveNamespace(manager, namespaceId, id, true);
+ Utils.unreserveNamespace(manager, namespaceId, fateId, true);
}
LoggerFactory.getLogger(RenameNamespace.class).debug("Renamed namespace {} {} {}", namespaceId,
@@ -87,8 +89,8 @@ public class RenameNamespace extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveNamespace(env, namespaceId, tid, true);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveNamespace(env, namespaceId, fateId, true);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
index a91b0ca8a8..b5f8ae2eaa 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.util.Pair;
@@ -46,9 +47,9 @@ public class RenameTable extends ManagerRepo {
private String newTableName;
@Override
- public long isReady(long tid, Manager env) throws Exception {
- return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.RENAME)
- + Utils.reserveTable(env, tableId, tid, true, true, TableOperation.RENAME);
+ public long isReady(FateId fateId, Manager env) throws Exception {
+ return Utils.reserveNamespace(env, namespaceId, fateId, false, true, TableOperation.RENAME)
+ + Utils.reserveTable(env, tableId, fateId, true, true, TableOperation.RENAME);
}
public RenameTable(NamespaceId namespaceId, TableId tableId, String oldTableName,
@@ -60,7 +61,7 @@ public class RenameTable extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
Pair<String,String> qualifiedOldTableName = TableNameUtil.qualify(oldTableName);
Pair<String,String> qualifiedNewTableName = TableNameUtil.qualify(newTableName);
@@ -100,8 +101,8 @@ public class RenameTable extends ManagerRepo {
manager.getContext().clearTableListCache();
} finally {
Utils.getTableNameLock().unlock();
- Utils.unreserveTable(manager, tableId, tid, true);
- Utils.unreserveNamespace(manager, namespaceId, tid, false);
+ Utils.unreserveTable(manager, tableId, fateId, true);
+ Utils.unreserveNamespace(manager, namespaceId, fateId, false);
}
LoggerFactory.getLogger(RenameTable.class).debug("Renamed table {} {} {}", tableId,
@@ -111,9 +112,9 @@ public class RenameTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveTable(env, tableId, tid, true);
- Utils.unreserveNamespace(env, namespaceId, tid, false);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveTable(env, tableId, fateId, true);
+ Utils.unreserveNamespace(env, namespaceId, fateId, false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
index a2c7169ea5..e3262daa8c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.split;
import java.util.stream.Collectors;
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -39,9 +40,10 @@ public class DeleteOperationIds extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid());
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index 6704ef6d54..813f525692 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -30,7 +30,7 @@ import java.util.Optional;
import java.util.SortedSet;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
@@ -61,12 +61,13 @@ public class PreSplit extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
// ELASTICITY_TODO intentionally not getting the table lock because not sure if it's needed,
// revisit later when more operations are moved out of tablet server
- var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid());
// ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to
// the same tablet.
@@ -79,7 +80,7 @@ public class PreSplit extends ManagerRepo {
var tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal(),
PREV_ROW, LOCATION, OPID);
- log.trace("Attempting tablet split {} {} {}", FateTxId.formatTid(tid), splitInfo.getOriginal(),
+ log.trace("Attempting tablet split {} {} {}", fateId, splitInfo.getOriginal(),
tabletMetadata == null ? null : tabletMetadata.getLocation());
if (tabletMetadata == null || (tabletMetadata.getOperationId() != null
@@ -103,19 +104,19 @@ public class PreSplit extends ManagerRepo {
Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
if (results.get(splitInfo.getOriginal()).getStatus() == Status.ACCEPTED) {
- log.trace("Successfully set operation id for split {}", FateTxId.formatTid(tid));
+ log.trace("Successfully set operation id for split {}", fateId);
if (tabletMetadata.getLocation() == null) {
// the operation id was set and there is no location, so can move on
return 0;
} else {
// now that the operation id is set, generate an event to unload the tablet
manager.getEventCoordinator().event(splitInfo.getOriginal(),
- "Set operation id %s on tablet for split", FateTxId.formatTid(tid));
+ "Set operation id %s on tablet for split", fateId);
// the operation id was set, but a location is also set, so wait for it to be unset
return 1000;
}
} else {
- log.trace("Failed to set operation id for split {}", FateTxId.formatTid(tid));
+ log.trace("Failed to set operation id for split {}", fateId);
// something changed with the tablet, so setting the operation id failed. Try again later
return 1000;
}
@@ -124,22 +125,22 @@ public class PreSplit extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
manager.getSplitter().removeSplitStarting(splitInfo.getOriginal());
TabletMetadata tabletMetadata = manager.getContext().getAmple()
.readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID);
- var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid());
if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) {
// the tablet no longer exists or we could not set the operation id, maybe another operation
// was running, let's not proceed with the split.
var optMeta = Optional.ofNullable(tabletMetadata);
- log.trace("{} Not proceeding with split. extent:{} location:{} opid:{}",
- FateTxId.formatTid(tid), splitInfo.getOriginal(),
- optMeta.map(TabletMetadata::getLocation).orElse(null),
+ log.trace("{} Not proceeding with split. extent:{} location:{} opid:{}", fateId,
+ splitInfo.getOriginal(), optMeta.map(TabletMetadata::getLocation).orElse(null),
optMeta.map(TabletMetadata::getOperationId).orElse(null));
return null;
}
@@ -147,8 +148,8 @@ public class PreSplit extends ManagerRepo {
// It's expected that the tablet has no location at this point and if it does it's an indication
// of a bug.
Preconditions.checkState(tabletMetadata.getLocation() == null,
- "Tablet unexpectedly had location set %s %s %s", FateTxId.formatTid(tid),
- tabletMetadata.getLocation(), tabletMetadata.getExtent());
+ "Tablet unexpectedly had location set %s %s %s", fateId, tabletMetadata.getLocation(),
+ tabletMetadata.getExtent());
// Create the dir name here for the next step. If the next step fails it will always have the
// same dir name each time it runs again making it idempotent.
@@ -158,14 +159,14 @@ public class PreSplit extends ManagerRepo {
splitInfo.getSplits().forEach(split -> {
String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split);
dirs.add(dirName);
- log.trace("{} allocated dir name {}", FateTxId.formatTid(tid), dirName);
+ log.trace("{} allocated dir name {}", fateId, dirName);
});
return new UpdateTablets(splitInfo, dirs);
}
@Override
- public void undo(long tid, Manager manager) throws Exception {
+ public void undo(FateId fateId, Manager manager) throws Exception {
// TODO is this called if isReady fails?
manager.getSplitter().removeSplitStarting(splitInfo.getOriginal());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
index 413e41fd39..b3d7a85cf7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -57,11 +58,12 @@ public class UpdateTablets extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
TabletMetadata tabletMetadata =
manager.getContext().getAmple().readTablet(splitInfo.getOriginal());
- var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid());
if (tabletMetadata == null) {
// check to see if this operation has already succeeded.
@@ -73,7 +75,7 @@ public class UpdateTablets extends ManagerRepo {
// lets go ahead and return the next step.
log.trace(
"{} creating new tablet was rejected because it existed, operation probably failed before.",
- FateTxId.formatTid(tid));
+ fateId);
return new DeleteOperationIds(splitInfo);
} else {
throw new IllegalStateException("Tablet is in an unexpected condition "
@@ -95,12 +97,12 @@ public class UpdateTablets extends ManagerRepo {
var newTabletsFiles = getNewTabletFiles(newTablets, tabletMetadata,
file -> manager.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));
- addNewTablets(tid, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
+ addNewTablets(fateId, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
// Only update the original tablet after successfully creating the new tablets, this is
// important for failure cases where this operation partially runs and then runs again.
- updateExistingTablet(tid, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
+ updateExistingTablet(fateId, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
return new DeleteOperationIds(splitInfo);
}
@@ -165,7 +167,7 @@ public class UpdateTablets extends ManagerRepo {
return tabletsFiles;
}
- private void addNewTablets(long tid, Manager manager, TabletMetadata tabletMetadata,
+ private void addNewTablets(FateId fateId, Manager manager, TabletMetadata tabletMetadata,
TabletOperationId opid, SortedSet<KeyExtent> newTablets,
Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
Iterator<String> dirNameIter = dirNames.iterator();
@@ -186,9 +188,10 @@ public class UpdateTablets extends ManagerRepo {
mutator.putPrevEndRow(newExtent.prevEndRow());
tabletMetadata.getCompacted().forEach(mutator::putCompacted);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
tabletMetadata.getCompacted()
- .forEach(ctid -> log.debug("{} copying compacted marker to new child tablet {}",
- FateTxId.formatTid(tid), FateTxId.formatTid(ctid)));
+ .forEach(ctid -> log.debug("{} copying compacted marker to new child tablet {}", fateId,
+ FateTxId.formatTid(ctid)));
mutator.putTabletAvailability(tabletMetadata.getTabletAvailability());
@@ -210,7 +213,7 @@ public class UpdateTablets extends ManagerRepo {
}
}
- private void updateExistingTablet(long tid, Manager manager, TabletMetadata tabletMetadata,
+ private void updateExistingTablet(FateId fateId, Manager manager, TabletMetadata tabletMetadata,
TabletOperationId opid, SortedSet<KeyExtent> newTablets,
Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
@@ -233,15 +236,14 @@ public class UpdateTablets extends ManagerRepo {
// remove any external compaction entries that are present
tabletMetadata.getExternalCompactions().keySet().forEach(mutator::deleteExternalCompaction);
- tabletMetadata.getExternalCompactions().keySet()
- .forEach(ecid -> log.debug("{} deleting external compaction entry for split {}",
- FateTxId.formatTid(tid), ecid));
+ tabletMetadata.getExternalCompactions().keySet().forEach(
+ ecid -> log.debug("{} deleting external compaction entry for split {}", fateId, ecid));
// remove any selected file entries that are present, the compaction operation will need to
// reselect files
if (tabletMetadata.getSelectedFiles() != null) {
mutator.deleteSelectedFiles();
- log.debug("{} deleting selected files {} because of split", FateTxId.formatTid(tid),
+ log.debug("{} deleting selected files {} because of split", fateId,
FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId()));
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/ExportTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/ExportTable.java
index 8662607c18..dcb277c747 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/ExportTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/ExportTable.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.tableExport;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -40,18 +41,19 @@ public class ExportTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
- return Utils.reserveHdfsDirectory(environment, new Path(tableInfo.exportDir).toString(), tid);
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveHdfsDirectory(environment, new Path(tableInfo.exportDir).toString(),
+ fateId);
}
@Override
- public Repo<Manager> call(long tid, Manager env) {
+ public Repo<Manager> call(FateId fateId, Manager env) {
return new WriteExportFiles(tableInfo);
}
@Override
- public void undo(long tid, Manager env) throws Exception {
- Utils.unreserveHdfsDirectory(env, new Path(tableInfo.exportDir).toString(), tid);
+ public void undo(FateId fateId, Manager env) throws Exception {
+ Utils.unreserveHdfsDirectory(env, new Path(tableInfo.exportDir).toString(), fateId);
}
/**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
index e718fb5edd..85dcd5298f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -90,11 +91,12 @@ class WriteExportFiles extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) throws Exception {
+ public long isReady(FateId fateId, Manager manager) throws Exception {
- long reserved = Utils.reserveNamespace(manager, tableInfo.namespaceID, tid, false, true,
+ long reserved = Utils.reserveNamespace(manager, tableInfo.namespaceID, fateId, false, true,
TableOperation.EXPORT)
- + Utils.reserveTable(manager, tableInfo.tableID, tid, false, true, TableOperation.EXPORT);
+ + Utils.reserveTable(manager, tableInfo.tableID, fateId, false, true,
+ TableOperation.EXPORT);
if (reserved > 0) {
return reserved;
}
@@ -132,7 +134,7 @@ class WriteExportFiles extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
try {
exportTable(manager.getVolumeManager(), manager.getContext(), tableInfo.tableName,
tableInfo.tableID, tableInfo.exportDir);
@@ -141,16 +143,16 @@ class WriteExportFiles extends ManagerRepo {
tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
"Failed to create export files " + ioe.getMessage());
}
- Utils.unreserveNamespace(manager, tableInfo.namespaceID, tid, false);
- Utils.unreserveTable(manager, tableInfo.tableID, tid, false);
- Utils.unreserveHdfsDirectory(manager, new Path(tableInfo.exportDir).toString(), tid);
+ Utils.unreserveNamespace(manager, tableInfo.namespaceID, fateId, false);
+ Utils.unreserveTable(manager, tableInfo.tableID, fateId, false);
+ Utils.unreserveHdfsDirectory(manager, new Path(tableInfo.exportDir).toString(), fateId);
return null;
}
@Override
- public void undo(long tid, Manager env) {
- Utils.unreserveNamespace(env, tableInfo.namespaceID, tid, false);
- Utils.unreserveTable(env, tableInfo.tableID, tid, false);
+ public void undo(FateId fateId, Manager env) {
+ Utils.unreserveNamespace(env, tableInfo.namespaceID, fateId, false);
+ Utils.unreserveTable(env, tableInfo.tableID, fateId, false);
}
public static void exportTable(VolumeManager fs, ServerContext context, String tableName,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
index 3888b8c23d..2102a08fcb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Set;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -41,7 +42,7 @@ class CreateImportDir extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
Set<String> tableDirs = manager.getContext().getTablesDirs();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
index 5b2b294c61..245d10676a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
@@ -22,6 +22,7 @@ import static org.apache.accumulo.core.Constants.IMPORT_MAPPINGS_FILE;
import java.util.EnumSet;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
@@ -41,12 +42,12 @@ class FinishImportTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (!tableInfo.keepMappings) {
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
@@ -58,11 +59,11 @@ class FinishImportTable extends ManagerRepo {
final TableState newState = tableInfo.keepOffline ? TableState.OFFLINE : TableState.ONLINE;
env.getTableManager().transitionTableState(tableInfo.tableId, newState, expectedCurrStates);
- Utils.unreserveNamespace(env, tableInfo.namespaceId, tid, false);
- Utils.unreserveTable(env, tableInfo.tableId, tid, true);
+ Utils.unreserveNamespace(env, tableInfo.namespaceId, fateId, false);
+ Utils.unreserveTable(env, tableInfo.tableId, fateId, true);
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
- Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), tid);
+ Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), fateId);
}
env.getEventCoordinator().event(tableInfo.tableId, "Imported table %s ", tableInfo.tableName);
@@ -79,6 +80,6 @@ class FinishImportTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) {}
+ public void undo(FateId fateId, Manager env) {}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
index eb57a80032..2b3af949db 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.util.tables.TableNameUtil;
import org.apache.accumulo.manager.Manager;
@@ -49,8 +50,8 @@ class ImportPopulateZookeeper extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
- return Utils.reserveTable(environment, tableInfo.tableId, tid, true, false,
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return Utils.reserveTable(environment, tableInfo.tableId, fateId, true, false,
TableOperation.IMPORT);
}
@@ -69,7 +70,7 @@ class ImportPopulateZookeeper extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// reserve the table name in zookeeper or fail
Utils.getTableNameLock().lock();
@@ -102,9 +103,9 @@ class ImportPopulateZookeeper extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
env.getTableManager().removeTable(tableInfo.tableId);
- Utils.unreserveTable(env, tableInfo.tableId, tid, true);
+ Utils.unreserveTable(env, tableInfo.tableId, fateId, true);
env.getContext().clearTableListCache();
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportSetupPermissions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportSetupPermissions.java
index 8f53de1011..368bdcee51 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportSetupPermissions.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportSetupPermissions.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.tableImport;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.manager.Manager;
@@ -37,12 +38,12 @@ class ImportSetupPermissions extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) {
+ public long isReady(FateId fateId, Manager environment) {
return 0;
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// give all table permissions to the creator
SecurityOperation security = env.getContext().getSecurityOperation();
for (TablePermission permission : TablePermission.values()) {
@@ -62,7 +63,7 @@ class ImportSetupPermissions extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
env.getContext().getSecurityOperation().deleteTable(env.getContext().rpcCreds(),
tableInfo.tableId, tableInfo.namespaceId);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
index 58993783b7..90eb7f1fbf 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -73,18 +74,18 @@ public class ImportTable extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager environment) throws Exception {
+ public long isReady(FateId fateId, Manager environment) throws Exception {
long result = 0;
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
- result += Utils.reserveHdfsDirectory(environment, new Path(dm.exportDir).toString(), tid);
+ result += Utils.reserveHdfsDirectory(environment, new Path(dm.exportDir).toString(), fateId);
}
- result += Utils.reserveNamespace(environment, tableInfo.namespaceId, tid, false, true,
+ result += Utils.reserveNamespace(environment, tableInfo.namespaceId, fateId, false, true,
TableOperation.IMPORT);
return result;
}
@Override
- public Repo<Manager> call(long tid, Manager env) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
checkVersions(env);
// first step is to reserve a table id.. if the machine fails during this step
@@ -156,12 +157,12 @@ public class ImportTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
- Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), tid);
+ Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), fateId);
}
- Utils.unreserveNamespace(env, tableInfo.namespaceId, tid, false);
+ Utils.unreserveNamespace(env, tableInfo.namespaceId, fateId, false);
}
static List<ImportedTableInfo.DirectoryMapping> parseExportDir(Set<String> exportDirs) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
index 19c9635b99..9cb94144e1 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
@@ -28,6 +28,7 @@ import java.io.OutputStreamWriter;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.manager.Manager;
@@ -51,7 +52,7 @@ class MapImportFileNames extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager environment) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
Path path = new Path(dm.importDir, IMPORT_MAPPINGS_FILE);
@@ -116,7 +117,7 @@ class MapImportFileNames extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager env) throws Exception {
+ public void undo(FateId fateId, Manager env) throws Exception {
// TODO: will this be OK for partially complete operations?
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
env.getVolumeManager().deleteRecursively(new Path(dm.importDir));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
index 87b4a62ef9..0bc2e86983 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
@@ -31,7 +31,7 @@ import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationExcepti
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -55,9 +55,7 @@ class MoveExportedFiles extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
- String fmtTid = FateTxId.formatTid(tid);
-
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
int workerCount = manager.getConfiguration().getCount(Property.MANAGER_RENAME_THREADS);
VolumeManager fs = manager.getVolumeManager();
Map<Path,Path> oldToNewPaths = new HashMap<>();
@@ -78,7 +76,7 @@ class MoveExportedFiles extends ManagerRepo {
Arrays.stream(importedFiles).map(fileStatusName).collect(Collectors.toSet());
if (log.isDebugEnabled()) {
- log.debug("{} files already present in imported (target) directory: {}", fmtTid,
+ log.debug("{} files already present in imported (target) directory: {}", fateId,
String.join(",", imported));
}
@@ -103,12 +101,13 @@ class MoveExportedFiles extends ManagerRepo {
// operation would be truly unexpected
oldToNewPaths.put(originalPath, newPath);
} else {
- log.debug("{} not moving (unmapped) file {}", fmtTid, originalPath);
+ log.debug("{} not moving (unmapped) file {}", fateId, originalPath);
}
}
}
try {
- fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fmtTid);
+ // ELASTICITY_TODO DEFERRED - ISSUE 4044
+ fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId.getHexTid());
} catch (IOException ioe) {
throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null,
TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage());
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java
index ae13813b03..830dc503fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -93,7 +94,7 @@ class PopulateMetadataTable extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
Path path = new Path(tableInfo.exportFile);
@@ -216,7 +217,7 @@ class PopulateMetadataTable extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager environment) throws Exception {
+ public void undo(FateId fateId, Manager environment) throws Exception {
MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment.getContext(),
environment.getManagerLock());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java
index eb7ab83904..b43a0f9634 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tserverOps;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -51,7 +52,7 @@ public class ShutdownTServer extends ManagerRepo {
}
@Override
- public long isReady(long tid, Manager manager) {
+ public long isReady(FateId fateId, Manager manager) {
TServerInstance server = new TServerInstance(hostAndPort, serverSession);
// suppress assignment of tablets to the server
if (force) {
@@ -92,7 +93,7 @@ public class ShutdownTServer extends ManagerRepo {
}
@Override
- public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// suppress assignment of tablets to the server
if (force) {
ZooReaderWriter zoo = manager.getContext().getZooReaderWriter();
@@ -109,5 +110,5 @@ public class ShutdownTServer extends ManagerRepo {
}
@Override
- public void undo(long tid, Manager m) {}
+ public void undo(FateId fateId, Manager m) {}
}
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java
index 9aff9a89fa..f73a94d170 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java
@@ -25,6 +25,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Collections;
import java.util.HashMap;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.thrift.TableInfo;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
@@ -48,7 +50,7 @@ public class ShutdownTServerTest {
final ShutdownTServer op = new ShutdownTServer(tserver, force);
final Manager manager = EasyMock.createMock(Manager.class);
- final long tid = 1L;
+ final FateId fateId = FateId.from(FateInstanceType.USER, 1L);
final TServerConnection tserverCnxn = EasyMock.createMock(TServerConnection.class);
final TabletServerStatus status = new TabletServerStatus();
@@ -65,7 +67,7 @@ public class ShutdownTServerTest {
EasyMock.replay(tserverCnxn, manager);
// FATE op is not ready
- long wait = op.isReady(tid, manager);
+ long wait = op.isReady(fateId, manager);
assertTrue(wait > 0, "Expected wait to be greater than 0");
EasyMock.verify(tserverCnxn, manager);
@@ -87,10 +89,10 @@ public class ShutdownTServerTest {
EasyMock.replay(tserverCnxn, manager);
// FATE op is ready
- wait = op.isReady(tid, manager);
+ wait = op.isReady(fateId, manager);
assertEquals(0, wait, "Expected wait to be 0");
- Repo<Manager> op2 = op.call(tid, manager);
+ Repo<Manager> op2 = op.call(fateId, manager);
assertNull(op2, "Expected no follow on step");
EasyMock.verify(tserverCnxn, manager);
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index dff909896b..6c34c6aad3 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -129,8 +129,7 @@ public class PrepBulkImportTest {
.map(Text::toString).orElse(null);
try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
- var extent =
- PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001);
+ var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets);
assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index ee9635b7e5..79cbdae89d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -71,7 +70,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
}
@Override
- public long isReady(long tid, TestEnv environment) throws Exception {
+ public long isReady(FateId fateId, TestEnv environment) throws Exception {
return 0;
}
@@ -81,18 +80,18 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
}
@Override
- public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception {
- LOG.debug("Entering call {}", FateTxId.formatTid(tid));
+ public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception {
+ LOG.debug("Entering call {}", fateId);
try {
FateIT.inCall();
return null;
} finally {
- LOG.debug("Leaving call {}", FateTxId.formatTid(tid));
+ LOG.debug("Leaving call {}", fateId);
}
}
@Override
- public void undo(long tid, TestEnv environment) throws Exception {
+ public void undo(FateId fateId, TestEnv environment) throws Exception {
}
@@ -121,8 +120,8 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
}
@Override
- public long isReady(long tid, TestEnv environment) {
- LOG.debug("Fate {} delayed {}", tid, delay.get());
+ public long isReady(FateId fateId, TestEnv environment) {
+ LOG.debug("{} delayed {}", fateId, delay.get());
return delay.get();
}
@@ -132,15 +131,14 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
}
@Override
- public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception {
+ public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception {
callLatch.await();
- LOG.debug("Executing call {}, total executed {}", FateTxId.formatTid(tid),
- executedCalls.incrementAndGet());
+ LOG.debug("Executing call {}, total executed {}", fateId, executedCalls.incrementAndGet());
return null;
}
@Override
- public void undo(long tid, TestEnv environment) {
+ public void undo(FateId fateId, TestEnv environment) {
}