You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:17:31 UTC

[08/50] [abbrv] hadoop git commit: HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain.

HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/feb795b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/feb795b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/feb795b5

Branch: refs/heads/YARN-7402
Commit: feb795b58d2a3c20bdbddea1638a83f6637d3fc9
Parents: 6b038f8
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Sun Jul 29 01:02:24 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Sun Jul 29 01:02:24 2018 +0530

----------------------------------------------------------------------
 .../DeleteBlocksCommandHandler.java             |  12 +-
 .../StorageContainerDatanodeProtocol.proto      |   4 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |   2 +-
 .../block/DatanodeDeletedBlockTransactions.java |  47 ++--
 .../hadoop/hdds/scm/block/DeletedBlockLog.java  |  23 +-
 .../hdds/scm/block/DeletedBlockLogImpl.java     | 123 ++++++----
 .../scm/server/SCMDatanodeProtocolServer.java   |  19 +-
 .../hdds/scm/block/TestDeletedBlockLog.java     | 232 ++++++++++---------
 8 files changed, 256 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 9640f93..b0d4cbc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -113,8 +113,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
         DeleteBlockTransactionResult.Builder txResultBuilder =
             DeleteBlockTransactionResult.newBuilder();
         txResultBuilder.setTxID(entry.getTxID());
