Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/08/13 07:18:29 UTC

hadoop git commit: HDDS-308. SCM should identify a container with pending deletes using container reports. Contributed by Lokesh Jain.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 3ac07b720 -> a8dae0047


HDDS-308. SCM should identify a container with pending deletes using container reports. Contributed by Lokesh Jain.
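
In outline, the change works like this: every datanode container report now
carries the datanode's last applied delete transaction id, and SCM flags any
container whose SCM-side id is ahead of the reported one. Below is a
self-contained editor's sketch of that comparison; ContainerRecord and
findContainersWithPendingDeletes are hypothetical stand-ins for the protobuf
report entries and the check added to ContainerMapping#processContainerReports
in the diff that follows.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class PendingDeleteCheckSketch {
      // Hypothetical stand-in for one entry of a datanode container report.
      static final class ContainerRecord {
        final long containerId;
        final long deleteTransactionId;
        ContainerRecord(long containerId, long deleteTransactionId) {
          this.containerId = containerId;
          this.deleteTransactionId = deleteTransactionId;
        }
      }

      // A container has pending deletes on the datanode when SCM's recorded
      // delete transaction id is newer than the id the datanode reported.
      static List<Long> findContainersWithPendingDeletes(
          Map<Long, Long> scmDeleteTxnIds, List<ContainerRecord> dnReport) {
        List<Long> pending = new ArrayList<>();
        for (ContainerRecord entry : dnReport) {
          long scmTxnId = scmDeleteTxnIds.getOrDefault(entry.containerId, 0L);
          if (scmTxnId > entry.deleteTransactionId) {
            pending.add(entry.containerId);
          }
        }
        return pending;
      }
    }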


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8dae004
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8dae004
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8dae004

Branch: refs/heads/trunk
Commit: a8dae0047cc3d86468ffe275be0d7857784fa8ab
Parents: 3ac07b7
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Aug 13 12:47:52 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Aug 13 12:47:52 2018 +0530

----------------------------------------------------------------------
 .../container/common/impl/ContainerSet.java     |  3 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |  4 +-
 .../hadoop/hdds/scm/block/DeletedBlockLog.java  |  6 +-
 .../hdds/scm/block/DeletedBlockLogImpl.java     | 38 ++++++----
 .../hdds/scm/block/PendingDeleteHandler.java    | 38 ++++++++++
 .../hdds/scm/block/PendingDeleteStatusList.java | 79 ++++++++++++++++++++
 .../hdds/scm/block/SCMBlockDeletingService.java | 20 ++++-
 .../hdds/scm/container/ContainerMapping.java    | 35 +++++++--
 .../scm/container/ContainerStateManager.java    | 14 ++--
 .../hadoop/hdds/scm/events/SCMEvents.java       |  9 +++
 .../scm/server/StorageContainerManager.java     |  5 ++
 .../hadoop/hdds/scm/block/TestBlockManager.java | 25 -------
 .../commandhandler/TestBlockDeletion.java       | 55 +++++++++++++-
 13 files changed, 268 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 7a6cb2d..3da09f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -215,7 +215,8 @@ public class ContainerSet {
           .setReadBytes(containerData.getReadBytes())
           .setWriteBytes(containerData.getWriteBytes())
           .setUsed(containerData.getBytesUsed())
-          .setState(getState(containerData));
+          .setState(getState(containerData))
+          .setDeleteTransactionId(containerData.getDeleteTransactionId());
 
       crBuilder.addReports(ciBuilder.build());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 8e1c2cc..f3a111f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -372,9 +372,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       }
 
       try {
-        Map<Long, Long> deleteTransactionsMap =
-            deletedBlockLog.addTransactions(containerBlocks);
-        containerManager.updateDeleteTransactionId(deleteTransactionsMap);
+        deletedBlockLog.addTransactions(containerBlocks);
       } catch (IOException e) {
         throw new IOException(
             "Skip writing the deleted blocks info to"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 2bb5686..db6c1c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -42,9 +42,10 @@ public interface DeletedBlockLog extends Closeable {
    * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
    * stop.
    * @param transactions a list of TXs will be set into.
+   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
-  void getTransactions(DatanodeDeletedBlockTransactions transactions)
+  Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions)
       throws IOException;
 
   /**
@@ -101,10 +102,9 @@ public interface DeletedBlockLog extends Closeable {
    * number of containers) together (on success) or non (on failure).
    *
    * @param containerBlocksMap a map of containerBlocks.
-   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
-  Map<Long, Long> addTransactions(Map<Long, List<Long>> containerBlocksMap)
+  void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index ca4e1d0..df97c27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -45,13 +46,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -239,21 +241,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
           // set of dns which have successfully committed transaction txId.
           dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
           Long containerId = transactionResult.getContainerID();
-          if (dnsWithCommittedTxn == null || containerId == null) {
-            LOG.warn("Transaction txId={} commit by dnId={} failed."
-                + " Corresponding entry not found.", txID, dnID);
+          if (dnsWithCommittedTxn == null) {
+            LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
+                    + "failed. Corresponding entry not found.", txID, dnID,
+                containerId);
             return;
           }
 
           dnsWithCommittedTxn.add(dnID);
-          Collection<DatanodeDetails> containerDnsDetails =
+          Pipeline pipeline =
               containerManager.getContainerWithPipeline(containerId)
-                  .getPipeline().getDatanodes().values();
+                  .getPipeline();
+          Collection<DatanodeDetails> containerDnsDetails =
+              pipeline.getDatanodes().values();
           // The delete entry can be safely removed from the log if all the
-          // corresponding nodes commit the txn.
-          if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+          // corresponding nodes commit the txn. It is required to check that
+          // the nodes returned in the pipeline match the replication factor.
+          if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
+              >= pipeline.getFactor().getNumber()) {
             List<UUID> containerDns = containerDnsDetails.stream()
-                .map(dnDetails -> dnDetails.getUuid())
+                .map(DatanodeDetails::getUuid)
                 .collect(Collectors.toList());
             if (dnsWithCommittedTxn.containsAll(containerDns)) {
               transactionToDNsCommitMap.remove(txID);
@@ -338,15 +345,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
    * {@inheritDoc}
    *
    * @param containerBlocksMap a map of containerBlocks.
-   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
   @Override
-  public Map<Long, Long> addTransactions(
+  public void addTransactions(
       Map<Long, List<Long>> containerBlocksMap)
       throws IOException {
     BatchOperation batch = new BatchOperation();
-    Map<Long, Long> deleteTransactionsMap = new HashMap<>();
     lock.lock();
     try {
       long currentLatestID = lastTxID;
@@ -356,13 +361,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
         byte[] key = Longs.toByteArray(currentLatestID);
         DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
             entry.getKey(), entry.getValue());
-        deleteTransactionsMap.put(entry.getKey(), currentLatestID);
         batch.put(key, tx.toByteArray());
       }
       lastTxID = currentLatestID;
       batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
       deletedStore.writeBatch(batch);
-      return deleteTransactionsMap;
     } finally {
       lock.unlock();
     }
@@ -376,10 +379,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   @Override
-  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
-      throws IOException {
+  public Map<Long, Long> getTransactions(
+      DatanodeDeletedBlockTransactions transactions) throws IOException {
     lock.lock();
     try {
+      Map<Long, Long> deleteTransactionMap = new HashMap<>();
       deletedStore.iterate(null, (key, value) -> {
         if (!Arrays.equals(LATEST_TXID, key)) {
           DeletedBlocksTransaction block = DeletedBlocksTransaction
@@ -388,6 +392,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
             if (transactions.addTransaction(block,
                 transactionToDNsCommitMap.get(block.getTxID()))) {
+              deleteTransactionMap.put(block.getContainerID(), block.getTxID());
               transactionToDNsCommitMap
                   .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
             }
@@ -396,6 +401,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
         }
         return true;
       });
+      return deleteTransactionMap;
     } finally {
       lock.unlock();
     }
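
The reworked commit check above guards a subtle case: the pipeline may
momentarily report fewer datanodes than the replication factor, and the old
size-only comparison (dnsWithCommittedTxn.size() >= containerDnsDetails.size())
could then retire a transaction after too few commits. A standalone
illustration of the new predicate, with hypothetical names:

    final class DeleteTxnQuorumSketch {
      // Mirrors: min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
      //            >= pipeline.getFactor().getNumber()
      static boolean eligibleForRemoval(int pipelineDnCount,
          int committedDnCount, int replicationFactor) {
        return Math.min(pipelineDnCount, committedDnCount) >= replicationFactor;
      }

      public static void main(String[] args) {
        // Factor THREE, pipeline temporarily reports 2 DNs, both committed:
        // previously removable, now retained until all 3 replicas commit.
        assert !eligibleForRemoval(2, 2, 3);
        assert eligibleForRemoval(3, 3, 3);
      }
    }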

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
new file mode 100644
index 0000000..736daac
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+public class PendingDeleteHandler implements
+    EventHandler<PendingDeleteStatusList> {
+
+  private SCMBlockDeletingService scmBlockDeletingService;
+
+  public PendingDeleteHandler(
+      SCMBlockDeletingService scmBlockDeletingService) {
+    this.scmBlockDeletingService = scmBlockDeletingService;
+  }
+
+  @Override
+  public void onMessage(PendingDeleteStatusList pendingDeleteStatusList,
+      EventPublisher publisher) {
+    scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
new file mode 100644
index 0000000..904762d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class PendingDeleteStatusList {
+
+  private List<PendingDeleteStatus> pendingDeleteStatuses;
+  private DatanodeDetails datanodeDetails;
+
+  public PendingDeleteStatusList(DatanodeDetails datanodeDetails) {
+    this.datanodeDetails = datanodeDetails;
+    pendingDeleteStatuses = new LinkedList<>();
+  }
+
+  public void addPendingDeleteStatus(long dnDeleteTransactionId,
+      long scmDeleteTransactionId, long containerId) {
+    pendingDeleteStatuses.add(
+        new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId,
+            containerId));
+  }
+
+  public static class PendingDeleteStatus {
+    private long dnDeleteTransactionId;
+    private long scmDeleteTransactionId;
+    private long containerId;
+
+    public PendingDeleteStatus(long dnDeleteTransactionId,
+        long scmDeleteTransactionId, long containerId) {
+      this.dnDeleteTransactionId = dnDeleteTransactionId;
+      this.scmDeleteTransactionId = scmDeleteTransactionId;
+      this.containerId = containerId;
+    }
+
+    public long getDnDeleteTransactionId() {
+      return dnDeleteTransactionId;
+    }
+
+    public long getScmDeleteTransactionId() {
+      return scmDeleteTransactionId;
+    }
+
+    public long getContainerId() {
+      return containerId;
+    }
+
+  }
+
+  public List<PendingDeleteStatus> getPendingDeleteStatuses() {
+    return pendingDeleteStatuses;
+  }
+
+  public int getNumPendingDeletes() {
+    return pendingDeleteStatuses.size();
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+}
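
A short usage sketch of the new class; the DatanodeDetails construction below
is illustrative (the builder and its setUuid field are assumed from the HDDS
DatanodeDetails API, not shown in this diff):

    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;

    public class PendingDeleteStatusListDemo {
      public static void main(String[] args) {
        DatanodeDetails dn = DatanodeDetails.newBuilder()
            .setUuid(UUID.randomUUID().toString())
            .build();
        PendingDeleteStatusList list = new PendingDeleteStatusList(dn);
        // Container 1: datanode applied up to txn 4, SCM has issued up to 7.
        list.addPendingDeleteStatus(4L, 7L, 1L);
        for (PendingDeleteStatusList.PendingDeleteStatus s
            : list.getPendingDeleteStatuses()) {
          System.out.println("container " + s.getContainerId() + " lags by "
              + (s.getScmDeleteTransactionId() - s.getDnDeleteTransactionId())
              + " delete txns on " + list.getDatanodeDetails().getUuid());
        }
      }
    }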

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 6f65fdd..699fd37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -56,7 +57,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
  */
 public class SCMBlockDeletingService extends BackgroundService {
 
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
 
   // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
@@ -106,6 +107,19 @@ public class SCMBlockDeletingService extends BackgroundService {
     return queue;
   }
 
+  public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
+    DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
+    for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus : deletionStatusList
+        .getPendingDeleteStatuses()) {
+      LOG.info(
+          "Block deletion txnID mismatch in datanode {} for containerID {}."
+              + " Datanode delete txnID: {}, SCM txnID: {}",
+          dnDetails.getUuid(), deletionStatus.getContainerId(),
+          deletionStatus.getDnDeleteTransactionId(),
+          deletionStatus.getScmDeleteTransactionId());
+    }
+  }
+
   private class DeletedBlockTransactionScanner
       implements BackgroundTask<EmptyTaskResult> {
 
@@ -123,11 +137,12 @@ public class SCMBlockDeletingService extends BackgroundService {
       LOG.debug("Running DeletedBlockTransactionScanner");
       DatanodeDeletedBlockTransactions transactions = null;
       List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+      Map<Long, Long> transactionMap = null;
       if (datanodes != null) {
         transactions = new DatanodeDeletedBlockTransactions(mappingService,
             blockDeleteLimitSize, datanodes.size());
         try {
-          deletedBlockLog.getTransactions(transactions);
+          transactionMap = deletedBlockLog.getTransactions(transactions);
         } catch (IOException e) {
           // We may tolerant a number of failures for sometime
           // but if it continues to fail, at some point we need to raise
@@ -159,6 +174,7 @@ public class SCMBlockDeletingService extends BackgroundService {
                     transactions.getTransactionIDList(dnId)));
           }
         }
+        mappingService.updateDeleteTransactionId(transactionMap);
       }
 
       if (dnTxCount > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index d84551a..863d6c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -43,6 +45,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
@@ -86,6 +89,7 @@ public class ContainerMapping implements Mapping {
   private final LeaseManager<ContainerInfo> containerLeaseManager;
   private final float containerCloseThreshold;
   private final ContainerCloser closer;
+  private final EventPublisher eventPublisher;
   private final long size;
 
   /**
@@ -128,6 +132,7 @@ public class ContainerMapping implements Mapping {
         OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
     this.containerStateManager =
         new ContainerStateManager(conf, this);
+    LOG.trace("Container State Manager created.");
 
     this.pipelineSelector = new PipelineSelector(nodeManager,
         containerStateManager, conf, eventPublisher);
@@ -135,7 +140,7 @@ public class ContainerMapping implements Mapping {
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
-    LOG.trace("Container State Manager created.");
+    this.eventPublisher = eventPublisher;
 
     long containerCreationLeaseTimeout = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
@@ -398,8 +403,13 @@ public class ContainerMapping implements Mapping {
    */
   public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       throws IOException {
+    if (deleteTransactionMap == null) {
+      return;
+    }
+
     lock.lock();
     try {
+      BatchOperation batch = new BatchOperation();
       for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
         long containerID = entry.getKey();
         byte[] dbKey = Longs.toByteArray(containerID);
@@ -413,10 +423,11 @@ public class ContainerMapping implements Mapping {
         ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
             HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
         containerInfo.updateDeleteTransactionId(entry.getValue());
-        containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
-        containerStateManager
-            .updateDeleteTransactionId(containerID, entry.getValue());
+        batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
       }
+      containerStore.writeBatch(batch);
+      containerStateManager
+          .updateDeleteTransactionId(deleteTransactionMap);
     } finally {
       lock.unlock();
     }
@@ -484,7 +495,8 @@ public class ContainerMapping implements Mapping {
       throws IOException {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
-
+    PendingDeleteStatusList pendingDeleteStatusList =
+        new PendingDeleteStatusList(datanodeDetails);
     for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
       byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
@@ -498,6 +510,14 @@ public class ContainerMapping implements Mapping {
           HddsProtos.SCMContainerInfo newState =
               reconcileState(datanodeState, knownState, datanodeDetails);
 
+          if (knownState.getDeleteTransactionId() > datanodeState
+              .getDeleteTransactionId()) {
+            pendingDeleteStatusList
+                .addPendingDeleteStatus(datanodeState.getDeleteTransactionId(),
+                    knownState.getDeleteTransactionId(),
+                    knownState.getContainerID());
+          }
+
           // FIX ME: This can be optimized, we write twice to memory, where a
           // single write would work well.
           //
@@ -529,6 +549,11 @@ public class ContainerMapping implements Mapping {
         lock.unlock();
       }
     }
+    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+          pendingDeleteStatusList);
+    }
+
   }
 
   /**
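
One design note on the updateDeleteTransactionId hunk above: the per-container
containerStore.put calls are replaced by a single BatchOperation, so all
transaction-id updates for a report land in one store write. A minimal sketch
of that pattern, using the same in-tree MetadataStore/BatchOperation calls that
appear in the diff (the writeAll helper itself is hypothetical):

    import com.google.common.primitives.Longs;
    import org.apache.hadoop.utils.BatchOperation;
    import org.apache.hadoop.utils.MetadataStore;

    import java.io.IOException;
    import java.util.Map;

    final class BatchedStoreUpdateSketch {
      // Collect all key/value updates into one batch and write it in a
      // single call, instead of one store.put() per container.
      static void writeAll(MetadataStore store, Map<Long, byte[]> updates)
          throws IOException {
        BatchOperation batch = new BatchOperation();
        for (Map.Entry<Long, byte[]> e : updates.entrySet()) {
          batch.put(Longs.toByteArray(e.getKey()), e.getValue());
        }
        store.writeBatch(batch);
      }
    }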

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index f0ab213..6b983a6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -47,6 +47,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -360,13 +361,14 @@ public class ContainerStateManager implements Closeable {
   /**
    * Update deleteTransactionId for a container.
    *
-   * @param containerID ContainerID of the container whose delete
-   *                    transactionId needs to be updated.
-   * @param transactionId latest transactionId to be updated for the container
+   * @param deleteTransactionMap maps containerId to its new
+   *                             deleteTransactionID
    */
-  public void updateDeleteTransactionId(Long containerID, long transactionId) {
-    containers.getContainerMap().get(ContainerID.valueof(containerID))
-        .updateDeleteTransactionId(transactionId);
+  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
+    for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
+      containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
+          .updateDeleteTransactionId(entry.getValue());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 70b1e96..5911ce2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .CloseContainerStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
@@ -147,6 +148,14 @@ public final class SCMEvents {
           "DeleteBlockCommandStatus");
 
   /**
+   * This event will be triggered while processing container reports from DN
+   * when deleteTransactionID of container in report mismatches with the
+   * deleteTransactionID on SCM.
+   */
+  public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
+      new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus");
+
+  /**
    * This is the command for ReplicationManager to handle under/over
    * replication. Sent by the ContainerReportHandler after processing the
    * heartbeat.
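
For orientation, a minimal sketch of how the new event is wired and fired,
assuming the EventQueue API used by StorageContainerManager later in this
commit; the wireAndFire helper is a stand-in for the real wiring:

    import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
    import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
    import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    final class PendingDeleteEventWiringSketch {
      // Subscribe the handler, then publish a status list; in the real SCM
      // the subscription happens in StorageContainerManager and the publish
      // in ContainerMapping#processContainerReports.
      static void wireAndFire(SCMBlockDeletingService deletingService,
          PendingDeleteStatusList statusList) {
        EventQueue eventQueue = new EventQueue();
        eventQueue.addHandler(SCMEvents.PENDING_DELETE_STATUS,
            new PendingDeleteHandler(deletingService));
        eventQueue.fireEvent(SCMEvents.PENDING_DELETE_STATUS, statusList);
      }
    }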

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 47a9100..178e2bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
@@ -219,6 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
+    PendingDeleteHandler pendingDeleteHandler =
+        new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
 
     ContainerReportHandler containerReportHandler =
         new ContainerReportHandler(scmContainerManager, node2ContainerMap,
@@ -235,6 +238,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
+    eventQueue
+        .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 7049029..2beb4e7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -107,31 +107,6 @@ public class TestBlockManager {
   }
 
   @Test
-  public void testDeleteBlock() throws Exception {
-    AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
-    Assert.assertNotNull(block);
-    long transactionId =
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId();
-    Assert.assertEquals(0, transactionId);
-    blockManager.deleteBlocks(Collections.singletonList(
-        block.getBlockID()));
-    Assert.assertEquals(++transactionId,
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId());
-
-    block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
-    Assert.assertNotNull(block);
-    blockManager.deleteBlocks(Collections.singletonList(
-        block.getBlockID()));
-    Assert.assertEquals(++transactionId,
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId());
-  }
-
-  @Test
   public void testAllocateOversizedBlock() throws IOException {
     long size = 6 * GB;
     thrown.expectMessage("Unsupported block size");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index ee9aed2..badd435 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -43,6 +48,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.utils.MetadataStore;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -101,7 +107,7 @@ public class TestBlockDeletion {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
 
-    String value = RandomStringUtils.random(1000000);
+    String value = RandomStringUtils.random(10000000);
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
@@ -111,7 +117,9 @@ public class TestBlockDeletion {
 
     OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
         ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
-    out.write(value.getBytes());
+    for (int i = 0; i < 100; i++) {
+      out.write(value.getBytes());
+    }
     out.close();
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
@@ -144,6 +152,49 @@ public class TestBlockDeletion {
     Assert.assertTrue(!containerIdsWithDeletedBlocks.isEmpty());
     // Containers in the DN and SCM should have same delete transactionIds
     matchContainerTransactionIds();
+    // Containers in the DN and SCM should have same delete transactionIds
+    // after DN restart. The assertion is just to verify that the state of
+    // containerInfos in dn and scm is consistent after dn restart.
+    cluster.restartHddsDatanode(0);
+    matchContainerTransactionIds();
+
+    // verify PENDING_DELETE_STATUS event is fired
+    verifyBlockDeletionEvent();
+  }
+
+  private void verifyBlockDeletionEvent()
+      throws IOException, InterruptedException {
+    LogCapturer logCapturer =
+        LogCapturer.captureLogs(SCMBlockDeletingService.LOG);
+    // Create dummy container reports with deleteTransactionId set as 0
+    ContainerReportsProto containerReport = dnContainerSet.getContainerReport();
+    ContainerReportsProto.Builder dummyReportsBuilder =
+        ContainerReportsProto.newBuilder();
+    for (ContainerInfo containerInfo : containerReport.getReportsList()) {
+      dummyReportsBuilder.addReports(
+          ContainerInfo.newBuilder(containerInfo).setDeleteTransactionId(0)
+              .build());
+    }
+    ContainerReportsProto dummyReport = dummyReportsBuilder.build();
+
+    logCapturer.clearOutput();
+    scm.getScmContainerManager().processContainerReports(
+        cluster.getHddsDatanodes().get(0).getDatanodeDetails(), dummyReport);
+    // wait for event to be handled by event handler
+    Thread.sleep(1000);
+    String output = logCapturer.getOutput();
+    for (ContainerInfo containerInfo : dummyReport.getReportsList()) {
+      long containerId = containerInfo.getContainerID();
+      // Event should be triggered only for containers which have deleted blocks
+      if (containerIdsWithDeletedBlocks.contains(containerId)) {
+        Assert.assertTrue(output.contains(
+            "for containerID " + containerId + ". Datanode delete txnID"));
+      } else {
+        Assert.assertTrue(!output.contains(
+            "for containerID " + containerId + ". Datanode delete txnID"));
+      }
+    }
+    logCapturer.clearOutput();
   }
 
   private void matchContainerTransactionIds() throws IOException {
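
One note on the verification strategy above: the test waits a fixed second and
then scrapes SCMBlockDeletingService's log for the mismatch message. A hedged
alternative sketch, polling instead of sleeping with the GenericTestUtils the
test already imports (interval and timeout values are illustrative):

    // Poll until the expected log line shows up, up to a 5 s timeout,
    // instead of assuming the event handler has run after 1 s; waitFor
    // throws TimeoutException if the line never appears.
    GenericTestUtils.waitFor(
        () -> logCapturer.getOutput().contains("Datanode delete txnID"),
        100, 5000);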


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org