You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/09/21 12:52:41 UTC
[accumulo] branch main updated: Deprecate FateCommand (#2914)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c19694c7a1 Deprecate FateCommand (#2914)
c19694c7a1 is described below
commit c19694c7a1121786017ce2e53c72f74ec0f1b324
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Sep 21 12:52:36 2022 +0000
Deprecate FateCommand (#2914)
* Deprecate FateCommand by adding warning to usage & Shell log
* Move new code for cancel and summary options to Admin, printing message in Shell.
* Drop pagination option for Admin
* Drop recently added tests from FateCommandTest
* Supports #2215
Co-authored-by: Dave Marion <dl...@apache.org>
Co-authored-by: EdColeman <de...@etcoleman.com>
---
.../java/org/apache/accumulo/fate/AdminUtil.java | 10 +-
.../java/org/apache/accumulo/fate/FateTxId.java | 8 +
.../org/apache/accumulo/server/util/Admin.java | 203 +++++++++++++++++++++
.../util}/fateCommand/FateSummaryReport.java | 18 +-
.../server/util}/fateCommand/FateTxnDetails.java | 6 +-
.../util}/fateCommand/SummaryReportTest.java | 2 +-
.../server/util}/fateCommand/TxnDetailsTest.java | 2 +-
.../apache/accumulo/manager/util/FateAdmin.java | 2 +-
.../accumulo/shell/commands/FateCommand.java | 121 ++----------
.../accumulo/shell/commands/FateCommandTest.java | 71 -------
.../apache/accumulo/test/shell/ShellServerIT.java | 2 +-
11 files changed, 248 insertions(+), 197 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index e22d005687..80607a12f2 100644
--- a/core/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -406,15 +406,15 @@ public class AdminUtil<T> {
return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
}
- public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath lockPath)
- throws KeeperException, InterruptedException {
- print(zs, zk, lockPath, new Formatter(System.out), null, null);
+ public void printAll(ReadOnlyTStore<T> zs, ZooReader zk,
+ ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
+ print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
}
- public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath lockPath,
+ public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath,
Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
- FateStatus fateStatus = getStatus(zs, zk, lockPath, filterTxid, filterStatus);
+ FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);
for (TransactionStatus txStatus : fateStatus.getTransactions()) {
fmt.format(
diff --git a/core/src/main/java/org/apache/accumulo/fate/FateTxId.java b/core/src/main/java/org/apache/accumulo/fate/FateTxId.java
index 07b2704a16..5aff99d9bf 100644
--- a/core/src/main/java/org/apache/accumulo/fate/FateTxId.java
+++ b/core/src/main/java/org/apache/accumulo/fate/FateTxId.java
@@ -59,4 +59,12 @@ public class FateTxId {
return FastFormat.toHexString(PREFIX, tid, SUFFIX);
}
+ public static long parseTidFromUserInput(String s) {
+ if (isFormatedTid(s)) {
+ return fromString(s);
+ } else {
+ return Long.parseLong(s, 16);
+ }
+ }
+
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 3275d3d1cd..f6605bd6f3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.server.util;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
+import static org.apache.accumulo.fate.FateTxId.parseTidFromUserInput;
import java.io.BufferedWriter;
import java.io.File;
@@ -28,12 +29,16 @@ import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Formatter;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.Constants;
@@ -47,7 +52,10 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.NamespacePermission;
@@ -58,12 +66,20 @@ import org.apache.accumulo.core.singletons.SingletonManager.Mode;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.tables.TableMap;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.ReadOnlyStore;
+import org.apache.accumulo.fate.ReadOnlyTStore;
+import org.apache.accumulo.fate.ZooStore;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.cli.ServerUtilOpts;
import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.server.util.fateCommand.FateSummaryReport;
import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,6 +214,39 @@ public class Admin implements KeywordExecutable {
String file;
}
+ @Parameters(commandNames = "fate",
+ commandDescription = "Operations performed on the Manager FaTE system.")
+ static class FateOpsCommand {
+ @Parameter(description = "[<txId>...]")
+ List<String> txList = new ArrayList<>();
+
+ @Parameter(names = {"-c", "--cancel"},
+ description = "<txId>[ <txId>...] Cancel new or submitted FaTE transactions")
+ boolean cancel;
+
+ @Parameter(names = {"-f", "--fail"},
+ description = "<txId>[ <txId>...] Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)")
+ boolean fail;
+
+ @Parameter(names = {"-d", "--delete"},
+ description = "<txId>[ <txId>...] Delete locks associated with transactions (Requires Manager to be down)")
+ boolean delete;
+
+ @Parameter(names = {"-p", "--print", "-l", "--list"},
+ description = "[txId <txId>...] Print information about FaTE transactions. Print only the 'txId's specified or print all transactions if empty. Use -s to only print certain states.")
+ boolean print;
+
+ @Parameter(names = "--summary", description = "Print a summary of all FaTE transactions")
+ boolean summarize;
+
+ @Parameter(names = {"-j", "--json"}, description = "Print transactions in json")
+ boolean printJson;
+
+ @Parameter(names = {"-s", "--state"},
+ description = "<state>[ <state>...] Print transactions in the state(s) {NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL}")
+ List<String> states = new ArrayList<>();
+ }
+
public static void main(String[] args) {
new Admin().execute(args);
}
@@ -226,6 +275,9 @@ public class Admin implements KeywordExecutable {
JCommander cl = new JCommander(opts);
cl.setProgramName("accumulo admin");
+ FateOpsCommand fateOpsCommand = new FateOpsCommand();
+ cl.addCommand("fate", fateOpsCommand);
+
ChangeSecretCommand changeSecretCommand = new ChangeSecretCommand();
cl.addCommand("changeSecret", changeSecretCommand);
@@ -331,6 +383,8 @@ public class Admin implements KeywordExecutable {
} else if (cl.getParsedCommand().equals("locks")) {
TabletServerLocks.execute(context, args.length > 2 ? args[2] : null,
tServerLocksOpts.delete);
+ } else if (cl.getParsedCommand().equals("fate")) {
+ executeFateOpsCommand(context, fateOpsCommand);
} else {
everything = cl.getParsedCommand().equals("stopAll");
@@ -700,4 +754,153 @@ public class Admin implements KeywordExecutable {
}
}
}
+
+ // Fate Operations
+ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOpsCommand)
+ throws AccumuloException, AccumuloSecurityException, InterruptedException, KeeperException {
+
+ validateFateUserInput(fateOpsCommand);
+
+ AdminUtil<Admin> admin = new AdminUtil<>(true);
+ final String zkRoot = context.getZooKeeperRoot();
+ var zLockManagerPath = ServiceLock.path(zkRoot + Constants.ZMANAGER_LOCK);
+ var zTableLocksPath = ServiceLock.path(zkRoot + Constants.ZTABLE_LOCKS);
+ String fateZkPath = zkRoot + Constants.ZFATE;
+ ZooReaderWriter zk = context.getZooReaderWriter();
+ ZooStore<Admin> zs = new ZooStore<>(fateZkPath, zk);
+
+ if (fateOpsCommand.cancel) {
+ cancelSubmittedFateTxs(context, fateOpsCommand.txList);
+ } else if (fateOpsCommand.fail) {
+ for (String txid : fateOpsCommand.txList) {
+ if (!admin.prepFail(zs, zk, zLockManagerPath, txid)) {
+ throw new AccumuloException("Could not fail transaction: " + txid);
+ }
+ }
+ } else if (fateOpsCommand.delete) {
+ for (String txid : fateOpsCommand.txList) {
+ if (!admin.prepDelete(zs, zk, zLockManagerPath, txid)) {
+ throw new AccumuloException("Could not delete transaction: " + txid);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, txid);
+ }
+ }
+
+ ReadOnlyStore<Admin> readOnlyStore = new ReadOnlyStore<>(zs);
+
+ if (fateOpsCommand.print) {
+ final Set<Long> sortedTxs = new TreeSet<>();
+ fateOpsCommand.txList.forEach(s -> sortedTxs.add(parseTidFromUserInput(s)));
+ if (!fateOpsCommand.txList.isEmpty()) {
+ EnumSet<ReadOnlyTStore.TStatus> statusFilter =
+ getCmdLineStatusFilters(fateOpsCommand.states);
+ admin.print(readOnlyStore, zk, zTableLocksPath, new Formatter(System.out), sortedTxs,
+ statusFilter);
+ } else {
+ admin.printAll(readOnlyStore, zk, zTableLocksPath);
+ }
+ // print line break at the end
+ System.out.println();
+ }
+
+ if (fateOpsCommand.summarize) {
+ summarizeFateTx(context, fateOpsCommand, admin, readOnlyStore, zTableLocksPath);
+ }
+ }
+
+ private void validateFateUserInput(FateOpsCommand cmd) {
+ if (cmd.cancel && cmd.fail || cmd.cancel && cmd.delete || cmd.fail && cmd.delete) {
+ throw new IllegalArgumentException(
+ "Can only perform one of the following at a time: cancel, fail or delete.");
+ }
+ if ((cmd.cancel || cmd.fail || cmd.delete) && cmd.txList.isEmpty()) {
+ throw new IllegalArgumentException(
+ "At least one txId required when using cancel, fail or delete");
+ }
+ }
+
+ private void cancelSubmittedFateTxs(ServerContext context, List<String> txList)
+ throws AccumuloException {
+ for (String txStr : txList) {
+ long txid = Long.parseLong(txStr, 16);
+ boolean cancelled = cancelFateOperation(context, txid);
+ if (cancelled) {
+ System.out.println("FaTE transaction " + txid + " was cancelled or already completed.");
+ } else {
+ System.out
+ .println("FaTE transaction " + txid + " was not cancelled, status may have changed.");
+ }
+ }
+ }
+
+ private boolean cancelFateOperation(ClientContext context, long txid) throws AccumuloException {
+ FateService.Client client = null;
+ try {
+ client = ThriftClientTypes.FATE.getConnectionWithRetry(context);
+ return client.cancelFateOperation(TraceUtil.traceInfo(), context.rpcCreds(), txid);
+ } catch (Exception e) {
+ throw new AccumuloException(e);
+ } finally {
+ if (client != null)
+ ThriftUtil.close(client, context);
+ }
+ }
+
+ private void summarizeFateTx(ServerContext context, FateOpsCommand cmd, AdminUtil<Admin> admin,
+ ReadOnlyStore<Admin> zs, ServiceLock.ServiceLockPath tableLocksPath)
+ throws InterruptedException, AccumuloException, AccumuloSecurityException, KeeperException {
+
+ ZooReaderWriter zk = context.getZooReaderWriter();
+ var transactions = admin.getStatus(zs, zk, tableLocksPath, null, null);
+
+ // build id map - relies on unique ids for tables and namespaces
+ // used to look up the names of either table or namespace by id.
+ Map<TableId,String> tidToNameMap = new TableMap(context).getIdtoNameMap();
+ Map<String,String> idsToNameMap = new HashMap<>(tidToNameMap.size() * 2);
+ tidToNameMap.forEach((tid, name) -> idsToNameMap.put(tid.canonical(), "t:" + name));
+ context.namespaceOperations().namespaceIdMap().forEach((name, nsid) -> {
+ String prev = idsToNameMap.put(nsid, "ns:" + name);
+ if (prev != null) {
+ log.warn("duplicate id found for table / namespace id. table name: {}, namespace name: {}",
+ prev, name);
+ }
+ });
+
+ EnumSet<ReadOnlyTStore.TStatus> statusFilter = getCmdLineStatusFilters(cmd.states);
+
+ FateSummaryReport report = new FateSummaryReport(idsToNameMap, statusFilter);
+
+ // gather statistics
+ transactions.getTransactions().forEach(report::gatherTxnStatus);
+ if (cmd.printJson) {
+ printLines(Collections.singletonList(report.toJson()));
+ } else {
+ printLines(report.formatLines());
+ }
+ }
+
+ private void printLines(List<String> lines) {
+ for (String nextLine : lines) {
+ if (nextLine == null) {
+ continue;
+ }
+ System.out.println(nextLine);
+ }
+ }
+
+ /**
+ * If provided on the command line, get the TStatus values provided.
+ *
+ * @return a set of status filters, or null if none were provided
+ */
+ private EnumSet<ReadOnlyTStore.TStatus> getCmdLineStatusFilters(List<String> states) {
+ EnumSet<ReadOnlyTStore.TStatus> statusFilter = null;
+ if (!states.isEmpty()) {
+ statusFilter = EnumSet.noneOf(ReadOnlyTStore.TStatus.class);
+ for (String element : states) {
+ statusFilter.add(ReadOnlyTStore.TStatus.valueOf(element));
+ }
+ }
+ return statusFilter;
+ }
}
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
similarity index 92%
rename from shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateSummaryReport.java
rename to server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
index 5dec5ca3c2..3ab8d16ad4 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateSummaryReport.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.shell.commands.fateCommand;
+package org.apache.accumulo.server.util.fateCommand;
import java.time.Instant;
import java.time.ZoneId;
@@ -130,20 +130,20 @@ public class FateSummaryReport {
lines.add(String.format("Report Time: %s",
fmt.format(Instant.ofEpochMilli(reportTime).truncatedTo(ChronoUnit.SECONDS))));
- lines.add("Status counts:\n\n");
- statusCounts.forEach((status, count) -> lines.add(String.format(" %s: %d\n", status, count)));
+ lines.add("Status counts:");
+ statusCounts.forEach((status, count) -> lines.add(String.format(" %s: %d", status, count)));
- lines.add("\nCommand counts:\n\n");
- cmdCounts.forEach((cmd, count) -> lines.add(String.format(" %s: %d\n", cmd, count)));
+ lines.add("Command counts:");
+ cmdCounts.forEach((cmd, count) -> lines.add(String.format(" %s: %d", cmd, count)));
- lines.add("\nStep counts:\n\n");
- stepCounts.forEach((step, count) -> lines.add(String.format(" %s: %d\n", step, count)));
+ lines.add("Step counts:");
+ stepCounts.forEach((step, count) -> lines.add(String.format(" %s: %d", step, count)));
- lines.add("\nFate transactions (oldest first):\n\n");
+ lines.add("\nFate transactions (oldest first):");
lines.add("Status Filters: "
+ (statusFilterNames.isEmpty() ? "[NONE]" : statusFilterNames.toString()));
- lines.add("\n" + FateTxnDetails.TXN_HEADER);
+ lines.add(FateTxnDetails.TXN_HEADER);
fateDetails.forEach(txnDetails -> lines.add(txnDetails.toString()));
return lines;
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
similarity index 97%
rename from shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateTxnDetails.java
rename to server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
index 12ff2e1c96..6fc4fccbb6 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/fateCommand/FateTxnDetails.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.shell.commands.fateCommand;
+package org.apache.accumulo.server.util.fateCommand;
import java.time.Duration;
import java.util.ArrayList;
@@ -28,7 +28,7 @@ import org.apache.accumulo.fate.AdminUtil;
public class FateTxnDetails implements Comparable<FateTxnDetails> {
final static String TXN_HEADER =
- "Running\ttxn_id\t\t\t\tStatus\t\tCommand\t\tStep (top)\t\tlocks held:(table id, name)\tlocks waiting:(table id, name)\n";
+ "Running\ttxn_id\t\t\t\tStatus\t\tCommand\t\tStep (top)\t\tlocks held:(table id, name)\tlocks waiting:(table id, name)";
private long running;
private String status = "?";
@@ -134,7 +134,7 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> {
elapsed.toSecondsPart());
return hms + "\t" + txnId + "\t" + status + "\t" + txName + "\t" + step + "\theld:"
- + locksHeld.toString() + "\twaiting:" + locksWaiting.toString() + "\n";
+ + locksHeld.toString() + "\twaiting:" + locksWaiting.toString();
}
}
diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/SummaryReportTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java
similarity index 98%
rename from shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/SummaryReportTest.java
rename to server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java
index 14543e81d8..70ac62c93e 100644
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/SummaryReportTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.shell.commands.fateCommand;
+package org.apache.accumulo.server.util.fateCommand;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/TxnDetailsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
similarity index 98%
rename from shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/TxnDetailsTest.java
rename to server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
index df3d99b488..b553c5c1b8 100644
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/fateCommand/TxnDetailsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.shell.commands.fateCommand;
+package org.apache.accumulo.server.util.fateCommand;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/util/FateAdmin.java b/server/manager/src/main/java/org/apache/accumulo/manager/util/FateAdmin.java
index 9b8d1ab26d..ed76b52ec8 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/util/FateAdmin.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/util/FateAdmin.java
@@ -102,7 +102,7 @@ public class FateAdmin {
admin.deleteLocks(zk, zTableLocksPath, txid);
}
} else if (jc.getParsedCommand().equals("print")) {
- admin.print(new ReadOnlyStore<>(zs), zk, zTableLocksPath);
+ admin.printAll(new ReadOnlyStore<>(zs), zk, zTableLocksPath);
}
}
}
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
index a364bbbdf6..371a8e2057 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
@@ -19,19 +19,17 @@
package org.apache.accumulo.shell.commands;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.FateTxId.parseTidFromUserInput;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Formatter;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.Constants;
@@ -40,15 +38,8 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.manager.thrift.FateService;
-import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
-import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.FastFormat;
-import org.apache.accumulo.core.util.tables.TableMap;
import org.apache.accumulo.fate.AdminUtil;
-import org.apache.accumulo.fate.FateTxId;
import org.apache.accumulo.fate.ReadOnlyRepo;
import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.fate.Repo;
@@ -58,15 +49,12 @@ import org.apache.accumulo.fate.zookeeper.ServiceLock.ServiceLockPath;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
-import org.apache.accumulo.shell.commands.fateCommand.FateSummaryReport;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -79,8 +67,8 @@ import com.google.gson.JsonSerializer;
* Manage FATE transactions
*/
public class FateCommand extends Command {
-
- private final static Logger LOG = LoggerFactory.getLogger(FateCommand.class);
+ private static final String warning =
+ "WARNING: This command is deprecated for removal. Use 'accumulo admin'\n";
// this class serializes references to interfaces with the concrete class name
private static class InterfaceSerializer<T> implements JsonSerializer<T> {
@@ -134,14 +122,6 @@ public class FateCommand extends Command {
private Option statusOption;
private Option disablePaginationOpt;
- private long parseTxid(String s) {
- if (FateTxId.isFormatedTid(s)) {
- return FateTxId.fromString(s);
- } else {
- return Long.parseLong(s, 16);
- }
- }
-
protected String getZKRoot(ClientContext context) {
return context.getZooKeeperRoot();
}
@@ -162,6 +142,8 @@ public class FateCommand extends Command {
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws ParseException, KeeperException, InterruptedException, IOException, AccumuloException,
AccumuloSecurityException {
+ Shell.log.warn(warning);
+
ClientContext context = shellState.getContext();
boolean failedCommand = false;
@@ -177,7 +159,8 @@ public class FateCommand extends Command {
if (cl.hasOption(cancel.getOpt())) {
String[] txids = cl.getOptionValues(cancel.getOpt());
validateArgs(txids);
- failedCommand = cancelSubmittedTxs(shellState, txids);
+ System.out.println(
+ "Option not available. Use 'accumulo admin fate -c " + String.join(" ", txids) + "'");
} else if (cl.hasOption(fail.getOpt())) {
String[] txids = cl.getOptionValues(fail.getOpt());
validateArgs(txids);
@@ -191,8 +174,7 @@ public class FateCommand extends Command {
} else if (cl.hasOption(print.getOpt())) {
printTx(shellState, admin, zs, zk, tableLocksPath, cl.getOptionValues(print.getOpt()), cl);
} else if (cl.hasOption(summary.getOpt())) {
- summarizeTx(shellState, admin, zs, zk, tableLocksPath, cl.getOptionValues(summary.getOpt()),
- cl);
+ System.out.println("Option not available. Use 'accumulo admin fate --summary'");
} else if (cl.hasOption(dump.getOpt())) {
String output = dumpTx(zs, cl.getOptionValues(dump.getOpt()));
System.out.println(output);
@@ -210,7 +192,7 @@ public class FateCommand extends Command {
} else {
txids = new ArrayList<>();
for (int i = 1; i < args.length; i++) {
- txids.add(parseTxid(args[i]));
+ txids.add(parseTidFromUserInput(args[i]));
}
}
@@ -236,7 +218,7 @@ public class FateCommand extends Command {
if (args != null && args.length >= 1) {
for (int i = 0; i < args.length; i++) {
if (!args[i].isEmpty()) {
- Long val = parseTxid(args[i]);
+ Long val = parseTidFromUserInput(args[i]);
filterTxid.add(val);
}
}
@@ -252,42 +234,6 @@ public class FateCommand extends Command {
!cl.hasOption(disablePaginationOpt.getOpt()));
}
- protected void summarizeTx(Shell shellState, AdminUtil<FateCommand> admin,
- ZooStore<FateCommand> zs, ZooReaderWriter zk, ServiceLockPath tableLocksPath, String[] args,
- CommandLine cl) throws InterruptedException, AccumuloException, AccumuloSecurityException,
- KeeperException, IOException {
-
- var transactions = admin.getStatus(zs, zk, tableLocksPath, null, null);
-
- // build id map - relies on unique ids for tables and namespaces
- // used to look up the names of either table or namespace by id.
- Map<TableId,String> tidToNameMap = new TableMap(shellState.getContext()).getIdtoNameMap();
- Map<String,String> idsToNameMap = new HashMap<>(tidToNameMap.size() * 2);
- tidToNameMap.forEach((tid, name) -> idsToNameMap.put(tid.canonical(), "t:" + name));
- shellState.getContext().namespaceOperations().namespaceIdMap().forEach((name, nsid) -> {
- String prev = idsToNameMap.put(nsid, "ns:" + name);
- if (prev != null) {
- LOG.warn("duplicate id found for table / namespace id. table name: {}, namespace name: {}",
- prev, name);
- }
- });
-
- EnumSet<TStatus> statusFilter = getCmdLineStatusFilters(cl);
-
- FateSummaryReport report = new FateSummaryReport(idsToNameMap, statusFilter);
-
- // gather statistics
- transactions.getTransactions().forEach(report::gatherTxnStatus);
- if (Arrays.asList(cl.getArgs()).contains("json")) {
- shellState.printLines(Collections.singletonList(report.toJson()).iterator(),
- !cl.hasOption(disablePaginationOpt.getOpt()));
- } else {
- // print the formatted report by lines to allow pagination
- shellState.printLines(report.formatLines().iterator(),
- !cl.hasOption(disablePaginationOpt.getOpt()));
- }
- }
-
protected boolean deleteTx(AdminUtil<FateCommand> admin, ZooStore<FateCommand> zs,
ZooReaderWriter zk, ServiceLockPath zLockManagerPath, String[] args)
throws InterruptedException, KeeperException {
@@ -308,47 +254,6 @@ public class FateCommand extends Command {
}
}
- protected boolean cancelSubmittedTxs(final Shell shellState, String[] args)
- throws AccumuloException, AccumuloSecurityException {
- ClientContext context = shellState.getContext();
- for (int i = 1; i < args.length; i++) {
- long txid = Long.parseLong(args[i], 16);
- shellState.getWriter().flush();
- String line = shellState.getReader().readLine("Cancel FaTE Tx " + txid + " (yes|no)? ");
- boolean cancelTx =
- line != null && (line.equalsIgnoreCase("y") || line.equalsIgnoreCase("yes"));
- if (cancelTx) {
- boolean cancelled = cancelFateOperation(context, txid, shellState);
- if (cancelled) {
- shellState.getWriter()
- .println("FaTE transaction " + txid + " was cancelled or already completed.");
- } else {
- shellState.getWriter()
- .println("FaTE transaction " + txid + " was not cancelled, status may have changed.");
- }
- } else {
- shellState.getWriter().println("Not cancelling FaTE transaction " + txid);
- }
- }
- return true;
- }
-
- private static boolean cancelFateOperation(ClientContext context, long txid,
- final Shell shellState) throws AccumuloException, AccumuloSecurityException {
- FateService.Client client = null;
- try {
- client = ThriftClientTypes.FATE.getConnectionWithRetry(context);
- return client.cancelFateOperation(TraceUtil.traceInfo(), context.rpcCreds(), txid);
- } catch (Exception e) {
- shellState.getWriter()
- .println("ManagerClient request failed, retrying. Cause: " + e.getMessage());
- throw new AccumuloException(e);
- } finally {
- if (client != null)
- ThriftUtil.close(client, context);
- }
- }
-
public boolean failTx(AdminUtil<FateCommand> admin, ZooStore<FateCommand> zs, ZooReaderWriter zk,
ServiceLockPath managerLockPath, String[] args) {
boolean success = true;
@@ -441,6 +346,12 @@ public class FateCommand extends Command {
return -1;
}
+ @Override
+ public String usage() {
+ String msg = super.usage();
+ return warning + msg;
+ }
+
/**
* If provided on the command line, get the TStatus values provided.
*
diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
index ef73b028ea..b2a23fe1a0 100644
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
+++ b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
@@ -36,8 +36,6 @@ import java.io.PrintStream;
import java.nio.file.Files;
import java.util.List;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.fate.AdminUtil;
import org.apache.accumulo.fate.ReadOnlyRepo;
@@ -65,9 +63,7 @@ public class FateCommandTest {
private boolean dumpCalled = false;
private boolean deleteCalled = false;
private boolean failCalled = false;
- private boolean cancelCalled = false;
private boolean printCalled = false;
- private boolean summarizeCalled = false;
@Override
public String getName() {
@@ -104,13 +100,6 @@ public class FateCommandTest {
return true;
}
- @Override
- protected boolean cancelSubmittedTxs(Shell shellState, String[] args)
- throws AccumuloException, AccumuloSecurityException {
- cancelCalled = true;
- return true;
- }
-
@Override
public boolean failTx(AdminUtil<FateCommand> admin, ZooStore<FateCommand> zs,
ZooReaderWriter zk, ServiceLockPath managerLockPath, String[] args) {
@@ -125,20 +114,11 @@ public class FateCommandTest {
printCalled = true;
}
- @Override
- protected void summarizeTx(Shell shellState, AdminUtil<FateCommand> admin,
- ZooStore<FateCommand> zs, ZooReaderWriter zk, ServiceLockPath tableLocksPath, String[] args,
- CommandLine cl) {
- summarizeCalled = true;
- }
-
public void reset() {
dumpCalled = false;
deleteCalled = false;
failCalled = false;
- cancelCalled = false;
printCalled = false;
- summarizeCalled = false;
}
}
@@ -247,42 +227,6 @@ public class FateCommandTest {
verify(zs, zk);
}
- @Test
- public void testSummary() throws IOException, InterruptedException, AccumuloException,
- AccumuloSecurityException, KeeperException {
- reset(zk);
- PrintStream out = System.out;
- File config = Files.createTempFile(null, null).toFile();
- TestOutputStream output = new TestOutputStream();
- Shell shell = createShell(output);
-
- ServiceLockPath tableLocksPath = ServiceLock.path("/accumulo" + ZTABLE_LOCKS);
- ZooStore<FateCommand> zs = createMock(ZooStore.class);
- expect(zk.getChildren(tableLocksPath.toString())).andReturn(List.of("5")).anyTimes();
- expect(zk.getChildren("/accumulo/table_locks/5")).andReturn(List.of()).anyTimes();
- expect(zs.list()).andReturn(List.of()).anyTimes();
-
- replay(zs, zk);
-
- TestHelper helper = new TestHelper(true);
- FateCommand cmd = new TestFateCommand();
- var options = cmd.getOptions();
- CommandLine cli = new CommandLine.Builder().addOption(options.getOption("summary"))
- .addOption(options.getOption("np")).build();
-
- try {
- cmd.summarizeTx(shell, helper, zs, zk, tableLocksPath, cli.getOptionValues("list"), cli);
- } finally {
- output.clear();
- System.setOut(out);
- if (config.exists()) {
- assertTrue(config.delete());
- }
- }
-
- verify(zs, zk);
- }
-
@Test
public void testCommandLineOptions() throws Exception {
PrintStream out = System.out;
@@ -300,17 +244,6 @@ public class FateCommandTest {
shell.execCommand("fate -?", true, false);
Shell.log.info("{}", output.get());
shell.execCommand("fate --help", true, false);
- shell.execCommand("fate cancel", true, false);
- assertFalse(cmd.cancelCalled);
- cmd.reset();
- shell.execCommand("fate -cancel", true, false);
- assertFalse(cmd.cancelCalled);
- cmd.reset();
- shell.execCommand("fate -cancel 12345", true, false);
- assertTrue(cmd.cancelCalled);
- cmd.reset();
- shell.execCommand("fate --cancel-submitted 12345 67890", true, false);
- assertTrue(cmd.cancelCalled);
cmd.reset();
shell.execCommand("fate delete", true, false);
assertFalse(cmd.deleteCalled);
@@ -371,10 +304,6 @@ public class FateCommandTest {
cmd.reset();
shell.execCommand("fate --list 12345 67890", true, false);
assertTrue(cmd.printCalled);
- shell.execCommand("fate -summary", true, false);
- assertTrue(cmd.summarizeCalled);
- shell.execCommand("fate --summary", true, false);
- assertTrue(cmd.summarizeCalled);
cmd.reset();
} finally {
shell.shutdown();
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index 2a804524da..1ba1ebf1f3 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -78,7 +78,7 @@ import org.apache.accumulo.core.util.format.FormatterConfig;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.shell.commands.fateCommand.FateSummaryReport;
+import org.apache.accumulo.server.util.fateCommand.FateSummaryReport;
import org.apache.accumulo.test.compaction.TestCompactionStrategy;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;