+        long containerId = entry.getContainerID();
         try {
-          long containerId = entry.getContainerID();
           Container cont = containerSet.getContainer(containerId);
           if (cont == null) {
             throw new StorageContainerException("Unable to find the container "
@@ -126,7 +126,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             KeyValueContainerData containerData = (KeyValueContainerData)
                 cont.getContainerData();
             deleteKeyValueContainerBlocks(containerData, entry);
-            txResultBuilder.setSuccess(true);
+            txResultBuilder.setContainerID(containerId)
+                .setSuccess(true);
             break;
           default:
             LOG.error(
@@ -136,9 +137,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
         } catch (IOException e) {
           LOG.warn("Failed to delete blocks for container={}, TXID={}",
               entry.getContainerID(), entry.getTxID(), e);
-          txResultBuilder.setSuccess(false);
+          txResultBuilder.setContainerID(containerId)
+              .setSuccess(false);
         }
-        resultBuilder.addResults(txResultBuilder.build());
+        resultBuilder.addResults(txResultBuilder.build())
+            .setDnId(context.getParent().getDatanodeDetails()
+                .getUuid().toString());
       });
       ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index d89567b..0c52efb 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -229,9 +229,11 @@ message DeletedBlocksTransaction {
 message ContainerBlocksDeletionACKProto {
   message DeleteBlockTransactionResult {
     required int64 txID = 1;
-    required bool success = 2;
+    required int64 containerID = 2;
+    required bool success = 3;
   }
   repeated DeleteBlockTransactionResult results = 1;
+  required string dnId = 2;
 }
 
 // SendACK response returned by datanode to SCM, currently empty.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 6825ca4..8e1c2cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -112,7 +112,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
     // SCM block deleting transaction log and deleting service.
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
     long svcInterval =
         conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
             OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index d71e7b0..e33a700 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -53,7 +53,8 @@ public class DatanodeDeletedBlockTransactions {
     this.nodeNum = nodeNum;
   }
 
-  public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+  public void addTransaction(DeletedBlocksTransaction tx,
+      Set<UUID> dnsWithTransactionCommitted) throws IOException {
     Pipeline pipeline = null;
     try {
       pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
@@ -71,29 +72,37 @@ public class DatanodeDeletedBlockTransactions {
 
     for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
-      if (transactions.containsKey(dnID)) {
-        List<DeletedBlocksTransaction> txs = transactions.get(dnID);
-        if (txs != null && txs.size() < maximumAllowedTXNum) {
-          boolean hasContained = false;
-          for (DeletedBlocksTransaction t : txs) {
-            if (t.getContainerID() == tx.getContainerID()) {
-              hasContained = true;
-              break;
-            }
-          }
+      if (dnsWithTransactionCommitted == null ||
+          !dnsWithTransactionCommitted.contains(dnID)) {
+        // Transaction need not be sent to dns which have already committed it
+        addTransactionToDN(dnID, tx);
+      }
+    }
+  }
 
-          if (!hasContained) {
-            txs.add(tx);
-            currentTXNum++;
+  private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+    if (transactions.containsKey(dnID)) {
+      List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+      if (txs != null && txs.size() < maximumAllowedTXNum) {
+        boolean hasContained = false;
+        for (DeletedBlocksTransaction t : txs) {
+          if (t.getContainerID() == tx.getContainerID()) {
+            hasContained = true;
+            break;
           }
         }
-      } else {
-        currentTXNum++;
-        transactions.put(dnID, tx);
+
+        if (!hasContained) {
+          txs.add(tx);
+          currentTXNum++;
+        }
       }
-      SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
-          tx.getTxID());
+    } else {
+      currentTXNum++;
+      transactions.put(dnID, tx);
     }
+    SCMBlockDeletingService.LOG
+        .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
   }
 
   Set<UUID> getDatanodeIDs() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 28103be..2bb5686 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -34,18 +38,6 @@ import java.util.Map;
 public interface DeletedBlockLog extends Closeable {
 
   /**
-   *  A limit size list of transactions. Note count is the max number
-   *  of TXs to return, we might not be able to always return this
-   *  number. and the processCount of those transactions
-   *  should be [0, MAX_RETRY).
-   *
-   * @param count - number of transactions.
-   * @return a list of BlockDeletionTransaction.
-   */
-  List<DeletedBlocksTransaction> getTransactions(int count)
-      throws IOException;
-
-  /**
    * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
    * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
    * stop.
@@ -81,10 +73,11 @@ public interface DeletedBlockLog extends Closeable {
    * Commits a transaction means to delete all footprints of a transaction
    * from the log. This method doesn't guarantee all transactions can be
    * successfully deleted, it tolerate failures and tries best efforts to.
-   *
-   * @param txIDs - transaction IDs.
+   *  @param transactionResults - delete block transaction results.
+   * @param dnID - ID of datanode which acknowledges the delete block command.
    */
-  void commitTransactions(List<Long> txIDs) throws IOException;
+  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
+      UUID dnID);
 
   /**
    * Creates a block deletion transaction and adds that into the log.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 48fa2eb..752c9c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -21,27 +21,36 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
@@ -74,12 +83,15 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
 
   private final int maxRetry;
   private final MetadataStore deletedStore;
+  private final Mapping containerManager;
   private final Lock lock;
   // The latest id of deleted blocks in the db.
   private long lastTxID;
-  private long lastReadTxID;
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private Map<Long, Set<UUID>> transactionToDNsCommitMap;
 
-  public DeletedBlockLogImpl(Configuration conf) throws IOException {
+  public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
+      throws IOException {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
 
@@ -95,11 +107,17 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
         .setDbFile(deletedLogDbPath)
         .setCacheSize(cacheSize * OzoneConsts.MB)
         .build();
+    this.containerManager = containerManager;
 
     this.lock = new ReentrantLock();
     // start from the head of deleted store.
-    lastReadTxID = 0;
     lastTxID = findLatestTxIDInStore();
+
+    // transactionToDNsCommitMap is updated only when
+    // transaction is added to the log and when it is removed.
+
+    // maps transaction to dns which have committed it.
+    transactionToDNsCommitMap = new ConcurrentHashMap<>();
   }
 
   @VisibleForTesting
@@ -124,39 +142,6 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   @Override
-  public List<DeletedBlocksTransaction> getTransactions(
-      int count) throws IOException {
-    List<DeletedBlocksTransaction> result = new ArrayList<>();
-    MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
-        -> Longs.fromByteArray(currentKey) > lastReadTxID;
-    MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
-        -> !Arrays.equals(LATEST_TXID, currentKey);
-    lock.lock();
-    try {
-      deletedStore.iterate(null, (key, value) -> {
-        if (getNextTxID.filterKey(null, key, null) &&
-            avoidInvalidTxid.filterKey(null, key, null)) {
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(value);
-          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            result.add(block);
-          }
-        }
-        return result.size() < count;
-      });
-      // Scan the metadata from the beginning.
-      if (result.size() < count || result.size() < 1) {
-        lastReadTxID = 0;
-      } else {
-        lastReadTxID = result.get(result.size() - 1).getTxID();
-      }
-    } finally {
-      lock.unlock();
-    }
-    return result;
-  }
-
-  @Override
   public List<DeletedBlocksTransaction> getFailedTransactions()
       throws IOException {
     lock.lock();
@@ -235,18 +220,50 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   /**
    * {@inheritDoc}
    *
-   * @param txIDs - transaction IDs.
+   * @param transactionResults - transaction IDs.
+   * @param dnID - Id of Datanode which has acknowledged a delete block command.
    * @throws IOException
    */
   @Override
-  public void commitTransactions(List<Long> txIDs) throws IOException {
+  public void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
     lock.lock();
     try {
-      for (Long txID : txIDs) {
+      Set<UUID> dnsWithCommittedTxn;
+      for (DeleteBlockTransactionResult transactionResult : transactionResults) {
+        if (isTransactionFailed(transactionResult)) {
+          continue;
+        }
         try {
-          deletedStore.delete(Longs.toByteArray(txID));
-        } catch (IOException ex) {
-          LOG.warn("Cannot commit txID " + txID, ex);
+          long txID = transactionResult.getTxID();
+          // set of dns which have successfully committed transaction txId.
+          dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+          Long containerId = transactionResult.getContainerID();
+          if (dnsWithCommittedTxn == null || containerId == null) {
+            LOG.warn("Transaction txId={} commit by dnId={} failed."
+                + " Corresponding entry not found.", txID, dnID);
+            return;
+          }
+
+          dnsWithCommittedTxn.add(dnID);
+          Collection<DatanodeDetails> containerDnsDetails =
+              containerManager.getContainerWithPipeline(containerId)
+                  .getPipeline().getDatanodes().values();
+          // The delete entry can be safely removed from the log if all the
+          // corresponding nodes commit the txn.
+          if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+            List<UUID> containerDns = containerDnsDetails.stream()
+                .map(dnDetails -> dnDetails.getUuid())
+                .collect(Collectors.toList());
+            if (dnsWithCommittedTxn.containsAll(containerDns)) {
+              transactionToDNsCommitMap.remove(txID);
+              LOG.debug("Purging txId={} from block deletion log", txID);
+              deletedStore.delete(Longs.toByteArray(txID));
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not commit delete block transaction: " +
+              transactionResult.getTxID(), e);
         }
       }
     } finally {
@@ -254,6 +271,20 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }
 
+  private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
+          result.getTxID(), result.getSuccess());
+    }
+    if (!result.getSuccess()) {
+      LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+          + "TX in next interval", result.getTxID());
+      return true;
+    }
+    return false;
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -355,7 +386,9 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
               .parseFrom(value);
 
           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            transactions.addTransaction(block);
+            Set<UUID> dnsWithTransactionCommitted = transactionToDNsCommitMap
+                .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
+            transactions.addTransaction(block, dnsWithTransactionCommitted);
           }
           return !transactions.isFull();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index aee64b9..0d34787 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -91,9 +91,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -230,21 +230,8 @@ public class SCMDatanodeProtocolServer implements
       ContainerBlocksDeletionACKProto acks) throws IOException {
     if (acks.getResultsCount() > 0) {
       List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
-      for (DeleteBlockTransactionResult result : resultList) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
-              + "success={}", result.getTxID(), result.getSuccess());
-        }
-        if (result.getSuccess()) {
-          LOG.debug("Purging TXID={} from block deletion log",
-              result.getTxID());
-          scm.getScmBlockManager().getDeletedBlockLog()
-              .commitTransactions(Collections.singletonList(result.getTxID()));
-        } else {
-          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
-              + "TX in next interval", result.getTxID());
-        }
-      }
+      scm.getScmBlockManager().getDeletedBlockLog()
+          .commitTransactions(resultList, UUID.fromString(acks.getDnId()));
     }
     return ContainerBlocksDeletionACKResponseProto.newBuilder()
         .getDefaultInstanceForType();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/feb795b5/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 9255ec7..e86717b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -32,6 +32,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
