You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/01/27 00:45:42 UTC
[1/2] accumulo git commit: ACCUMULO-4575 Fixed concurrent delete issue in FATE ops
Repository: accumulo
Updated Branches:
refs/heads/1.8 8668c7f38 -> bf5b6e0fa
ACCUMULO-4575 Fixed concurrent delete issue in FATE ops
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7b9a11ad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7b9a11ad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7b9a11ad
Branch: refs/heads/1.8
Commit: 7b9a11ad4d12f4572f606eaaed17f8fe78720e05
Parents: ff134d2
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 26 17:47:22 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Thu Jan 26 17:47:22 2017 -0500
----------------------------------------------------------------------
.../accumulo/master/FateServiceHandler.java | 15 +-
.../master/tableOps/CancelCompactions.java | 19 ++-
.../master/tableOps/ChangeTableState.java | 20 ++-
.../accumulo/master/tableOps/CompactRange.java | 30 ++--
.../master/tableOps/CompactionDriver.java | 14 +-
.../accumulo/master/tableOps/DeleteTable.java | 29 ++--
.../accumulo/master/tableOps/RenameTable.java | 19 ++-
.../accumulo/master/tableOps/TableRangeOp.java | 21 ++-
.../master/tableOps/TableRangeOpWait.java | 9 +-
.../apache/accumulo/master/tableOps/Utils.java | 9 +-
.../functional/ConcurrentDeleteTableIT.java | 167 ++++++++++++++++++-
11 files changed, 262 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 5f0ddd2..5af612c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -185,7 +185,7 @@ class FateServiceHandler implements FateService.Iface {
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
try {
- master.fate.seedTransaction(opid, new TraceRepo<>(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup);
} catch (NamespaceNotFoundException e) {
throw new ThriftTableOperationException(null, oldTableName, tableOp, TableOperationExceptionType.NAMESPACE_NOTFOUND, "");
}
@@ -273,7 +273,7 @@ class FateServiceHandler implements FateService.Iface {
if (!canOnlineOfflineTable)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(tableId, tableOp)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup);
break;
}
case TABLE_OFFLINE: {
@@ -292,7 +292,7 @@ class FateServiceHandler implements FateService.Iface {
if (!canOnlineOfflineTable)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(tableId, tableOp)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup);
break;
}
case TABLE_MERGE: {
@@ -316,7 +316,7 @@ class FateServiceHandler implements FateService.Iface {
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
Master.log.debug("Creating merge op: " + tableId + " " + startRow + " " + endRow);
- master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup);
break;
}
case TABLE_DELETE_RANGE: {
@@ -339,7 +339,7 @@ class FateServiceHandler implements FateService.Iface {
if (!canDeleteRange)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup);
break;
}
case TABLE_BULK_IMPORT: {
@@ -386,7 +386,8 @@ class FateServiceHandler implements FateService.Iface {
if (!canCompact)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new CompactRange(tableId, startRow, endRow, iterators, compactionStrategy)), autoCleanup);
+ master.fate
+ .seedTransaction(opid, new TraceRepo<>(new CompactRange(namespaceId, tableId, startRow, endRow, iterators, compactionStrategy)), autoCleanup);
break;
}
case TABLE_CANCEL_COMPACT: {
@@ -405,7 +406,7 @@ class FateServiceHandler implements FateService.Iface {
if (!canCancelCompact)
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- master.fate.seedTransaction(opid, new TraceRepo<>(new CancelCompactions(tableId)), autoCleanup);
+ master.fate.seedTransaction(opid, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup);
break;
}
case TABLE_IMPORT: {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index e268f17..c98174e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.master.tableOps;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -31,15 +30,20 @@ public class CancelCompactions extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
- public CancelCompactions(String tableId) {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT_CANCEL, this.namespaceId);
+ }
+
+ public CancelCompactions(String namespaceId, String tableId) {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
}
@Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL)
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.COMPACT_CANCEL)
+ Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
}
@@ -73,9 +77,8 @@ public class CancelCompactions extends MasterRepo {
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- Utils.unreserveNamespace(namespaceId, tid, false);
+ public void undo(long tid, Master env) throws Exception {
Utils.unreserveTable(tableId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
index 8649570..ee6efa4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.master.tableOps;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.fate.Repo;
@@ -28,10 +27,16 @@ public class ChangeTableState extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
private TableOperation top;
- public ChangeTableState(String tableId, TableOperation top) {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, top, this.namespaceId);
+ }
+
+ public ChangeTableState(String namespaceId, String tableId, TableOperation top) {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
this.top = top;
if (top != TableOperation.ONLINE && top != TableOperation.OFFLINE)
@@ -39,21 +44,19 @@ public class ChangeTableState extends MasterRepo {
}
@Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+ public long isReady(long tid, Master env) throws Exception {
// reserve the table so that this op does not run concurrently with create, clone, or delete table
- return Utils.reserveNamespace(namespaceId, tid, false, true, top) + Utils.reserveTable(tableId, tid, true, true, top);
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, top) + Utils.reserveTable(tableId, tid, true, true, top);
}
@Override
public Repo<Master> call(long tid, Master env) throws Exception {
- String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
TableState ts = TableState.ONLINE;
if (top == TableOperation.OFFLINE)
ts = TableState.OFFLINE;
TableManager.getInstance().transitionTableState(tableId, ts);
- Utils.unreserveNamespace(namespaceId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
Utils.unreserveTable(tableId, tid, true);
LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state " + tableId + " " + ts);
env.getEventCoordinator().event("Set table state of %s to %s", tableId, ts);
@@ -62,8 +65,7 @@ public class ChangeTableState extends MasterRepo {
@Override
public void undo(long tid, Master env) throws Exception {
- String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
- Utils.unreserveNamespace(namespaceId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
Utils.unreserveTable(tableId, tid, true);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index 7a9c5d6..e641479 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -47,18 +46,25 @@ public class CompactRange extends MasterRepo {
private static final long serialVersionUID = 1L;
private final String tableId;
+ private final String namespaceId;
private byte[] startRow;
private byte[] endRow;
private byte[] config;
- public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
- throws ThriftTableOperationException {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId);
+ }
+
+ public CompactRange(String namespaceId, String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators,
+ CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException {
+ requireNonNull(namespaceId, "Invalid argument: null namespaceId");
requireNonNull(tableId, "Invalid argument: null tableId");
requireNonNull(iterators, "Invalid argument: null iterator list");
requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy");
this.tableId = tableId;
+ this.namespaceId = namespaceId;
this.startRow = startRow.length == 0 ? null : startRow;
this.endRow = endRow.length == 0 ? null : endRow;
@@ -74,15 +80,14 @@ public class CompactRange extends MasterRepo {
}
@Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT)
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.COMPACT)
+ Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
}
@Override
- public Repo<Master> call(final long tid, Master environment) throws Exception {
- String zTablePath = Constants.ZROOT + "/" + environment.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
+ public Repo<Master> call(final long tid, Master env) throws Exception {
+ String zTablePath = Constants.ZROOT + "/" + env.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
byte[] cid;
@@ -122,7 +127,7 @@ public class CompactRange extends MasterRepo {
}
});
- return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow);
+ return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), getNamespaceId(env), tableId, startRow, endRow);
} catch (NoNodeException nne) {
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
}
@@ -158,12 +163,11 @@ public class CompactRange extends MasterRepo {
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+ public void undo(long tid, Master env) throws Exception {
try {
- removeIterators(environment, tid, tableId);
+ removeIterators(env, tid, tableId);
} finally {
- Utils.unreserveNamespace(namespaceId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
Utils.unreserveTable(tableId, tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
index f630121..da60f89 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
@@ -57,13 +57,18 @@ class CompactionDriver extends MasterRepo {
private long compactId;
private final String tableId;
+ private final String namespaceId;
private byte[] startRow;
private byte[] endRow;
- public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId);
+ }
+ public CompactionDriver(long compactId, String namespaceId, String tableId, byte[] startRow, byte[] endRow) {
this.compactId = compactId;
this.tableId = tableId;
+ this.namespaceId = namespaceId;
this.startRow = startRow;
this.endRow = endRow;
}
@@ -172,11 +177,10 @@ class CompactionDriver extends MasterRepo {
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- CompactRange.removeIterators(environment, tid, tableId);
+ public Repo<Master> call(long tid, Master env) throws Exception {
+ CompactRange.removeIterators(env, tid, tableId);
Utils.getReadLock(tableId, tid).unlock();
- Utils.getReadLock(namespaceId, tid).unlock();
+ Utils.getReadLock(getNamespaceId(env), tid).unlock();
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index 1eae5b9..e6267df 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -29,14 +29,8 @@ public class DeleteTable extends MasterRepo {
private String tableId;
private String namespaceId;
- private String getNamespaceId(Master environment) throws Exception {
- if (namespaceId == null) {
- // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added in bug fix release, we have to ensure we can properly deserialize
- // older versions. When deserializing an older version, namespaceId will be null. For this case revert to the old buggy behavior.
- return Utils.getNamespaceId(environment.getInstance(), tableId, TableOperation.DELETE);
- }
-
- return namespaceId;
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.DELETE, this.namespaceId);
}
public DeleteTable(String namespaceId, String tableId) {
@@ -45,24 +39,21 @@ public class DeleteTable extends MasterRepo {
}
@Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = getNamespaceId(environment);
- return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, false, TableOperation.DELETE)
+ + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- String namespaceId = getNamespaceId(environment);
+ public Repo<Master> call(long tid, Master env) throws Exception {
TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
- environment.getEventCoordinator().event("deleting table %s ", tableId);
- return new CleanUp(tableId, namespaceId);
+ env.getEventCoordinator().event("deleting table %s ", tableId);
+ return new CleanUp(tableId, getNamespaceId(env));
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- if (namespaceId != null) {
- Utils.unreserveNamespace(namespaceId, tid, false);
- }
+ public void undo(long tid, Master env) throws Exception {
Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
index 053749f..80d3293 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
@@ -39,16 +39,22 @@ public class RenameTable extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
private String oldTableName;
private String newTableName;
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.RENAME, this.namespaceId);
+ }
+
@Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.RENAME) + Utils.reserveTable(tableId, tid, true, true, TableOperation.RENAME);
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.RENAME)
+ + Utils.reserveTable(tableId, tid, true, true, TableOperation.RENAME);
}
- public RenameTable(String tableId, String oldTableName, String newTableName) throws NamespaceNotFoundException {
+ public RenameTable(String namespaceId, String tableId, String oldTableName, String newTableName) throws NamespaceNotFoundException {
+ this.namespaceId = namespaceId;
this.tableId = tableId;
this.oldTableName = oldTableName;
this.newTableName = newTableName;
@@ -57,7 +63,7 @@ public class RenameTable extends MasterRepo {
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
Instance instance = master.getInstance();
- String namespaceId = Tables.getNamespaceId(instance, tableId);
+ String namespaceId = getNamespaceId(master);
Pair<String,String> qualifiedOldTableName = Tables.qualify(oldTableName);
Pair<String,String> qualifiedNewTableName = Tables.qualify(newTableName);
@@ -104,9 +110,8 @@ public class RenameTable extends MasterRepo {
@Override
public void undo(long tid, Master env) throws Exception {
- String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
Utils.unreserveTable(tableId, tid, true);
- Utils.unreserveNamespace(namespaceId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
index 879470b..64d08be 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.master.tableOps;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -38,19 +37,24 @@ public class TableRangeOp extends MasterRepo {
private static final long serialVersionUID = 1L;
private final String tableId;
+ private final String namespaceId;
private byte[] startRow;
private byte[] endRow;
private Operation op;
- @Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.MERGE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId);
}
- public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException {
+ @Override
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE)
+ + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+ }
+ public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
this.startRow = TextUtil.getBytes(startRow);
this.endRow = TextUtil.getBytes(endRow);
this.op = op;
@@ -81,20 +85,19 @@ public class TableRangeOp extends MasterRepo {
env.setMergeState(new MergeInfo(range, op), MergeState.STARTED);
}
- return new TableRangeOpWait(tableId);
+ return new TableRangeOpWait(getNamespaceId(env), tableId);
}
@Override
public void undo(long tid, Master env) throws Exception {
- String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
// Not sure this is a good thing to do. The Master state engine should be the one to remove it.
Text tableIdText = new Text(tableId);
MergeInfo mergeInfo = env.getMergeInfo(tableIdText);
if (mergeInfo.getState() != MergeState.NONE)
log.info("removing merge information " + mergeInfo);
env.clearMergeState(tableIdText);
- Utils.unreserveNamespace(namespaceId, tid, false);
Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
index 668c790..5feb06d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
@@ -16,7 +16,7 @@
*/
package org.apache.accumulo.master.tableOps;
-import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.master.state.MergeInfo;
@@ -43,9 +43,11 @@ class TableRangeOpWait extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
- public TableRangeOpWait(String tableId) {
+ public TableRangeOpWait(String namespaceId, String tableId) {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
}
@Override
@@ -59,13 +61,12 @@ class TableRangeOpWait extends MasterRepo {
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
- String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId);
Text tableIdText = new Text(tableId);
MergeInfo mergeInfo = master.getMergeInfo(tableIdText);
log.info("removing merge information " + mergeInfo);
master.clearMergeState(tableIdText);
- Utils.unreserveNamespace(namespaceId, tid, false);
Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(Utils.getNamespaceId(master.getInstance(), tableId, TableOperation.MERGE, this.namespaceId), tid, false);
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index 9b921e2..d47bedf 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -116,7 +116,14 @@ public class Utils {
return 100;
}
- public static String getNamespaceId(Instance instance, String tableId, TableOperation op) throws Exception {
+ public static String getNamespaceId(Instance instance, String tableId, TableOperation op, String namespaceId) throws Exception {
+ if (namespaceId != null) {
+ return namespaceId;
+ }
+
+ // For ACCUMULO-4575, namespaceId was added in a bug fix release. Because of that, we have to ensure we can still properly deserialize
+ // older versions. When deserializing an older version, namespaceId will be null; in that case, revert to the old buggy behavior of looking it up from the table id.
+
try {
return Tables.getNamespaceId(instance, tableId);
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b9a11ad/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 4798095..0c63e59 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -18,8 +18,11 @@
package org.apache.accumulo.test.functional;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -35,6 +38,8 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
@@ -57,12 +62,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
final Connector c = getConnector();
String[] tables = getUniqueNames(2);
- TreeSet<Text> splits = new TreeSet<>();
-
- for (int i = 0; i < 1000; i++) {
- Text split = new Text(String.format("%09x", i * 100000));
- splits.add(split);
- }
+ TreeSet<Text> splits = createSplits();
ExecutorService es = Executors.newFixedThreadPool(20);
@@ -76,11 +76,12 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
}
count++;
- final CountDownLatch cdl = new CountDownLatch(20);
+ int numDeleteOps = 20;
+ final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < numDeleteOps; i++) {
Future<?> future = es.submit(new Runnable() {
@Override
@@ -121,6 +122,156 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
es.shutdown();
}
+ private TreeSet<Text> createSplits() {
+ TreeSet<Text> splits = new TreeSet<>();
+
+ for (int i = 0; i < 1000; i++) {
+ Text split = new Text(String.format("%09x", i * 100000));
+ splits.add(split);
+ }
+ return splits;
+ }
+
+ private static abstract class DelayedTableOp implements Runnable {
+ private CountDownLatch cdl;
+
+ DelayedTableOp(CountDownLatch cdl) {
+ this.cdl = cdl;
+ }
+
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ Thread.sleep(10);
+ doTableOp();
+ } catch (TableNotFoundException e) {
+ // expected
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected abstract void doTableOp() throws Exception;
+ }
+
+ @Test
+ public void testConcurrentFateOpsWithDelete() throws Exception {
+ final Connector c = getConnector();
+ String[] tables = getUniqueNames(2);
+
+ TreeSet<Text> splits = createSplits();
+
+ int numOperations = 8;
+
+ ExecutorService es = Executors.newFixedThreadPool(numOperations);
+
+ int count = 0;
+ for (final String table : tables) {
+ c.tableOperations().create(table);
+ c.tableOperations().addSplits(table, splits);
+ writeData(c, table);
+ if (count == 1) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ count++;
+
+ // increment this for each test
+ final CountDownLatch cdl = new CountDownLatch(numOperations);
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ futures.add(es.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ c.tableOperations().delete(table);
+ } catch (TableNotFoundException | TableOfflineException e) {
+ // expected
+ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().compact(table, new CompactionConfig());
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().merge(table, null, null);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ Map<String,String> m = Collections.emptyMap();
+ Set<String> s = Collections.emptySet();
+ c.tableOperations().clone(table, table + "_clone", true, m, s);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().deleteRows(table, null, null);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().cancelCompaction(table);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().rename(table, table + "_renamed");
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().offline(table);
+ }
+ }));
+
+ Assert.assertEquals(numOperations, futures.size());
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ try {
+ c.createScanner(table, Authorizations.EMPTY);
+ Assert.fail("Expected table " + table + " to be gone.");
+ } catch (TableNotFoundException tnfe) {
+ // expected
+ }
+
+ FateStatus fateStatus = getFateStatus();
+
+ // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
+ Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
+ Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+ }
+
+ es.shutdown();
+ }
+
private FateStatus getFateStatus() throws KeeperException, InterruptedException {
Instance instance = getConnector().getInstance();
AdminUtil<String> admin = new AdminUtil<>(false);
[2/2] accumulo git commit: Merge branch '1.7' into 1.8
Posted by kt...@apache.org.
Merge branch '1.7' into 1.8
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bf5b6e0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bf5b6e0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bf5b6e0f
Branch: refs/heads/1.8
Commit: bf5b6e0fae4974bb77b6409c21c5770d815b991f
Parents: 8668c7f 7b9a11a
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 26 19:24:14 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Thu Jan 26 19:24:14 2017 -0500
----------------------------------------------------------------------
.../accumulo/master/FateServiceHandler.java | 15 +-
.../master/tableOps/CancelCompactions.java | 19 ++-
.../master/tableOps/ChangeTableState.java | 20 ++-
.../accumulo/master/tableOps/CompactRange.java | 30 ++--
.../master/tableOps/CompactionDriver.java | 14 +-
.../accumulo/master/tableOps/DeleteTable.java | 29 ++--
.../accumulo/master/tableOps/RenameTable.java | 19 ++-
.../accumulo/master/tableOps/TableRangeOp.java | 20 ++-
.../master/tableOps/TableRangeOpWait.java | 9 +-
.../apache/accumulo/master/tableOps/Utils.java | 9 +-
.../functional/ConcurrentDeleteTableIT.java | 167 ++++++++++++++++++-
11 files changed, 262 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index d7d5b14,e641479..f3d53a7
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@@ -24,11 -24,10 +24,10 @@@ import java.util.List
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil;
- import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
@@@ -51,9 -51,14 +51,14 @@@ public class CompactRange extends Maste
private byte[] endRow;
private byte[] config;
- public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
- throws AcceptableThriftTableOperationException {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId);
+ }
+
+ public CompactRange(String namespaceId, String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators,
- CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException {
++ CompactionStrategyConfig compactionStrategy) throws AcceptableThriftTableOperationException {
+ requireNonNull(namespaceId, "Invalid argument: null namespaceId");
requireNonNull(tableId, "Invalid argument: null tableId");
requireNonNull(iterators, "Invalid argument: null iterator list");
requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy");
@@@ -122,9 -127,9 +127,9 @@@
}
});
- return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow);
+ return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), getNamespaceId(env), tableId, startRow, endRow);
} catch (NoNodeException nne) {
- throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
+ throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
index e2e7018,64d08be..d91755d
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
@@@ -16,10 -16,9 +16,9 @@@
*/
package org.apache.accumulo.master.tableOps;
+import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
- import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.util.TextUtil;
@@@ -42,15 -42,19 +42,19 @@@ public class TableRangeOp extends Maste
private byte[] endRow;
private Operation op;
- @Override
- public long isReady(long tid, Master environment) throws Exception {
- String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
- return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.MERGE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId);
}
- public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException {
+ @Override
+ public long isReady(long tid, Master env) throws Exception {
+ return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE)
+ + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+ }
- public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException {
++ public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
this.startRow = TextUtil.getBytes(startRow);
this.endRow = TextUtil.getBytes(endRow);
this.op = op;
@@@ -85,14 -90,14 +89,14 @@@
@Override
public void undo(long tid, Master env) throws Exception {
- String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
// Not sure this is a good thing to do. The Master state engine should be the one to remove it.
- Text tableIdText = new Text(tableId);
- MergeInfo mergeInfo = env.getMergeInfo(tableIdText);
+ MergeInfo mergeInfo = env.getMergeInfo(tableId);
if (mergeInfo.getState() != MergeState.NONE)
log.info("removing merge information " + mergeInfo);
- env.clearMergeState(tableIdText);
+ env.clearMergeState(tableId);
+ Utils.unreserveNamespace(namespaceId, tid, false);
Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(getNamespaceId(env), tid, false);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
index a7c82b1,5feb06d..1194c67
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
@@@ -57,12 -61,12 +59,11 @@@ class TableRangeOpWait extends MasterRe
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
- String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId);
- Text tableIdText = new Text(tableId);
- MergeInfo mergeInfo = master.getMergeInfo(tableIdText);
+ MergeInfo mergeInfo = master.getMergeInfo(tableId);
log.info("removing merge information " + mergeInfo);
- master.clearMergeState(tableIdText);
+ master.clearMergeState(tableId);
- Utils.unreserveNamespace(namespaceId, tid, false);
Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(Utils.getNamespaceId(master.getInstance(), tableId, TableOperation.MERGE, this.namespaceId), tid, false);
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 3f7a305,0000000..5808804
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@@ -1,147 -1,0 +1,298 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
++import java.util.Collections;
+import java.util.List;
++import java.util.Map;
+import java.util.Random;
++import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.client.TableOfflineException;
++import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
+
+ @Test
+ public void testConcurrentDeleteTablesOps() throws Exception {
+ final Connector c = getConnector();
+ String[] tables = getUniqueNames(2);
+
- TreeSet<Text> splits = new TreeSet<>();
-
- for (int i = 0; i < 1000; i++) {
- Text split = new Text(String.format("%09x", i * 100000));
- splits.add(split);
- }
++ TreeSet<Text> splits = createSplits();
+
+ ExecutorService es = Executors.newFixedThreadPool(20);
+
+ int count = 0;
+ for (final String table : tables) {
+ c.tableOperations().create(table);
+ c.tableOperations().addSplits(table, splits);
+ writeData(c, table);
+ if (count == 1) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ count++;
+
- final CountDownLatch cdl = new CountDownLatch(20);
++ int numDeleteOps = 20;
++ final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
+
+ List<Future<?>> futures = new ArrayList<>();
+
- for (int i = 0; i < 20; i++) {
++ for (int i = 0; i < numDeleteOps; i++) {
+ Future<?> future = es.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ c.tableOperations().delete(table);
+ } catch (TableNotFoundException e) {
+ // expected
+ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ try {
+ c.createScanner(table, Authorizations.EMPTY);
+ Assert.fail("Expected table " + table + " to be gone.");
+ } catch (TableNotFoundException tnfe) {
+ // expected
+ }
+
+ FateStatus fateStatus = getFateStatus();
+
+ // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
+ Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
+ Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+ }
+
+ es.shutdown();
+ }
+
++ private TreeSet<Text> createSplits() {
++ TreeSet<Text> splits = new TreeSet<>();
++
++ for (int i = 0; i < 1000; i++) {
++ Text split = new Text(String.format("%09x", i * 100000));
++ splits.add(split);
++ }
++ return splits;
++ }
++
++ private static abstract class DelayedTableOp implements Runnable {
++ private CountDownLatch cdl;
++
++ DelayedTableOp(CountDownLatch cdl) {
++ this.cdl = cdl;
++ }
++
++ public void run() {
++ try {
++ cdl.countDown();
++ cdl.await();
++ Thread.sleep(10);
++ doTableOp();
++ } catch (TableNotFoundException e) {
++ // expected
++ } catch (RuntimeException e) {
++ throw e;
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++ protected abstract void doTableOp() throws Exception;
++ }
++
++ @Test
++ public void testConcurrentFateOpsWithDelete() throws Exception {
++ final Connector c = getConnector();
++ String[] tables = getUniqueNames(2);
++
++ TreeSet<Text> splits = createSplits();
++
++ int numOperations = 8;
++
++ ExecutorService es = Executors.newFixedThreadPool(numOperations);
++
++ int count = 0;
++ for (final String table : tables) {
++ c.tableOperations().create(table);
++ c.tableOperations().addSplits(table, splits);
++ writeData(c, table);
++ if (count == 1) {
++ c.tableOperations().flush(table, null, null, true);
++ }
++ count++;
++
++ // increment this for each test
++ final CountDownLatch cdl = new CountDownLatch(numOperations);
++
++ List<Future<?>> futures = new ArrayList<>();
++
++ futures.add(es.submit(new Runnable() {
++ @Override
++ public void run() {
++ try {
++ cdl.countDown();
++ cdl.await();
++ c.tableOperations().delete(table);
++ } catch (TableNotFoundException | TableOfflineException e) {
++ // expected
++ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
++ throw new RuntimeException(e);
++ }
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().compact(table, new CompactionConfig());
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().merge(table, null, null);
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ Map<String,String> m = Collections.emptyMap();
++ Set<String> s = Collections.emptySet();
++ c.tableOperations().clone(table, table + "_clone", true, m, s);
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().deleteRows(table, null, null);
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().cancelCompaction(table);
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().rename(table, table + "_renamed");
++ }
++ }));
++
++ futures.add(es.submit(new DelayedTableOp(cdl) {
++ @Override
++ protected void doTableOp() throws Exception {
++ c.tableOperations().offline(table);
++ }
++ }));
++
++ Assert.assertEquals(numOperations, futures.size());
++
++ for (Future<?> future : futures) {
++ future.get();
++ }
++
++ try {
++ c.createScanner(table, Authorizations.EMPTY);
++ Assert.fail("Expected table " + table + " to be gone.");
++ } catch (TableNotFoundException tnfe) {
++ // expected
++ }
++
++ FateStatus fateStatus = getFateStatus();
++
++ // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
++ Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
++ Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
++ }
++
++ es.shutdown();
++ }
++
+ private FateStatus getFateStatus() throws KeeperException, InterruptedException {
+ Instance instance = getConnector().getInstance();
+ AdminUtil<String> admin = new AdminUtil<>(false);
+ String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+ ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+ FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
+ return fateStatus;
+ }
+
+ private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ try {
+ Random rand = new Random();
+ for (int i = 0; i < 1000; i++) {
+ Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
+ m.put("m", "order", "" + i);
+ bw.addMutation(m);
+ }
+ } finally {
+ bw.close();
+ }
+ }
+}