You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/01/25 17:19:45 UTC
accumulo git commit: ACCUMULO-4575 Fixed concurrent delete table bug
Repository: accumulo
Updated Branches:
refs/heads/1.7 4ea66d4a4 -> df400c59e
ACCUMULO-4575 Fixed concurrent delete table bug
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/df400c59
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/df400c59
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/df400c59
Branch: refs/heads/1.7
Commit: df400c59efd2274d8714cad4c9d3648bb0845c50
Parents: 4ea66d4
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jan 25 12:19:01 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jan 25 12:19:01 2017 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/fate/AdminUtil.java | 104 +++++++++++--
.../accumulo/cluster/AccumuloCluster.java | 6 +
.../standalone/StandaloneAccumuloCluster.java | 12 ++
.../impl/MiniAccumuloClusterImpl.java | 8 +
.../accumulo/master/FateServiceHandler.java | 2 +-
.../accumulo/master/tableOps/DeleteTable.java | 25 +++-
.../apache/accumulo/master/tableOps/Utils.java | 13 ++
.../test/functional/BackupMasterIT.java | 7 +-
.../functional/ConcurrentDeleteTableIT.java | 147 +++++++++++++++++++
9 files changed, 303 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index f6aa811..b8baa67 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
+import org.apache.accumulo.fate.zookeeper.IZooReader;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -62,11 +63,77 @@ public class AdminUtil<T> {
this.exitOnError = exitOnError;
}
- public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath) throws KeeperException, InterruptedException {
- print(zs, zk, lockPath, new Formatter(System.out), null, null);
+ public static class TransactionStatus {
+
+ private final long txid;
+ private final TStatus status;
+ private final String debug;
+ private final List<String> hlocks;
+ private final List<String> wlocks;
+ private final String top;
+
+ private TransactionStatus(Long tid, TStatus status, String debug, List<String> hlocks, List<String> wlocks, String top) {
+ this.txid = tid;
+ this.status = status;
+ this.debug = debug;
+ this.hlocks = Collections.unmodifiableList(hlocks);
+ this.wlocks = Collections.unmodifiableList(wlocks);
+ this.top = top;
+ }
+
+ public long getTxid() {
+ return txid;
+ }
+
+ public TStatus getStatus() {
+ return status;
+ }
+
+ public String getDebug() {
+ return debug;
+ }
+
+ public List<String> getHeldLocks() {
+ return hlocks;
+ }
+
+ public List<String> getWaitingLocks() {
+ return wlocks;
+ }
+
+ public String getTop() {
+ return top;
+ }
+
}
- public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath, Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+ public static class FateStatus {
+
+ private final List<TransactionStatus> transactions;
+ private final Map<Long,List<String>> danglingHeldLocks;
+ private final Map<Long,List<String>> danglingWaitingLocks;
+
+ private FateStatus(List<TransactionStatus> transactions, Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) {
+ this.transactions = Collections.unmodifiableList(transactions);
+ this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks);
+ this.danglingWaitingLocks = Collections.unmodifiableMap(danglingWaitingLocks);
+
+ }
+
+ public List<TransactionStatus> getTransactions() {
+ return transactions;
+ }
+
+ public Map<Long,List<String>> getDanglingHeldLocks() {
+ return danglingHeldLocks;
+ }
+
+ public Map<Long,List<String>> getDanglingWaitingLocks() {
+ return danglingWaitingLocks;
+ }
+ }
+
+ public FateStatus getStatus(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
Map<Long,List<String>> heldLocks = new HashMap<>();
Map<Long,List<String>> waitingLocks = new HashMap<>();
@@ -118,13 +185,12 @@ public class AdminUtil<T> {
} catch (Exception e) {
log.error("Failed to read locks for " + id + " continuing.", e);
- fmt.format("Failed to read locks for %s continuing", id);
}
}
List<Long> transactions = zs.list();
+ List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
- long txCount = 0;
for (Long tid : transactions) {
zs.reserve(tid);
@@ -152,17 +218,33 @@ public class AdminUtil<T> {
if ((filterTxid != null && !filterTxid.contains(tid)) || (filterStatus != null && !filterStatus.contains(status)))
continue;
- ++txCount;
- fmt.format("txid: %016x status: %-18s op: %-15s locked: %-15s locking: %-15s top: %s%n", tid, status, debug, hlocks, wlocks, top);
+ statuses.add(new TransactionStatus(tid, status, debug, hlocks, wlocks, top));
+ }
+
+ return new FateStatus(statuses, heldLocks, waitingLocks);
+ }
+
+ public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath) throws KeeperException, InterruptedException {
+ print(zs, zk, lockPath, new Formatter(System.out), null, null);
+ }
+
+ public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath, Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+ throws KeeperException, InterruptedException {
+
+ FateStatus fateStatus = getStatus(zs, zk, lockPath, filterTxid, filterStatus);
+
+ for (TransactionStatus txStatus : fateStatus.getTransactions()) {
+ fmt.format("txid: %016x status: %-18s op: %-15s locked: %-15s locking: %-15s top: %s%n", txStatus.getTxid(), txStatus.getStatus(),
+ txStatus.getDebug(), txStatus.getHeldLocks(), txStatus.getWaitingLocks(), txStatus.getTop());
}
- fmt.format(" %s transactions", txCount);
+ fmt.format(" %s transactions", fateStatus.getTransactions().size());
- if (heldLocks.size() != 0 || waitingLocks.size() != 0) {
+ if (fateStatus.getDanglingHeldLocks().size() != 0 || fateStatus.getDanglingWaitingLocks().size() != 0) {
fmt.format("%nThe following locks did not have an associated FATE operation%n");
- for (Entry<Long,List<String>> entry : heldLocks.entrySet())
+ for (Entry<Long,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet())
fmt.format("txid: %016x locked: %s%n", entry.getKey(), entry.getValue());
- for (Entry<Long,List<String>> entry : waitingLocks.entrySet())
+ for (Entry<Long,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet())
fmt.format("txid: %016x locking: %s%n", entry.getKey(), entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
index 767633b..8e80358 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,6 +58,11 @@ public interface AccumuloCluster {
ClientConfiguration getClientConfig();
/**
+ * Get server side config derived from accumulo-site.xml
+ */
+ AccumuloConfiguration getSiteConfiguration();
+
+ /**
* Get an object that can manage a cluster
*
* @return Manage the state of the cluster
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 1baa3a1..ad84f2f 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -32,6 +32,8 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.minicluster.ServerType;
@@ -41,6 +43,8 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+
/**
* AccumuloCluster implementation to connect to an existing deployment of Accumulo
*/
@@ -184,4 +188,12 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
checkArgument(offset >= 0 && offset < users.size(), "Invalid offset, should be non-negative and less than " + users.size());
return users.get(offset);
}
+
+ @Override
+ public AccumuloConfiguration getSiteConfiguration() {
+ Configuration conf = new Configuration(false);
+ Path accumuloSite = new Path(serverAccumuloConfDir, "accumulo-site.xml");
+ conf.addResource(accumuloSite);
+ return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(), conf));
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index f2e5c7c..79ad527 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -110,6 +111,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
/**
@@ -820,4 +822,10 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
return new Path(tmp.toString());
}
}
+
+ @Override
+ public AccumuloConfiguration getSiteConfiguration() {
+ // combine the default configuration with the site properties configured for this mini cluster
+ return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(), config.getSiteConfig().entrySet()));
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 09a90b5..5f0ddd2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -254,7 +254,7 @@ class FateServiceHandler implements FateService.Iface {
if (!canDeleteTable)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(tableId)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(namespaceId, tableId)), autoCleanup);
break;
}
case TABLE_ONLINE: {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index a1158f4..1eae5b9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.master.tableOps;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.fate.Repo;
@@ -28,20 +27,32 @@ public class DeleteTable extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
- public DeleteTable(String tableId) {
+ private String getNamespaceId(Master environment) throws Exception {
+ if (namespaceId == null) {
+ // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added in a bug fix release, we have to ensure we can properly deserialize
+ // older versions. When deserializing an older version, namespaceId will be null. For this case revert to the old buggy behavior.
+ return Utils.getNamespaceId(environment.getInstance(), tableId, TableOperation.DELETE);
+ }
+
+ return namespaceId;
+ }
+
+ public DeleteTable(String namespaceId, String tableId) {
+ this.namespaceId = namespaceId;
this.tableId = tableId;
}
@Override
public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+ String namespaceId = getNamespaceId(environment);
return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
}
@Override
public Repo<Master> call(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+ String namespaceId = getNamespaceId(environment);
TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
environment.getEventCoordinator().event("deleting table %s ", tableId);
return new CleanUp(tableId, namespaceId);
@@ -49,9 +60,9 @@ public class DeleteTable extends MasterRepo {
@Override
public void undo(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- Utils.unreserveNamespace(namespaceId, tid, false);
+ if (namespaceId != null) {
+ Utils.unreserveNamespace(namespaceId, tid, false);
+ }
Utils.unreserveTable(tableId, tid, true);
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index 0fb9138..9b921e2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -116,6 +116,19 @@ public class Utils {
return 100;
}
+ public static String getNamespaceId(Instance instance, String tableId, TableOperation op) throws Exception {
+ try {
+ return Tables.getNamespaceId(instance, tableId);
+ } catch (RuntimeException e) {
+ // see if this was caused because the table does not exist
+ IZooReaderWriter zk = ZooReaderWriter.getInstance();
+ if (!zk.exists(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId))
+ throw new ThriftTableOperationException(tableId, "", op, TableOperationExceptionType.NOTFOUND, "Table does not exist");
+ else
+ throw e;
+ }
+ }
+
public static long reserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException {
Instance instance = HdfsZooInstance.getInstance();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
index efed7a4..8621ab1 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
@@ -19,10 +19,12 @@ package org.apache.accumulo.test.functional;
import java.util.Collections;
import java.util.List;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
import org.junit.Test;
public class BackupMasterIT extends ConfigurableMacIT {
@@ -39,7 +41,8 @@ public class BackupMasterIT extends ConfigurableMacIT {
// create a backup
Process backup = exec(Master.class);
try {
- ZooReaderWriter writer = new ZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, "digest", "accumulo:DONTTELL".getBytes());
+ String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+ IZooReaderWriter writer = new ZooReaderWriterFactory().getZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, secret);
String root = "/accumulo/" + getConnector().getInstance().getInstanceID();
List<String> children = Collections.emptyList();
// wait for 2 lock entries
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
new file mode 100644
index 0000000..4798095
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
+
+ @Test
+ public void testConcurrentDeleteTablesOps() throws Exception {
+ final Connector c = getConnector();
+ String[] tables = getUniqueNames(2);
+
+ TreeSet<Text> splits = new TreeSet<>();
+
+ for (int i = 0; i < 1000; i++) {
+ Text split = new Text(String.format("%09x", i * 100000));
+ splits.add(split);
+ }
+
+ ExecutorService es = Executors.newFixedThreadPool(20);
+
+ int count = 0;
+ for (final String table : tables) {
+ c.tableOperations().create(table);
+ c.tableOperations().addSplits(table, splits);
+ writeData(c, table);
+ if (count == 1) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ count++;
+
+ final CountDownLatch cdl = new CountDownLatch(20);
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int i = 0; i < 20; i++) {
+ Future<?> future = es.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ c.tableOperations().delete(table);
+ } catch (TableNotFoundException e) {
+ // expected
+ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ try {
+ c.createScanner(table, Authorizations.EMPTY);
+ Assert.fail("Expected table " + table + " to be gone.");
+ } catch (TableNotFoundException tnfe) {
+ // expected
+ }
+
+ FateStatus fateStatus = getFateStatus();
+
+ // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
+ Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
+ Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+ }
+
+ es.shutdown();
+ }
+
+ private FateStatus getFateStatus() throws KeeperException, InterruptedException {
+ Instance instance = getConnector().getInstance();
+ AdminUtil<String> admin = new AdminUtil<>(false);
+ String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+ ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+ FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
+ return fateStatus;
+ }
+
+ private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ try {
+ Random rand = new Random();
+ for (int i = 0; i < 1000; i++) {
+ Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
+ m.put("m", "order", "" + i);
+ bw.addMutation(m);
+ }
+ } finally {
+ bw.close();
+ }
+ }
+}