@@ -45,6 +48,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -56,7 +60,8 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for DeletedBlockLog.
@@ -66,6 +71,8 @@ public class TestDeletedBlockLog {
   private static DeletedBlockLogImpl deletedBlockLog;
   private OzoneConfiguration conf;
   private File testDir;
+  private Mapping containerManager;
+  private List<DatanodeDetails> dnList;
 
   @Before
   public void setup() throws Exception {
@@ -74,7 +81,36 @@ public class TestDeletedBlockLog {
     conf = new OzoneConfiguration();
     conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    containerManager = Mockito.mock(ContainerMapping.class);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+    dnList = new ArrayList<>(3);
+    setupContainerManager();
+  }
+
+  private void setupContainerManager() throws IOException {
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+
+    ContainerInfo containerInfo =
+        new ContainerInfo.Builder().setContainerID(1).build();
+    Pipeline pipeline =
+        new Pipeline(null, LifeCycleState.CLOSED, ReplicationType.RATIS,
+            ReplicationFactor.THREE, null);
+    pipeline.addMember(dnList.get(0));
+    pipeline.addMember(dnList.get(1));
+    pipeline.addMember(dnList.get(2));
+    ContainerWithPipeline containerWithPipeline =
+        new ContainerWithPipeline(containerInfo, pipeline);
+    when(containerManager.getContainerWithPipeline(anyLong()))
+        .thenReturn(containerWithPipeline);
+    when(containerManager.getContainer(anyLong())).thenReturn(containerInfo);
   }
 
   @After
@@ -101,45 +137,50 @@ public class TestDeletedBlockLog {
     return blockMap;
   }
 
-  @Test
-  public void testGetTransactions() throws Exception {
-    List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(0, blocks.size());
-
-    // Creates 40 TX in the log.
-    for (Map.Entry<Long, List<Long>> entry : generateData(40).entrySet()){
-      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+  private void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults,
+      DatanodeDetails... dns) {
+    for (DatanodeDetails dnDetails : dns) {
+      deletedBlockLog
+          .commitTransactions(transactionResults, dnDetails.getUuid());
     }
+  }
 
-    // Get first 30 TXs.
-    blocks = deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(30, blocks.size());
-    for (int i = 0; i < 30; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
-    }
+  private void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults) {
+    commitTransactions(transactionResults,
+        dnList.toArray(new DatanodeDetails[3]));
+  }
 
-    // Get another 30 TXs.
-    // The log only 10 left, so this time it will only return 10 TXs.
-    blocks = deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(10, blocks.size());
-    for (int i = 30; i < 40; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
-    }
+  private void commitTransactions(
+      Collection<DeletedBlocksTransaction> deletedBlocksTransactions,
+      DatanodeDetails... dns) {
+    commitTransactions(deletedBlocksTransactions.stream()
+        .map(this::createDeleteBlockTransactionResult)
+        .collect(Collectors.toList()), dns);
+  }
 
-    // Get another 50 TXs.
-    // By now the position should have moved to the beginning,
-    // this call will return all 40 TXs.
-    blocks = deletedBlockLog.getTransactions(50);
-    Assert.assertEquals(40, blocks.size());
-    for (int i = 0; i < 40; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
-    }
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    deletedBlockLog.commitTransactions(txIDs);
+  private void commitTransactions(
+      Collection<DeletedBlocksTransaction> deletedBlocksTransactions) {
+    commitTransactions(deletedBlocksTransactions.stream()
+        .map(this::createDeleteBlockTransactionResult)
+        .collect(Collectors.toList()));
+  }
+
+  private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
+      DeletedBlocksTransaction transaction) {
+    return DeleteBlockTransactionResult.newBuilder()
+        .setContainerID(transaction.getContainerID()).setSuccess(true)
+        .setTxID(transaction.getTxID()).build();
+  }
+
+  private List<DeletedBlocksTransaction> getTransactions(
+      int maximumAllowedTXNum) throws IOException {
+    DatanodeDeletedBlockTransactions transactions =
+        new DatanodeDeletedBlockTransactions(containerManager,
+            maximumAllowedTXNum, 3);
+    deletedBlockLog.getTransactions(transactions);
+    return transactions.getDatanodeTransactions(dnList.get(0).getUuid());
   }
 
   @Test
@@ -153,7 +194,7 @@ public class TestDeletedBlockLog {
 
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(40);
+        getTransactions(40);
     List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
         .collect(Collectors.toList());
 
@@ -164,13 +205,13 @@ public class TestDeletedBlockLog {
     // Increment another time so it exceed the maxRetry.
     // On this call, count will be set to -1 which means TX eventually fails.
     deletedBlockLog.incrementCount(txIDs);
-    blocks = deletedBlockLog.getTransactions(40);
+    blocks = getTransactions(40);
     for (DeletedBlocksTransaction block : blocks) {
       Assert.assertEquals(-1, block.getCount());
     }
 
     // If all TXs are failed, getTransactions call will always return nothing.
-    blocks = deletedBlockLog.getTransactions(40);
+    blocks = getTransactions(40);
     Assert.assertEquals(blocks.size(), 0);
   }
 
@@ -180,16 +221,26 @@ public class TestDeletedBlockLog {
       deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
     }
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(20);
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    // Add an invalid txID.
-    txIDs.add(70L);
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(50);
+        getTransactions(20);
+    // Add an invalid txn.
+    blocks.add(
+        DeletedBlocksTransaction.newBuilder().setContainerID(1).setTxID(70)
+            .setCount(0).addLocalID(0).build());
+    commitTransactions(blocks);
+    blocks.remove(blocks.size() - 1);
+
+    blocks = getTransactions(50);
+    Assert.assertEquals(30, blocks.size());
+    commitTransactions(blocks, dnList.get(1), dnList.get(2),
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+
+    blocks = getTransactions(50);
     Assert.assertEquals(30, blocks.size());
+    commitTransactions(blocks, dnList.get(0));
+
+    blocks = getTransactions(50);
+    Assert.assertEquals(0, blocks.size());
   }
 
   @Test
@@ -213,20 +264,16 @@ public class TestDeletedBlockLog {
         }
         added += 10;
       } else if (state == 1) {
-        blocks = deletedBlockLog.getTransactions(20);
+        blocks = getTransactions(20);
         txIDs = new ArrayList<>();
         for (DeletedBlocksTransaction block : blocks) {
           txIDs.add(block.getTxID());
         }
         deletedBlockLog.incrementCount(txIDs);
       } else if (state == 2) {
-        txIDs = new ArrayList<>();
-        for (DeletedBlocksTransaction block : blocks) {
-          txIDs.add(block.getTxID());
-        }
+        commitTransactions(blocks);
+        committed += blocks.size();
         blocks = new ArrayList<>();
-        committed += txIDs.size();
-        deletedBlockLog.commitTransactions(txIDs);
       } else {
         // verify the number of added and committed.
         List<Map.Entry<byte[], byte[]>> result =
@@ -234,6 +281,8 @@ public class TestDeletedBlockLog {
         Assert.assertEquals(added, result.size() + committed);
       }
     }
+    blocks = getTransactions(1000);
+    commitTransactions(blocks);
   }
 
   @Test
