You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/01 17:06:40 UTC
[24/50] hadoop git commit: HDDS-273. DeleteLog entries should be
purged only after corresponding DNs commit the transaction. Contributed by
Lokesh Jain.
HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/feb795b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/feb795b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/feb795b5
Branch: refs/heads/HDFS-12943
Commit: feb795b58d2a3c20bdbddea1638a83f6637d3fc9
Parents: 6b038f8
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Sun Jul 29 01:02:24 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Sun Jul 29 01:02:24 2018 +0530
----------------------------------------------------------------------
.../DeleteBlocksCommandHandler.java | 12 +-
.../StorageContainerDatanodeProtocol.proto | 4 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 2 +-
.../block/DatanodeDeletedBlockTransactions.java | 47 ++--
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 23 +-
.../hdds/scm/block/DeletedBlockLogImpl.java | 123 ++++++----
.../scm/server/SCMDatanodeProtocolServer.java | 19 +-
.../hdds/scm/block/TestDeletedBlockLog.java | 232 ++++++++++---------
8 files changed, 256 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 9640f93..b0d4cbc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -113,8 +113,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
DeleteBlockTransactionResult.Builder txResultBuilder =
DeleteBlockTransactionResult.newBuilder();
txResultBuilder.setTxID(entry.getTxID());
+ long containerId = entry.getContainerID();
try {
- long containerId = entry.getContainerID();
Container cont = containerSet.getContainer(containerId);
if (cont == null) {
throw new StorageContainerException("Unable to find the container "
@@ -126,7 +126,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
KeyValueContainerData containerData = (KeyValueContainerData)
cont.getContainerData();
deleteKeyValueContainerBlocks(containerData, entry);
- txResultBuilder.setSuccess(true);
+ txResultBuilder.setContainerID(containerId)
+ .setSuccess(true);
break;
default:
LOG.error(
@@ -136,9 +137,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
} catch (IOException e) {
LOG.warn("Failed to delete blocks for container={}, TXID={}",
entry.getContainerID(), entry.getTxID(), e);
- txResultBuilder.setSuccess(false);
+ txResultBuilder.setContainerID(containerId)
+ .setSuccess(false);
}
- resultBuilder.addResults(txResultBuilder.build());
+ resultBuilder.addResults(txResultBuilder.build())
+ .setDnId(context.getParent().getDatanodeDetails()
+ .getUuid().toString());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index d89567b..0c52efb 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -229,9 +229,11 @@ message DeletedBlocksTransaction {
message ContainerBlocksDeletionACKProto {
message DeleteBlockTransactionResult {
required int64 txID = 1;
- required bool success = 2;
+ required int64 containerID = 2;
+ required bool success = 3;
}
repeated DeleteBlockTransactionResult results = 1;
+ required string dnId = 2;
}
// SendACK response returned by datanode to SCM, currently empty.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 6825ca4..8e1c2cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -112,7 +112,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
- deletedBlockLog = new DeletedBlockLogImpl(conf);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
long svcInterval =
conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index d71e7b0..e33a700 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -53,7 +53,8 @@ public class DatanodeDeletedBlockTransactions {
this.nodeNum = nodeNum;
}
- public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+ public void addTransaction(DeletedBlocksTransaction tx,
+ Set<UUID> dnsWithTransactionCommitted) throws IOException {
Pipeline pipeline = null;
try {
pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
@@ -71,29 +72,37 @@ public class DatanodeDeletedBlockTransactions {
for (DatanodeDetails dd : pipeline.getMachines()) {
UUID dnID = dd.getUuid();
- if (transactions.containsKey(dnID)) {
- List<DeletedBlocksTransaction> txs = transactions.get(dnID);
- if (txs != null && txs.size() < maximumAllowedTXNum) {
- boolean hasContained = false;
- for (DeletedBlocksTransaction t : txs) {
- if (t.getContainerID() == tx.getContainerID()) {
- hasContained = true;
- break;
- }
- }
+ if (dnsWithTransactionCommitted == null ||
+ !dnsWithTransactionCommitted.contains(dnID)) {
+ // Transaction need not be sent to dns which have already committed it
+ addTransactionToDN(dnID, tx);
+ }
+ }
+ }
- if (!hasContained) {
- txs.add(tx);
- currentTXNum++;
+ private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+ if (transactions.containsKey(dnID)) {
+ List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+ if (txs != null && txs.size() < maximumAllowedTXNum) {
+ boolean hasContained = false;
+ for (DeletedBlocksTransaction t : txs) {
+ if (t.getContainerID() == tx.getContainerID()) {
+ hasContained = true;
+ break;
}
}
- } else {
- currentTXNum++;
- transactions.put(dnID, tx);
+
+ if (!hasContained) {
+ txs.add(tx);
+ currentTXNum++;
+ }
}
- SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
- tx.getTxID());
+ } else {
+ currentTXNum++;
+ transactions.put(dnID, tx);
}
+ SCMBlockDeletingService.LOG
+ .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
}
Set<UUID> getDatanodeIDs() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 28103be..2bb5686 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hdds.scm.block;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+ .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -34,18 +38,6 @@ import java.util.Map;
public interface DeletedBlockLog extends Closeable {
/**
- * A limit size list of transactions. Note count is the max number
- * of TXs to return, we might not be able to always return this
- * number. and the processCount of those transactions
- * should be [0, MAX_RETRY).
- *
- * @param count - number of transactions.
- * @return a list of BlockDeletionTransaction.
- */
- List<DeletedBlocksTransaction> getTransactions(int count)
- throws IOException;
-
- /**
* Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
* Once DatanodeDeletedBlockTransactions is full, the scan behavior will
* stop.
@@ -81,10 +73,11 @@ public interface DeletedBlockLog extends Closeable {
* Commits a transaction means to delete all footprints of a transaction
* from the log. This method doesn't guarantee all transactions can be
* successfully deleted, it tolerate failures and tries best efforts to.
- *
- * @param txIDs - transaction IDs.
+ * @param transactionResults - delete block transaction results.
+ * @param dnID - ID of datanode which acknowledges the delete block command.
*/
- void commitTransactions(List<Long> txIDs) throws IOException;
+ void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
+ UUID dnID);
/**
* Creates a block deletion transaction and adds that into the log.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 48fa2eb..752c9c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -21,27 +21,36 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+ .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
@@ -74,12 +83,15 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
private final int maxRetry;
private final MetadataStore deletedStore;
+ private final Mapping containerManager;
private final Lock lock;
// The latest id of deleted blocks in the db.
private long lastTxID;
- private long lastReadTxID;
+ // Maps txId to set of DNs which are successful in committing the transaction
+ private Map<Long, Set<UUID>> transactionToDNsCommitMap;
- public DeletedBlockLogImpl(Configuration conf) throws IOException {
+ public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
+ throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
@@ -95,11 +107,17 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
.setDbFile(deletedLogDbPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
+ this.containerManager = containerManager;
this.lock = new ReentrantLock();
// start from the head of deleted store.
- lastReadTxID = 0;
lastTxID = findLatestTxIDInStore();
+
+ // transactionToDNsCommitMap gains an entry when a transaction is read from
+ // the log for dispatch to DNs, and loses it when the entry is purged.
+
+ // maps transaction to dns which have committed it.
+ transactionToDNsCommitMap = new ConcurrentHashMap<>();
}
@VisibleForTesting
@@ -124,39 +142,6 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
@Override
- public List<DeletedBlocksTransaction> getTransactions(
- int count) throws IOException {
- List<DeletedBlocksTransaction> result = new ArrayList<>();
- MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
- -> Longs.fromByteArray(currentKey) > lastReadTxID;
- MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
- -> !Arrays.equals(LATEST_TXID, currentKey);
- lock.lock();
- try {
- deletedStore.iterate(null, (key, value) -> {
- if (getNextTxID.filterKey(null, key, null) &&
- avoidInvalidTxid.filterKey(null, key, null)) {
- DeletedBlocksTransaction block = DeletedBlocksTransaction
- .parseFrom(value);
- if (block.getCount() > -1 && block.getCount() <= maxRetry) {
- result.add(block);
- }
- }
- return result.size() < count;
- });
- // Scan the metadata from the beginning.
- if (result.size() < count || result.size() < 1) {
- lastReadTxID = 0;
- } else {
- lastReadTxID = result.get(result.size() - 1).getTxID();
- }
- } finally {
- lock.unlock();
- }
- return result;
- }
-
- @Override
public List<DeletedBlocksTransaction> getFailedTransactions()
throws IOException {
lock.lock();
@@ -235,18 +220,50 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
/**
* {@inheritDoc}
*
- * @param txIDs - transaction IDs.
+ * @param transactionResults - delete block transaction results.
+ * @param dnID - Id of Datanode which has acknowledged a delete block command.
* @throws IOException
*/
@Override
- public void commitTransactions(List<Long> txIDs) throws IOException {
+ public void commitTransactions(
+ List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
lock.lock();
try {
- for (Long txID : txIDs) {
+ Set<UUID> dnsWithCommittedTxn;
+ for (DeleteBlockTransactionResult transactionResult : transactionResults) {
+ if (isTransactionFailed(transactionResult)) {
+ continue;
+ }
try {
- deletedStore.delete(Longs.toByteArray(txID));
- } catch (IOException ex) {
- LOG.warn("Cannot commit txID " + txID, ex);
+ long txID = transactionResult.getTxID();
+ // set of dns which have successfully committed transaction txId.
+ dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+ Long containerId = transactionResult.getContainerID();
+ if (dnsWithCommittedTxn == null || containerId == null) {
+ LOG.warn("Transaction txId={} commit by dnId={} failed."
+ + " Corresponding entry not found.", txID, dnID);
+ return;
+ }
+
+ dnsWithCommittedTxn.add(dnID);
+ Collection<DatanodeDetails> containerDnsDetails =
+ containerManager.getContainerWithPipeline(containerId)
+ .getPipeline().getDatanodes().values();
+ // The delete entry can be safely removed from the log if all the
+ // corresponding nodes commit the txn.
+ if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+ List<UUID> containerDns = containerDnsDetails.stream()
+ .map(dnDetails -> dnDetails.getUuid())
+ .collect(Collectors.toList());
+ if (dnsWithCommittedTxn.containsAll(containerDns)) {
+ transactionToDNsCommitMap.remove(txID);
+ LOG.debug("Purging txId={} from block deletion log", txID);
+ deletedStore.delete(Longs.toByteArray(txID));
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not commit delete block transaction: " +
+ transactionResult.getTxID(), e);
}
}
} finally {
@@ -254,6 +271,20 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
}
+ private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
+ result.getTxID(), result.getSuccess());
+ }
+ if (!result.getSuccess()) {
+ LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+ + "TX in next interval", result.getTxID());
+ return true;
+ }
+ return false;
+ }
+
/**
* {@inheritDoc}
*
@@ -355,7 +386,9 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
.parseFrom(value);
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
- transactions.addTransaction(block);
+ Set<UUID> dnsWithTransactionCommitted = transactionToDNsCommitMap
+ .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
+ transactions.addTransaction(block, dnsWithTransactionCommitted);
}
return !transactions.isFull();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index aee64b9..0d34787 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -91,9 +91,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -230,21 +230,8 @@ public class SCMDatanodeProtocolServer implements
ContainerBlocksDeletionACKProto acks) throws IOException {
if (acks.getResultsCount() > 0) {
List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
- for (DeleteBlockTransactionResult result : resultList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
- + "success={}", result.getTxID(), result.getSuccess());
- }
- if (result.getSuccess()) {
- LOG.debug("Purging TXID={} from block deletion log",
- result.getTxID());
- scm.getScmBlockManager().getDeletedBlockLog()
- .commitTransactions(Collections.singletonList(result.getTxID()));
- } else {
- LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
- + "TX in next interval", result.getTxID());
- }
- }
+ scm.getScmBlockManager().getDeletedBlockLog()
+ .commitTransactions(resultList, UUID.fromString(acks.getDnId()));
}
return ContainerBlocksDeletionACKResponseProto.newBuilder()
.getDefaultInstanceForType();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 9255ec7..e86717b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -32,6 +32,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+ .DeleteBlockTransactionResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
@@ -45,6 +48,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -56,7 +60,8 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
/**
* Tests for DeletedBlockLog.
@@ -66,6 +71,8 @@ public class TestDeletedBlockLog {
private static DeletedBlockLogImpl deletedBlockLog;
private OzoneConfiguration conf;
private File testDir;
+ private Mapping containerManager;
+ private List<DatanodeDetails> dnList;
@Before
public void setup() throws Exception {
@@ -74,7 +81,36 @@ public class TestDeletedBlockLog {
conf = new OzoneConfiguration();
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- deletedBlockLog = new DeletedBlockLogImpl(conf);
+ containerManager = Mockito.mock(ContainerMapping.class);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+ dnList = new ArrayList<>(3);
+ setupContainerManager();
+ }
+
+ private void setupContainerManager() throws IOException {
+ dnList.add(
+ DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+ .build());
+ dnList.add(
+ DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+ .build());
+ dnList.add(
+ DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+ .build());
+
+ ContainerInfo containerInfo =
+ new ContainerInfo.Builder().setContainerID(1).build();
+ Pipeline pipeline =
+ new Pipeline(null, LifeCycleState.CLOSED, ReplicationType.RATIS,
+ ReplicationFactor.THREE, null);
+ pipeline.addMember(dnList.get(0));
+ pipeline.addMember(dnList.get(1));
+ pipeline.addMember(dnList.get(2));
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(containerInfo, pipeline);
+ when(containerManager.getContainerWithPipeline(anyLong()))
+ .thenReturn(containerWithPipeline);
+ when(containerManager.getContainer(anyLong())).thenReturn(containerInfo);
}
@After
@@ -101,45 +137,50 @@ public class TestDeletedBlockLog {
return blockMap;
}
- @Test
- public void testGetTransactions() throws Exception {
- List<DeletedBlocksTransaction> blocks =
- deletedBlockLog.getTransactions(30);
- Assert.assertEquals(0, blocks.size());
-
- // Creates 40 TX in the log.
- for (Map.Entry<Long, List<Long>> entry : generateData(40).entrySet()){
- deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+ private void commitTransactions(
+ List<DeleteBlockTransactionResult> transactionResults,
+ DatanodeDetails... dns) {
+ for (DatanodeDetails dnDetails : dns) {
+ deletedBlockLog
+ .commitTransactions(transactionResults, dnDetails.getUuid());
}
+ }
- // Get first 30 TXs.
- blocks = deletedBlockLog.getTransactions(30);
- Assert.assertEquals(30, blocks.size());
- for (int i = 0; i < 30; i++) {
- Assert.assertEquals(i + 1, blocks.get(i).getTxID());
- }
+ private void commitTransactions(
+ List<DeleteBlockTransactionResult> transactionResults) {
+ commitTransactions(transactionResults,
+ dnList.toArray(new DatanodeDetails[3]));
+ }
- // Get another 30 TXs.
- // The log only 10 left, so this time it will only return 10 TXs.
- blocks = deletedBlockLog.getTransactions(30);
- Assert.assertEquals(10, blocks.size());
- for (int i = 30; i < 40; i++) {
- Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
- }
+ private void commitTransactions(
+ Collection<DeletedBlocksTransaction> deletedBlocksTransactions,
+ DatanodeDetails... dns) {
+ commitTransactions(deletedBlocksTransactions.stream()
+ .map(this::createDeleteBlockTransactionResult)
+ .collect(Collectors.toList()), dns);
+ }
- // Get another 50 TXs.
- // By now the position should have moved to the beginning,
- // this call will return all 40 TXs.
- blocks = deletedBlockLog.getTransactions(50);
- Assert.assertEquals(40, blocks.size());
- for (int i = 0; i < 40; i++) {
- Assert.assertEquals(i + 1, blocks.get(i).getTxID());
- }
- List<Long> txIDs = new ArrayList<>();
- for (DeletedBlocksTransaction block : blocks) {
- txIDs.add(block.getTxID());
- }
- deletedBlockLog.commitTransactions(txIDs);
+ private void commitTransactions(
+ Collection<DeletedBlocksTransaction> deletedBlocksTransactions) {
+ commitTransactions(deletedBlocksTransactions.stream()
+ .map(this::createDeleteBlockTransactionResult)
+ .collect(Collectors.toList()));
+ }
+
+ private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
+ DeletedBlocksTransaction transaction) {
+ return DeleteBlockTransactionResult.newBuilder()
+ .setContainerID(transaction.getContainerID()).setSuccess(true)
+ .setTxID(transaction.getTxID()).build();
+ }
+
+ private List<DeletedBlocksTransaction> getTransactions(
+ int maximumAllowedTXNum) throws IOException {
+ DatanodeDeletedBlockTransactions transactions =
+ new DatanodeDeletedBlockTransactions(containerManager,
+ maximumAllowedTXNum, 3);
+ deletedBlockLog.getTransactions(transactions);
+ return transactions.getDatanodeTransactions(dnList.get(0).getUuid());
}
@Test
@@ -153,7 +194,7 @@ public class TestDeletedBlockLog {
// This will return all TXs, total num 30.
List<DeletedBlocksTransaction> blocks =
- deletedBlockLog.getTransactions(40);
+ getTransactions(40);
List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());
@@ -164,13 +205,13 @@ public class TestDeletedBlockLog {
// Increment another time so it exceed the maxRetry.
// On this call, count will be set to -1 which means TX eventually fails.
deletedBlockLog.incrementCount(txIDs);
- blocks = deletedBlockLog.getTransactions(40);
+ blocks = getTransactions(40);
for (DeletedBlocksTransaction block : blocks) {
Assert.assertEquals(-1, block.getCount());
}
// If all TXs are failed, getTransactions call will always return nothing.
- blocks = deletedBlockLog.getTransactions(40);
+ blocks = getTransactions(40);
Assert.assertEquals(blocks.size(), 0);
}
@@ -180,16 +221,26 @@ public class TestDeletedBlockLog {
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
List<DeletedBlocksTransaction> blocks =
- deletedBlockLog.getTransactions(20);
- List<Long> txIDs = new ArrayList<>();
- for (DeletedBlocksTransaction block : blocks) {
- txIDs.add(block.getTxID());
- }
- // Add an invalid txID.
- txIDs.add(70L);
- deletedBlockLog.commitTransactions(txIDs);
- blocks = deletedBlockLog.getTransactions(50);
+ getTransactions(20);
+ // Add an invalid txn.
+ blocks.add(
+ DeletedBlocksTransaction.newBuilder().setContainerID(1).setTxID(70)
+ .setCount(0).addLocalID(0).build());
+ commitTransactions(blocks);
+ blocks.remove(blocks.size() - 1);
+
+ blocks = getTransactions(50);
+ Assert.assertEquals(30, blocks.size());
+ commitTransactions(blocks, dnList.get(1), dnList.get(2),
+ DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+ .build());
+
+ blocks = getTransactions(50);
Assert.assertEquals(30, blocks.size());
+ commitTransactions(blocks, dnList.get(0));
+
+ blocks = getTransactions(50);
+ Assert.assertEquals(0, blocks.size());
}
@Test
@@ -213,20 +264,16 @@ public class TestDeletedBlockLog {
}
added += 10;
} else if (state == 1) {
- blocks = deletedBlockLog.getTransactions(20);
+ blocks = getTransactions(20);
txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
deletedBlockLog.incrementCount(txIDs);
} else if (state == 2) {
- txIDs = new ArrayList<>();
- for (DeletedBlocksTransaction block : blocks) {
- txIDs.add(block.getTxID());
- }
+ commitTransactions(blocks);
+ committed += blocks.size();
blocks = new ArrayList<>();
- committed += txIDs.size();
- deletedBlockLog.commitTransactions(txIDs);
} else {
// verify the number of added and committed.
List<Map.Entry<byte[], byte[]>> result =
@@ -234,6 +281,8 @@ public class TestDeletedBlockLog {
Assert.assertEquals(added, result.size() + committed);
}
}
+ blocks = getTransactions(1000);
+ commitTransactions(blocks);
}
@Test
@@ -244,16 +293,13 @@ public class TestDeletedBlockLog {
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
- deletedBlockLog = new DeletedBlockLogImpl(conf);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
List<DeletedBlocksTransaction> blocks =
- deletedBlockLog.getTransactions(10);
- List<Long> txIDs = new ArrayList<>();
- for (DeletedBlocksTransaction block : blocks) {
- txIDs.add(block.getTxID());
- }
- deletedBlockLog.commitTransactions(txIDs);
- blocks = deletedBlockLog.getTransactions(10);
- Assert.assertEquals(10, blocks.size());
+ getTransactions(10);
+ commitTransactions(blocks);
+ blocks = getTransactions(100);
+ Assert.assertEquals(40, blocks.size());
+ commitTransactions(blocks);
}
@Test
@@ -262,32 +308,11 @@ public class TestDeletedBlockLog {
int maximumAllowedTXNum = 5;
List<DeletedBlocksTransaction> blocks = null;
List<Long> containerIDs = new LinkedList<>();
+ DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
int count = 0;
long containerID = 0L;
- DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
- DatanodeDetails.Port.Name.STANDALONE, 0);
- DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
- DatanodeDetails.Port.Name.RATIS, 0);
- DatanodeDetails.Port restPort = DatanodeDetails.newPort(
- DatanodeDetails.Port.Name.REST, 0);
- DatanodeDetails dnId1 = DatanodeDetails.newBuilder()
- .setUuid(UUID.randomUUID().toString())
- .setIpAddress("127.0.0.1")
- .setHostName("localhost")
- .addPort(containerPort)
- .addPort(ratisPort)
- .addPort(restPort)
- .build();
- DatanodeDetails dnId2 = DatanodeDetails.newBuilder()
- .setUuid(UUID.randomUUID().toString())
- .setIpAddress("127.0.0.1")
- .setHostName("localhost")
- .addPort(containerPort)
- .addPort(ratisPort)
- .addPort(restPort)
- .build();
- Mapping mappingService = mock(ContainerMapping.class);
+
// Creates {TXNum} TX in the log.
for (Map.Entry<Long, List<Long>> entry : generateData(txNum)
.entrySet()) {
@@ -298,29 +323,25 @@ public class TestDeletedBlockLog {
// make TX[1-6] for datanode1; TX[7-10] for datanode2
if (count <= (maximumAllowedTXNum + 1)) {
- mockContainerInfo(mappingService, containerID, dnId1);
+ mockContainerInfo(containerID, dnId1);
} else {
- mockContainerInfo(mappingService, containerID, dnId2);
+ mockContainerInfo(containerID, dnId2);
}
}
DatanodeDeletedBlockTransactions transactions =
- new DatanodeDeletedBlockTransactions(mappingService,
+ new DatanodeDeletedBlockTransactions(containerManager,
maximumAllowedTXNum, 2);
deletedBlockLog.getTransactions(transactions);
- List<Long> txIDs = new LinkedList<>();
for (UUID id : transactions.getDatanodeIDs()) {
List<DeletedBlocksTransaction> txs = transactions
.getDatanodeTransactions(id);
- for (DeletedBlocksTransaction tx : txs) {
- txIDs.add(tx.getTxID());
- }
+ // delete TX ID
+ commitTransactions(txs);
}
- // delete TX ID
- deletedBlockLog.commitTransactions(txIDs);
- blocks = deletedBlockLog.getTransactions(txNum);
+ blocks = getTransactions(txNum);
// There should be one block remained since dnID1 reaches
// the maximum value (5).
Assert.assertEquals(1, blocks.size());
@@ -337,7 +358,8 @@ public class TestDeletedBlockLog {
builder.setTxID(11);
builder.setContainerID(containerID);
builder.setCount(0);
- transactions.addTransaction(builder.build());
+ transactions.addTransaction(builder.build(),
+ null);
// The number of TX in dnID2 should not be changed.
Assert.assertEquals(size,
@@ -349,14 +371,14 @@ public class TestDeletedBlockLog {
builder.setTxID(12);
builder.setContainerID(containerID);
builder.setCount(0);
- mockContainerInfo(mappingService, containerID, dnId2);
- transactions.addTransaction(builder.build());
+ mockContainerInfo(containerID, dnId2);
+ transactions.addTransaction(builder.build(),
+ null);
// Since all node are full, then transactions is full.
Assert.assertTrue(transactions.isFull());
}
- private void mockContainerInfo(Mapping mappingService, long containerID,
- DatanodeDetails dd) throws IOException {
+ private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
Pipeline pipeline =
new Pipeline("fake", LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
@@ -370,9 +392,9 @@ public class TestDeletedBlockLog {
ContainerInfo containerInfo = builder.build();
ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
containerInfo, pipeline);
- Mockito.doReturn(containerInfo).when(mappingService)
+ Mockito.doReturn(containerInfo).when(containerManager)
.getContainer(containerID);
- Mockito.doReturn(containerWithPipeline).when(mappingService)
+ Mockito.doReturn(containerWithPipeline).when(containerManager)
.getContainerWithPipeline(containerID);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org