@@ -244,16 +293,13 @@ public class TestDeletedBlockLog {
     // close db and reopen it again to make sure
     // transactions are stored persistently.
     deletedBlockLog.close();
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(10);
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(10);
-    Assert.assertEquals(10, blocks.size());
+        getTransactions(10);
+    commitTransactions(blocks);
+    blocks = getTransactions(100);
+    Assert.assertEquals(40, blocks.size());
+    commitTransactions(blocks);
   }
 
   @Test
@@ -262,32 +308,11 @@ public class TestDeletedBlockLog {
     int maximumAllowedTXNum = 5;
     List<DeletedBlocksTransaction> blocks = null;
     List<Long> containerIDs = new LinkedList<>();
+    DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
 
     int count = 0;
     long containerID = 0L;
-    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.STANDALONE, 0);
-    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.RATIS, 0);
-    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.REST, 0);
-    DatanodeDetails dnId1 = DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID().toString())
-        .setIpAddress("127.0.0.1")
-        .setHostName("localhost")
-        .addPort(containerPort)
-        .addPort(ratisPort)
-        .addPort(restPort)
-        .build();
-    DatanodeDetails dnId2 = DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID().toString())
-        .setIpAddress("127.0.0.1")
-        .setHostName("localhost")
-        .addPort(containerPort)
-        .addPort(ratisPort)
-        .addPort(restPort)
-        .build();
-    Mapping mappingService = mock(ContainerMapping.class);
+
     // Creates {TXNum} TX in the log.
     for (Map.Entry<Long, List<Long>> entry : generateData(txNum)
         .entrySet()) {
@@ -298,29 +323,25 @@ public class TestDeletedBlockLog {
 
       // make TX[1-6] for datanode1; TX[7-10] for datanode2
       if (count <= (maximumAllowedTXNum + 1)) {
-        mockContainerInfo(mappingService, containerID, dnId1);
+        mockContainerInfo(containerID, dnId1);
       } else {
-        mockContainerInfo(mappingService, containerID, dnId2);
+        mockContainerInfo(containerID, dnId2);
       }
     }
 
     DatanodeDeletedBlockTransactions transactions =
-        new DatanodeDeletedBlockTransactions(mappingService,
+        new DatanodeDeletedBlockTransactions(containerManager,
             maximumAllowedTXNum, 2);
     deletedBlockLog.getTransactions(transactions);
 
-    List<Long> txIDs = new LinkedList<>();
     for (UUID id : transactions.getDatanodeIDs()) {
       List<DeletedBlocksTransaction> txs = transactions
           .getDatanodeTransactions(id);
-      for (DeletedBlocksTransaction tx : txs) {
-        txIDs.add(tx.getTxID());
-      }
+      // delete TX ID
+      commitTransactions(txs);
     }
 
-    // delete TX ID
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(txNum);
+    blocks = getTransactions(txNum);
     // There should be one block remained since dnID1 reaches
     // the maximum value (5).
     Assert.assertEquals(1, blocks.size());
@@ -337,7 +358,8 @@ public class TestDeletedBlockLog {
     builder.setTxID(11);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    transactions.addTransaction(builder.build());
+    transactions.addTransaction(builder.build(),
+        null);
 
     // The number of TX in dnID2 should not be changed.
     Assert.assertEquals(size,
@@ -349,14 +371,14 @@ public class TestDeletedBlockLog {
     builder.setTxID(12);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    mockContainerInfo(mappingService, containerID, dnId2);
-    transactions.addTransaction(builder.build());
+    mockContainerInfo(containerID, dnId2);
+    transactions.addTransaction(builder.build(),
+        null);
     // Since all node are full, then transactions is full.
     Assert.assertTrue(transactions.isFull());
   }
 
-  private void mockContainerInfo(Mapping mappingService, long containerID,
-      DatanodeDetails dd) throws IOException {
+  private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
     Pipeline pipeline =
         new Pipeline("fake", LifeCycleState.OPEN,
             ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
@@ -370,9 +392,9 @@ public class TestDeletedBlockLog {
     ContainerInfo containerInfo = builder.build();
     ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
         containerInfo, pipeline);
-    Mockito.doReturn(containerInfo).when(mappingService)
+    Mockito.doReturn(containerInfo).when(containerManager)
         .getContainer(containerID);
-    Mockito.doReturn(containerWithPipeline).when(mappingService)
+    Mockito.doReturn(containerWithPipeline).when(containerManager)
         .getContainerWithPipeline(containerID);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org