Posted to issues@ozone.apache.org by "xichen01 (via GitHub)" <gi...@apache.org> on 2023/06/27 17:06:05 UTC

[GitHub] [ozone] xichen01 opened a new pull request, #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

xichen01 opened a new pull request, #4988:
URL: https://github.com/apache/ozone/pull/4988

   ## What changes were proposed in this pull request?
   
   Currently, SCM resends a `DeletedBlocksTransaction` to a Datanode (DN) if the DN has not yet reported, via heartbeat, that the transaction has finished. As a result, if the `DeleteBlocksCommandHandler` thread on a DN is blocked for some reason (for example, while waiting for a container lock), SCM keeps sending duplicate `DeletedBlocksTransaction`s to that DN.
   This PR avoids the problem by managing the status of SCM's `DeleteBlocksCommand`s.
   
   ## Summary
   ### The Status of `DeleteBlocksCommand`
   ```java
   public enum CmdStatus {
     // The DeleteBlocksCommand has not yet been sent.
     // This is the initial status of the command after it's created.
     TO_BE_SENT,
     // The DeleteBlocksCommand has been sent to the Datanode, but the
     // Datanode has not reported any new status for it.
     SENT,
     // The DeleteBlocksCommand has been received by the Datanode and
     // is waiting to be executed.
     PENDING_EXECUTED,
     // The DeleteBlocksCommand was executed successfully.
     EXECUTED,
     // The DeleteBlocksCommand was executed but the execution failed,
     // or it was lost before it was executed.
     NEED_RESEND
   }
   ```
   
   ### State Transfer
   - TO_BE_SENT -> SENT: The DeleteBlocksCommand has been sent by SCM; the Datanode has not yet reported any follow-up status.
   - SENT -> PENDING_EXECUTED: The DeleteBlocksCommand has been sent and received by the Datanode but not yet executed; it is waiting to be executed.
   - SENT -> NEED_RESEND: The DeleteBlocksCommand was sent but lost before the Datanode received it.
   - SENT -> EXECUTED: The DeleteBlocksCommand was sent to the Datanode and executed successfully.
   - PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand is still waiting to be executed by the Datanode.
   - PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for a while and was executed, but the execution failed; or the command was lost while waiting (for example, because the Datanode restarted).
   - PENDING_EXECUTED -> EXECUTED: The DeleteBlocksCommand waited on the Datanode for a while and was then executed successfully.
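As a rough illustration of the transitions listed above, the following sketch encodes them as a lookup table and rejects any transition the list does not mention. The class `DeleteCmdTransitions` and its methods are hypothetical and not part of Ozone; the nested enum simply mirrors the `CmdStatus` values proposed in this description.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not Ozone code) encoding the transitions listed
// in the PR description.
final class DeleteCmdTransitions {

  // Mirrors the CmdStatus enum proposed in the PR description.
  enum CmdStatus { TO_BE_SENT, SENT, PENDING_EXECUTED, EXECUTED, NEED_RESEND }

  private static final Map<CmdStatus, Set<CmdStatus>> ALLOWED =
      new EnumMap<>(CmdStatus.class);

  static {
    ALLOWED.put(CmdStatus.TO_BE_SENT, EnumSet.of(CmdStatus.SENT));
    ALLOWED.put(CmdStatus.SENT, EnumSet.of(
        CmdStatus.PENDING_EXECUTED, CmdStatus.NEED_RESEND, CmdStatus.EXECUTED));
    ALLOWED.put(CmdStatus.PENDING_EXECUTED, EnumSet.of(
        CmdStatus.PENDING_EXECUTED, CmdStatus.NEED_RESEND, CmdStatus.EXECUTED));
    // The description lists no outgoing transitions for these two states.
    ALLOWED.put(CmdStatus.EXECUTED, EnumSet.noneOf(CmdStatus.class));
    ALLOWED.put(CmdStatus.NEED_RESEND, EnumSet.noneOf(CmdStatus.class));
  }

  private DeleteCmdTransitions() { }

  // Returns true if the description lists a transition from -> to.
  static boolean canTransition(CmdStatus from, CmdStatus to) {
    return ALLOWED.get(from).contains(to);
  }

  // Returns the new status, or throws if the transition is not listed.
  static CmdStatus transition(CmdStatus from, CmdStatus to) {
    if (!canTransition(from, to)) {
      throw new IllegalStateException(
          "Illegal transition " + from + " -> " + to);
    }
    return to;
  }
}
```

Keeping the table in an `EnumMap` makes the allowed transitions easy to audit against the diagram, and an unexpected report surfaces as an exception instead of silently overwriting the status.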
   
   ### State transition diagram
   ```mermaid
   stateDiagram-v2
       TO_BE_SENT
       TO_BE_SENT --> SENT
       SENT --> PENDING_EXECUTED
       SENT --> NEED_RESEND
       PENDING_EXECUTED --> PENDING_EXECUTED
       PENDING_EXECUTED --> NEED_RESEND
       PENDING_EXECUTED --> EXECUTED
       SENT --> EXECUTED
   
   ```
   
   ### DeleteBlocksCommand resent
   SCM does not resend a `DeleteBlocksCommand` whose status is `TO_BE_SENT`, `SENT`, `PENDING_EXECUTED`, or `EXECUTED`; only a `DeleteBlocksCommand` in the `NEED_RESEND` status is resent.
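A minimal sketch of this resend rule, assuming SCM keeps a map from command ID to `CmdStatus`: only entries in `NEED_RESEND` are selected. `ResendFilter` and `commandsToResend` are hypothetical names used for illustration only.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch (not Ozone code) of the "only NEED_RESEND is resent" rule.
final class ResendFilter {

  enum CmdStatus { TO_BE_SENT, SENT, PENDING_EXECUTED, EXECUTED, NEED_RESEND }

  private ResendFilter() { }

  // Returns the IDs of commands that may be resent, in ascending order.
  // Commands in any other status are skipped.
  static Set<Long> commandsToResend(Map<Long, CmdStatus> statusByCmdId) {
    Set<Long> result = new TreeSet<>();
    for (Map.Entry<Long, CmdStatus> entry : statusByCmdId.entrySet()) {
      if (entry.getValue() == CmdStatus.NEED_RESEND) {
        result.add(entry.getKey());
      }
    }
    return result;
  }
}
```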
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-8882
   
   
   
   ## How was this patch tested?
   Integration test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1751966551

   > @xichen01 I tried to understand the changes and their impact, but I am getting lost in the code. My understanding: the state machine here is meant to avoid sending duplicate DeleteBlock commands, but I cannot find any meaningful action for these states.
   > 
   > * TO_BE_SENT: initial state
   > * NEED_EXECUTED, EXECUTED: just removed on timeout; no other action on keeping these
   > * PENDING_EXECUTED, SENT: just to avoid retry, and removed on timeout
   > 
   > Do we really need so many states? Or just "INITIAL" and "SENT", with cleanup on timeout or execution?
   > 
   > * One improvement I can see from this PR is that the next command includes a new set of blocks (even if the current blocks are not yet executed).
   > 
   > I think we should refactor the code, including transactionToDNsCommitMap and transactionToRetryCountMap, to take advantage of this state management and simplify it.
   
   
   OK, I'll try to merge some of the states to reduce the complexity of the code.




Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1349663858


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -437,15 +350,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(

Review Comment:
   - "SCMBlockDeletingService timeout" is the timeout of `SCMBlockDeletingService` itself. When the `SCMBlockDeletingService` thread is stuck, it may trigger the
   
   - `scmCommandTimeoutMs` is the maximum time an `scmCommand` may go without a status update. It guards against commands that are lost on their way to the DN: after the timeout, they can be resent.
   The status of an `scmCommand` is updated by DN heartbeats. If no DN reports the status of an `scmCommand` for 300s, the `scmCommand` is cleaned up as timed out.
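The timeout behaviour described above can be sketched as follows, assuming each tracked command stores the time of its last status update and a timed-out command is simply dropped so its transactions can be sent again. `CommandTimeoutTracker`, `recordUpdate`, and `removeTimedOut` are hypothetical names; the comparison mirrors the `Duration.between(updateTime, now).toMillis() > timeoutMs` check in `removeTimeoutScmCommand` in this PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not Ozone code): tracks when each command's status
// was last updated and drops commands that exceed the timeout.
final class CommandTimeoutTracker {
  // scmCmdId -> time of the last status update (send or DN heartbeat report)
  private final Map<Long, Instant> lastUpdate = new ConcurrentHashMap<>();
  private final long scmCommandTimeoutMs;

  CommandTimeoutTracker(long scmCommandTimeoutMs) {
    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
  }

  // Called when SCM sends the command or a DN heartbeat reports its status.
  void recordUpdate(long scmCmdId, Instant updateTime) {
    lastUpdate.put(scmCmdId, updateTime);
  }

  boolean isTracked(long scmCmdId) {
    return lastUpdate.containsKey(scmCmdId);
  }

  // Removes every command whose last update is older than the timeout and
  // returns how many were removed; their transactions could then be resent.
  int removeTimedOut(Instant now) {
    int removed = 0;
    Iterator<Map.Entry<Long, Instant>> it = lastUpdate.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Instant> entry = it.next();
      if (Duration.between(entry.getValue(), now).toMillis()
          > scmCommandTimeoutMs) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }
}
```

Passing `now` explicitly instead of reading the system clock keeps the sketch deterministic and easy to test; with the 300s value mentioned above, the timeout would be `300_000` ms.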





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411768034


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -471,9 +371,10 @@ public DatanodeDeletedBlockTransactions getTransactions(
               if (checkInadequateReplica(replicas, txn)) {
                 continue;
               }
-              getTransaction(txn, transactions, dnList, replicas);
-              transactionToDNsCommitMap
-                  .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+              getTransaction(
+                  txn, transactions, dnList, replicas, commandStatus);
+              getSCMDeletedBlockTransactionStatusManager().

Review Comment:
   Done.
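The change to `getTransaction` above passes the per-Datanode `commandStatus` into transaction selection. Assuming that a transaction already recorded for a Datanode (in any tracked status) is left out of the next command for that Datanode, the filtering step might look like the sketch below. `DuplicateTxFilter` and `newTransactionsFor` are hypothetical names; the map shape follows `getCommandStatusByTxId` in this PR (`Map<UUID, Map<Long, CmdStatus>>`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch (not Ozone code): skip transactions that are already
// part of an in-flight DeleteBlocksCommand for the same Datanode.
final class DuplicateTxFilter {

  enum CmdStatus { TO_BE_SENT, SENT }

  private DuplicateTxFilter() { }

  // commandStatus maps dnId -> (txId -> status), shaped like the result of
  // getCommandStatusByTxId. Returns candidates not yet tracked for dnId,
  // preserving their original order.
  static List<Long> newTransactionsFor(UUID dnId, List<Long> candidateTxIds,
      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
    Map<Long, CmdStatus> inFlight =
        commandStatus.getOrDefault(dnId, Collections.emptyMap());
    List<Long> result = new ArrayList<>();
    for (Long txId : candidateTxIds) {
      if (!inFlight.containsKey(txId)) {
        result.add(txId);
      }
    }
    return result;
  }
}
```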



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java:
##########
@@ -179,13 +178,16 @@ public EmptyTaskResult call() throws Exception {
             UUID dnId = entry.getKey();
             List<DeletedBlocksTransaction> dnTXs = entry.getValue();
             if (!dnTXs.isEmpty()) {
-              processedTxIDs.addAll(dnTXs.stream()
+              Set<Long> dnTxSet = dnTXs.stream()
                   .map(DeletedBlocksTransaction::getTxID)
-                  .collect(Collectors.toSet()));
-              SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+                  .collect(Collectors.toSet());
+              processedTxIDs.addAll(dnTxSet);
+              DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
               command.setTerm(scmContext.getTermOfLeader());
               eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                   new CommandForDatanode<>(dnId, command));
+              deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()

Review Comment:
   There may indeed be a thread-safety issue here; fixed.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The Command sent and being executed by DN
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN executes DeleteBlocksCommands, regardless of whether
+          // DeleteBlocksCommands is executed successfully or not,
+          // it will be deleted from record.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to" +
+            " {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Remove Timeout SCM BlockDeletionCommand {} for DN {} " +
+              "after without update {}ms}", state, dnId, timeoutMs);
+        } else {
+          LOG.warn("Timeout SCM scmCmdIds {} for DN {} " +
+              "after without update {}ms}", scmCmdIds, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int currentCount =
+          transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (currentCount > maxRetry) {
+        continue;
+      } else {
+        currentCount += 1;
+        if (currentCount > maxRetry) {
+          txIDsToUpdate.add(txID);
+        }
+        transactionToRetryCountMap.put(txID, currentCount);
+      }
+    }
+
+    if (!txIDsToUpdate.isEmpty()) {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+    }
+  }
+
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    for (Long txID: txIDs) {
+      transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+    }
+  }
+
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(

Review Comment:
   Merged.
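For reference, the two-state update logic in `SCMDeleteBlocksCommandStatusManager#updateStatus` in the diff above can be condensed into the following standalone sketch. `TwoStateCommandRecord` is a hypothetical class, and `ReportedStatus` is a stand-in for the protobuf `CommandStatus.Status` values (`PENDING`, `EXECUTED`, `FAILED`) that the diff switches on; this is an illustration, not the Ozone implementation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical condensed sketch of the two-state scheme (not Ozone code).
final class TwoStateCommandRecord {

  enum CmdStatus { TO_BE_SENT, SENT }

  // Stand-in for the DN-reported statuses handled in the diff.
  enum ReportedStatus { PENDING, EXECUTED, FAILED }

  // dnId -> (scmCmdId -> status)
  private final Map<UUID, Map<Long, CmdStatus>> record =
      new ConcurrentHashMap<>();

  void recordCommand(UUID dnId, long scmCmdId) {
    record.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
        .put(scmCmdId, CmdStatus.TO_BE_SENT);
  }

  // Returns true if the update was applied, false if it was ignored.
  boolean update(UUID dnId, long scmCmdId, ReportedStatus reported) {
    Map<Long, CmdStatus> forDn = record.get(dnId);
    CmdStatus old = forDn == null ? null : forDn.get(scmCmdId);
    if (old == null) {
      return false; // unknown Datanode or command
    }
    switch (reported) {
    case PENDING:
      // TO_BE_SENT -> SENT, or SENT -> SENT while the DN has not finished.
      forDn.put(scmCmdId, CmdStatus.SENT);
      return true;
    case EXECUTED:
    case FAILED:
      // Executed (successfully or not): drop the record. Successful
      // transactions are tracked elsewhere in the diff.
      if (old == CmdStatus.SENT) {
        forDn.remove(scmCmdId);
        return true;
      }
      return false;
    default:
      return false;
    }
  }

  CmdStatus statusOf(UUID dnId, long scmCmdId) {
    Map<Long, CmdStatus> forDn = record.get(dnId);
    return forDn == null ? null : forDn.get(scmCmdId);
  }
}
```

With only two states, a `PENDING` report always moves a known command to `SENT`, which matches the `oldStatus == TO_BE_SENT || oldStatus == SENT` check in the diff.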





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411723842


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The Command sent and being executed by DN
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {

Review Comment:
   If `oldStatus == SENT`, the status update (`SENT -> SENT`) is only used to refresh `updateTime`, which prevents the command from timing out.
   ```java
         public void setStatus(CmdStatus status) {
           this.updateTime = Instant.now();
           this.status = status;
         }
   ```
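A minimal sketch of the effect described in this reply. It assumes that timeout cleanup compares a SENT record's `updateTime` against the configured timeout. The hunk suggests this through `STATUSES_REQUIRING_TIMEOUT` and `getUpdateTime`, but `removeTimeoutScmCommand` itself is not shown. `CmdRecord` and `isTimedOut` are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic:

```java
import java.time.Duration;
import java.time.Instant;

// Simplified stand-in for the PR's CmdStatus enum.
enum CmdStatus { TO_BE_SENT, SENT }

// Hypothetical, simplified stand-in for CmdStatusData.
final class CmdRecord {
  private CmdStatus status = CmdStatus.TO_BE_SENT;
  private Instant updateTime;

  CmdRecord(Instant now) {
    this.updateTime = now;
  }

  // Mirrors setStatus(): every call refreshes updateTime, including SENT -> SENT.
  void setStatus(CmdStatus newStatus, Instant now) {
    this.updateTime = now;
    this.status = newStatus;
  }

  CmdStatus getStatus() {
    return status;
  }

  // Assumed timeout rule: a SENT record times out once it has gone
  // longer than `timeout` without being refreshed.
  boolean isTimedOut(Instant now, Duration timeout) {
    return status == CmdStatus.SENT
        && Duration.between(updateTime, now).compareTo(timeout) > 0;
  }
}
```

With a 300 s timeout, a record set to SENT at t=0 counts as timed out at t=400 s. A record that received a second SENT -> SENT update at t=200 s does not, because only 200 s have passed since its last refresh.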
   




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1409370513


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The Command sent and being executed by DN
+      SENT,

Review Comment:
   Extra comma after the last enum constant; this may not compile, please check.
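For reference, the Java Language Specification's `EnumBody` grammar allows an optional comma after the last enum constant (`{ [EnumConstantList] [,] [EnumBodyDeclarations] }`), so the trailing comma after `SENT` should compile. A minimal check, using an enum shaped like the one in the hunk:

```java
// Same shape as CmdStatus in the hunk above: the trailing comma after
// the last constant is accepted by javac.
enum CmdStatus {
  // Initial status after the command is created.
  TO_BE_SENT,
  // The command has been handed to the Datanode but not yet fully executed.
  SENT,
}
```

Style checkers may still flag the trailing comma, but it is not a compilation error.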



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {

Review Comment:
   `oldStatus == SENT` is not required, as the status is already SENT.
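To make the trade-off between the two conditions concrete, here is a hypothetical sketch (not the PR's code) of the PENDING branch. A flag chooses between the PR's `TO_BE_SENT || SENT` condition and the reviewer's `TO_BE_SENT`-only condition. `StatusEntry`, `PendingTransition`, and `includeSent` are illustrative names. Under these assumptions, the only observable difference is whether a repeated PENDING report refreshes `updateTime`:

```java
import java.time.Instant;

// Simplified stand-in for the PR's CmdStatus enum.
enum CmdStatus { TO_BE_SENT, SENT }

// Hypothetical, simplified stand-in for CmdStatusData.
final class StatusEntry {
  CmdStatus status = CmdStatus.TO_BE_SENT;
  Instant updateTime;

  StatusEntry(Instant now) {
    this.updateTime = now;
  }
}

final class PendingTransition {
  private PendingTransition() { }

  /**
   * Applies a PENDING report to the entry and returns whether it was updated.
   * includeSent == true follows the PR (TO_BE_SENT or SENT);
   * includeSent == false follows the review suggestion (TO_BE_SENT only).
   */
  static boolean onPending(StatusEntry entry, Instant now, boolean includeSent) {
    CmdStatus old = entry.status;
    if (old == CmdStatus.TO_BE_SENT
        || (includeSent && old == CmdStatus.SENT)) {
      // Equivalent to setStatus(SENT): status and updateTime change together.
      entry.status = CmdStatus.SENT;
      entry.updateTime = now;
      return true;
    }
    return false;
  }
}
```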



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java:
##########
@@ -179,13 +178,16 @@ public EmptyTaskResult call() throws Exception {
             UUID dnId = entry.getKey();
             List<DeletedBlocksTransaction> dnTXs = entry.getValue();
             if (!dnTXs.isEmpty()) {
-              processedTxIDs.addAll(dnTXs.stream()
+              Set<Long> dnTxSet = dnTXs.stream()
                   .map(DeletedBlocksTransaction::getTxID)
-                  .collect(Collectors.toSet()));
-              SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+                  .collect(Collectors.toSet());
+              processedTxIDs.addAll(dnTxSet);
+              DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
               command.setTerm(scmContext.getTermOfLeader());
               eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                   new CommandForDatanode<>(dnId, command));
+              deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()

Review Comment:
   Move this before fireEvent so that the command is first marked TO_BE_SENT; that way, in a multi-threaded scenario, the onSent notification will be handled.
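A small, hypothetical model of the ordering problem raised in this comment. It assumes `onSent` can run as soon as the command is handed to the publisher. Here the stand-in `Publisher.fireEvent` calls it synchronously so the ordering is deterministic. `CommandTracker` and `Publisher` are illustrative names, not Ozone classes.

If the command is recorded after firing, `onSent` finds nothing, which corresponds to the "Unknown SCM Command" warning path in `updateStatus()`. The later record then leaves the entry at TO_BE_SENT. As far as the quoted hunk shows, only SENT is in `STATUSES_REQUIRING_TIMEOUT`, so such an entry would not be picked up by timeout cleanup. Recording first avoids this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum CmdStatus { TO_BE_SENT, SENT }

// Hypothetical tracker mirroring recordScmCommand()/onSent() from the PR.
final class CommandTracker {
  final Map<Long, CmdStatus> statusById = new ConcurrentHashMap<>();
  int unknownCommandWarnings = 0; // not thread-safe; fine for this sequential demo

  void record(long cmdId) {
    statusById.put(cmdId, CmdStatus.TO_BE_SENT);
  }

  void onSent(long cmdId) {
    // Like updateStatus(): a command that was never recorded is only warned about.
    if (statusById.computeIfPresent(cmdId, (id, old) -> CmdStatus.SENT) == null) {
      unknownCommandWarnings++;
    }
  }
}

// Stand-in for the event publisher: assumes onSent fires before fireEvent returns.
final class Publisher {
  private final CommandTracker tracker;

  Publisher(CommandTracker tracker) {
    this.tracker = tracker;
  }

  void fireEvent(long cmdId) {
    tracker.onSent(cmdId);
  }
}
```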



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -471,9 +371,10 @@ public DatanodeDeletedBlockTransactions getTransactions(
               if (checkInadequateReplica(replicas, txn)) {
                 continue;
               }
-              getTransaction(txn, transactions, dnList, replicas);
-              transactionToDNsCommitMap
-                  .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+              getTransaction(
+                  txn, transactions, dnList, replicas, commandStatus);
+              getSCMDeletedBlockTransactionStatusManager().

Review Comment:
   This can be removed and done as part of recordTransactionCreated() before firing the event.
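One way to read this suggestion, as a hypothetical sketch: a single `recordTransactionCreated()` call both records the command's per-Datanode status and initializes the `transactionToDNsCommitMap` entry for each transaction it carries. With that in place, `getTransactions()` no longer needs to touch the commit map itself. The method's real signature is not shown in this thread, so the parameters and the `TxStatusSketch` class are assumptions:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; field names follow the PR where possible.
final class TxStatusSketch {
  // txId -> DNs that have committed the transaction (as in the PR).
  final Map<Long, Set<UUID>> transactionToDNsCommitMap = new ConcurrentHashMap<>();
  // dnId -> (scmCmdId -> txIds carried by that DeleteBlocksCommand).
  final Map<UUID, Map<Long, Set<Long>>> cmdRecord = new ConcurrentHashMap<>();

  // Called once per command, before the command is fired.
  void recordTransactionCreated(UUID dnId, long scmCmdId, Set<Long> txIds) {
    cmdRecord.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
        .put(scmCmdId, Collections.unmodifiableSet(new LinkedHashSet<>(txIds)));
    // Replaces the putIfAbsent that getTransactions() did previously;
    // an existing commit set is never reset.
    for (Long txId : txIds) {
      transactionToDNsCommitMap.putIfAbsent(txId, new LinkedHashSet<>());
    }
  }
}
```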



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    // Maps each transaction to the DNs which have committed it.
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // The DeleteBlocksCommand has been sent but has not yet been
+      // completely executed by the DN. SENT covers the following cases:
+      //   - The command was sent but not yet received.
+      //   - The command was received by the DN and is waiting to be
+      //     executed.
+      //   - The command is being executed by the DN.
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN has executed a DeleteBlocksCommand, the command is
+          // removed from the record regardless of whether the execution
+          // succeeded.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Cannot update to unknown new status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Illegal status transition for DN: {} ScmCommandId {} " +
+            "from {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successfully updated DN: {} ScmCommandId {} status " +
+            "from {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Remove timed-out SCM BlockDeletionCommand {} for DN {} " +
+              "after {}ms without an update", state, dnId, timeoutMs);
+        } else {
+          LOG.debug("SCM command {} for DN {} has not timed out yet " +
+              "(timeout {}ms)", scmCmdId, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int currentCount =
+          transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (currentCount > maxRetry) {
+        continue;
+      } else {
+        currentCount += 1;
+        if (currentCount > maxRetry) {
+          txIDsToUpdate.add(txID);
+        }
+        transactionToRetryCountMap.put(txID, currentCount);
+      }
+    }
+
+    if (!txIDsToUpdate.isEmpty()) {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+    }
+  }
+
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    for (Long txID: txIDs) {
+      transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+    }
+  }
+
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(

Review Comment:
   recordTransactionCreated and recordTransactionCommitted can be merged into a single call, as both record transactions that are pending to be sent.
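   A minimal sketch of what the merged call could look like, written as a standalone class so it compiles on its own. It assumes that recordTransactionCommitted does nothing more than seed an empty entry in transactionToDNsCommitMap for each txId; that behaviour is not shown in the diff. The class name PendingTransactionRecorder and the helpers txIdsOf and isTracked are hypothetical, and the two plain maps stand in for SCMDeleteBlocksCommandStatusManager and transactionToDNsCommitMap.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: a single call records a DeleteBlocksCommand that is
 * pending to be sent and seeds the per-transaction commit bookkeeping.
 */
class PendingTransactionRecorder {
  // dnId -> (scmCmdId -> txIds carried by that DeleteBlocksCommand)
  private final Map<UUID, Map<Long, Set<Long>>> commandsByDn =
      new ConcurrentHashMap<>();
  // txId -> DNs that have committed it; starts empty (assumption).
  private final Map<Long, Set<UUID>> txToCommittedDns =
      new ConcurrentHashMap<>();

  /** Replaces the two separate record calls with one entry point. */
  void recordTransactionCreated(UUID dnId, long scmCmdId, Set<Long> txIds) {
    commandsByDn
        .computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
        .put(scmCmdId, Collections.unmodifiableSet(new HashSet<>(txIds)));
    // putIfAbsent keeps any commits already recorded for a resent txId.
    for (Long txId : txIds) {
      txToCommittedDns.putIfAbsent(txId, ConcurrentHashMap.newKeySet());
    }
  }

  /** Returns the txIds recorded for a command, or an empty set. */
  Set<Long> txIdsOf(UUID dnId, long scmCmdId) {
    Map<Long, Set<Long>> forDn = commandsByDn.get(dnId);
    if (forDn == null) {
      return Collections.emptySet();
    }
    return forDn.getOrDefault(scmCmdId, Collections.emptySet());
  }

  /** True if the transaction has commit bookkeeping. */
  boolean isTracked(long txId) {
    return txToCommittedDns.containsKey(txId);
  }
}
```

   Because the commit map is seeded with putIfAbsent, resending a transaction in a new command should not discard commits already recorded for it.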



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {

Review Comment:
   What if the status is TO_BE_SENT? I think the record should be removed by default in every state, since this is a final state.
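   A sketch of this suggestion, assuming EXECUTED and FAILED reported by the DN are always final: the record is removed whatever the SCM-side status was, including TO_BE_SENT. ReportedStatus is a hypothetical local stand-in for the protobuf CommandStatus.Status, and CommandStatusTracker is a simplified, hypothetical version of SCMDeleteBlocksCommandStatusManager without timestamps or logging.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the protobuf CommandStatus.Status values. */
enum ReportedStatus { PENDING, EXECUTED, FAILED }

/** The two SCM-side states from the diff. */
enum CmdStatus { TO_BE_SENT, SENT }

/** Simplified tracker where EXECUTED and FAILED are final from any state. */
class CommandStatusTracker {
  // dnId -> (scmCmdId -> SCM-side status)
  private final Map<UUID, Map<Long, CmdStatus>> records =
      new ConcurrentHashMap<>();

  void record(UUID dnId, long scmCmdId) {
    records.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
        .put(scmCmdId, CmdStatus.TO_BE_SENT);
  }

  /** Returns the current status, or null if the command is not recorded. */
  CmdStatus statusOf(UUID dnId, long scmCmdId) {
    Map<Long, CmdStatus> forDn = records.get(dnId);
    return forDn == null ? null : forDn.get(scmCmdId);
  }

  /** Applies a DN report; returns true if the record changed. */
  boolean update(UUID dnId, long scmCmdId, ReportedStatus reported) {
    Map<Long, CmdStatus> forDn = records.get(dnId);
    if (forDn == null || !forDn.containsKey(scmCmdId)) {
      return false; // unknown datanode or command
    }
    switch (reported) {
    case PENDING:
      // TO_BE_SENT -> SENT, or SENT -> SENT (still waiting on the DN).
      forDn.put(scmCmdId, CmdStatus.SENT);
      return true;
    case EXECUTED:
    case FAILED:
      // Final reports: drop the record regardless of the old status,
      // including TO_BE_SENT.
      forDn.remove(scmCmdId);
      return true;
    default:
      return false;
    }
  }
}
```

   A final report for a command that is still TO_BE_SENT suggests the SCM-side onSent update was missed; removing the record in that case avoids keeping a command the DN has already finished.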



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org




Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1349657448


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * Manages the status of DeletedBlockTransactions. The purpose of this
+ * class is to reduce the number of duplicate DeletedBlockTransactions
+ * sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry count.
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   *
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      Map<Long, Integer> transactionToRetryCountMap,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    // Maps each transaction to the DNs which have committed it.
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = transactionToRetryCountMap;
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT, PENDING_EXECUTED));
+    private static final Set<CmdStatus> FINIAL_STATUSES = new HashSet<>(
+        Arrays.asList(EXECUTED, NEED_RESEND));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // The DeleteBlocksCommand has been sent to the Datanode, but the
+      // Datanode has not yet reported any new status for it.
+      SENT,
+      // The DeleteBlocksCommand has been received by the Datanode and
+      // is waiting to be executed.
+      PENDING_EXECUTED,
+      // The DeleteBlocksCommand was executed successfully.
+      EXECUTED,
+      // The DeleteBlocksCommand failed to execute, or was lost before
+      // it was executed.
+      NEED_RESEND
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "CmdStatusData" +
+            "{dnId=" + dnId +
+            ", scmCmdId=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, SENT);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      CmdStatus status = fromProtoCommandStatus(newState);
+      if (status != null) {
+        updateStatus(dnId, scmCmdId, status);
+      }
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    protected void cleanSCMCommandForDn(UUID dnId, long timeoutMs) {
+      cleanTimeoutSCMCommand(dnId, timeoutMs);
+      cleanFinalStatusSCMCommand(dnId);
+    }
+
+    private void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private void cleanFinalStatusSCMCommand(UUID dnId) {
+      for (CmdStatus status : FINIAL_STATUSES) {
+        for (Long scmCmdId : getScmCommandIds(dnId, status)) {
+          CmdStatusData stateData = removeScmCommand(dnId, scmCmdId);
+          LOG.debug("Clean SCMCommand status: {} for DN: {}, stateData: {}",
+              status, dnId, stateData);
+        }
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId, CmdStatus newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+
+      switch (newStatus) {
+      case SENT:
+        if (oldStatus == TO_BE_SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case PENDING_EXECUTED:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and
+          // received by the Datanode, but the command is not executed by the
+          // Datanode, the command is waiting to be executed.
+
+          // PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand
+          // continues to wait to be executed by Datanode.
+          statusData.setStatus(PENDING_EXECUTED);
+          changed = true;
+        }
+        break;
+      case NEED_RESEND:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // SENT -> NEED_RESEND: The DeleteBlocksCommand is sent and lost
+          // before it is received by the DN.
+
+          // PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for
+          // a while and was executed, but the execution failed,
+          // or the DeleteBlocksCommand was lost while waiting (for example,
+          // because the Datanode restarted).
+          statusData.setStatus(NEED_RESEND);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // PENDING_EXECUTED -> EXECUTED: The Command waits for a period of
+          // time on the DN and is executed successfully.
+
+          // SENT -> EXECUTED: The DeleteBlocksCommand has been sent to
+          // Datanode, executed by DN, and executed successfully.
+          statusData.setStatus(EXECUTED);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to" +
+            " {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          updateStatus(dnId, scmCmdId, NEED_RESEND);
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Removed timed-out SCM DeleteBlocksCommand {} for DN {} " +
+              "after {}ms without update", state, dnId, timeoutMs);
+        } else {
+          LOG.debug("SCM DeleteBlocksCommand {} for DN {} has not yet " +
+              "exceeded the {}ms timeout", scmCmdId, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+
+      CmdStatus status = record.get(scmCmdId).getStatus();
+      if (!FINIAL_STATUSES.contains(status)) {
+        LOG.error("Cannot Remove ScmCommand {} Non-final Status {} for DN: {}" +
+            ". final Status {}", scmCmdId, status, dnId, FINIAL_STATUSES);
+        return null;
+      }
+
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    private static CmdStatus fromProtoCommandStatus(
+        CommandStatus.Status protoCmdStatus) {
+      switch (protoCmdStatus) {
+      case PENDING:
+        return CmdStatus.PENDING_EXECUTED;
+      case EXECUTED:
+        return CmdStatus.EXECUTED;
+      case FAILED:
+        return CmdStatus.NEED_RESEND;
+      default:
+        LOG.error("Unknown protoCmdStatus: {} cannot convert " +
+            "to ScmDeleteBlockCommandStatus", protoCmdStatus);
+        return null;
+      }
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.updateStatus(
+        dnId.getUuid(), scmCommand.getId(), SENT);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+    scmDeleteBlocksCommandStatusManager.recordScmCommand(
+        SCMDeleteBlocksCommandStatusManager
+            .createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
+  }
+
+  public void recordTransactionCommitted(long txId) {
+    transactionToDNsCommitMap
+        .putIfAbsent(txId, new LinkedHashSet<>());
+  }
+
+  public void clear() {
+    scmDeleteBlocksCommandStatusManager.clear();
+    transactionToDNsCommitMap.clear();
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
+  }
+
+  public void onDatanodeDead(UUID dnId) {
+    scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
+  }
+
+  public boolean isDuplication(DatanodeDetails dnDetail, long tx,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    if (alreadyExecuted(dnDetail.getUuid(), tx)) {
+      return true;
+    }
+    return inProcessing(dnDetail.getUuid(), tx, commandStatus);

Review Comment:
   But we don't know which `scmCmdId` a given `tx` belongs to, so we would need to iterate through all the recorded `scmCmdId`s, which may be slow. The lookup would be O(n), where n is the number of `scmCmdId`s recorded in `scmCmdStatusRecord`:
   
   ```java
       private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
       // Map<DnUUID, Map<ScmCmdId, CmdStatusData>>; each CmdStatusData holds a Set<deletedBlocksTxId>
   ```
   
   `commandStatus` is pre-computed once, so the status of every tx can be looked up directly:
   ```java
      Map<UUID, Map<Long, CmdStatus>> commandStatus
      // Map<DnUUID, Map<deletedBlocksTxId, CmdStatus>>
   ```
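A minimal Java sketch of the trade-off described above, under a simplified model: `TxStatusIndex`, `Cmd`, `indexByTxId` and `inProcessing` are hypothetical names, `Cmd` stands in for the PR's `CmdStatusData`, and this `inProcessing` is only an illustrative stand-in for the PR method of the same name. Inverting the per-command map costs one pass over all recorded tx IDs, after which each duplicate check is a single hash lookup instead of a scan over every `scmCmdId`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, simplified model of the two maps discussed above. */
final class TxStatusIndex {
  enum CmdStatus { TO_BE_SENT, SENT }

  /** Stand-in for CmdStatusData: a status plus the tx IDs it carries. */
  static final class Cmd {
    final CmdStatus status;
    final Set<Long> txIds;

    Cmd(CmdStatus status, Set<Long> txIds) {
      this.status = status;
      this.txIds = txIds;
    }
  }

  /** Inverts Map<dnId, Map<scmCmdId, Cmd>> into Map<dnId, Map<txId, status>>. */
  static Map<UUID, Map<Long, CmdStatus>> indexByTxId(
      Map<UUID, Map<Long, Cmd>> byCmdId) {
    Map<UUID, Map<Long, CmdStatus>> result = new HashMap<>();
    for (Map.Entry<UUID, Map<Long, Cmd>> dn : byCmdId.entrySet()) {
      Map<Long, CmdStatus> perTx = new HashMap<>();
      for (Cmd cmd : dn.getValue().values()) {
        for (long txId : cmd.txIds) {
          perTx.put(txId, cmd.status);
        }
      }
      result.put(dn.getKey(), perTx);
    }
    return result;
  }

  /** True if the tx is already tracked as in flight for this datanode. */
  static boolean inProcessing(UUID dnId, long txId,
      Map<UUID, Map<Long, CmdStatus>> index) {
    Map<Long, CmdStatus> perTx = index.get(dnId);
    return perTx != null && perTx.containsKey(txId);
  }
}
```

Building the index once per deletion round and reusing it for every candidate transaction is what keeps the per-tx check cheap.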



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sumitagrawl commented on a diff in pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1260494268


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
@@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Update the SCMCommand of deleteBlocksCommand Status
+      for (SCMCommand command : commands) {

Review Comment:
   We could have a sendCommandNotifyMap, like:
   Map<CommandType, ICommandSend> sendCommandNotifyMap;
   interface ICommandSend {
       void notify(List<SCMCommand> commands);
   }
   
   void register(CommandType type, ICommandSend handler) {
       sendCommandNotifyMap.put(type, handler);
   }
   
   A handler registered for the delete-blocks command type can then hold the above logic to update the status.
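One way this suggestion could look, sketched in Java under assumptions: `CommandSentNotifier`, `SentListener`, `CommandType` and `Command` are hypothetical stand-ins rather than Ozone types, and the callback is named `onSent` instead of `notify` to avoid confusion with `Object.notify()`. The heartbeat path would call `notifySent` with the commands it just pulled from the queue, and each listener only sees the commands of the type it registered for.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical dispatcher for "commands were sent to a DN" notifications. */
final class CommandSentNotifier {
  /** Illustrative command types; Ozone's real command enum differs. */
  enum CommandType { DELETE_BLOCKS, CLOSE_CONTAINER }

  /** Minimal stand-in for SCMCommand: a type plus a command ID. */
  static final class Command {
    final CommandType type;
    final long id;

    Command(CommandType type, long id) {
      this.type = type;
      this.id = id;
    }
  }

  /** Callback for the commands of one type that were just handed to a DN. */
  interface SentListener {
    void onSent(String datanodeId, List<Command> sent);
  }

  private final Map<CommandType, SentListener> listeners =
      new EnumMap<>(CommandType.class);

  void register(CommandType type, SentListener listener) {
    listeners.put(type, listener);
  }

  /** Groups the outgoing batch by type and notifies each registered listener. */
  void notifySent(String datanodeId, List<Command> commands) {
    Map<CommandType, List<Command>> byType = new EnumMap<>(CommandType.class);
    for (Command c : commands) {
      byType.computeIfAbsent(c.type, t -> new ArrayList<>()).add(c);
    }
    for (Map.Entry<CommandType, List<Command>> e : byType.entrySet()) {
      SentListener listener = listeners.get(e.getKey());
      if (listener != null) {
        listener.onSent(datanodeId, e.getValue());
      }
    }
  }
}
```

With this shape, the node manager would not need to know about delete-block status tracking; the status manager would simply register itself for the delete-blocks type.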





Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "ashishkumar50 (via GitHub)" <gi...@apache.org>.
ashishkumar50 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1794330687

   @xichen01, thanks for working on this. 
   Have the latest comments from @sumitagrawl been addressed?




[GitHub] [ozone] github-actions[bot] commented on pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1609930581

   No such command. `/` Available commands:
    * /**close** : Close pending pull request temporarily
    * /**help** : Show all the available comment commands
    * /**label** : add new label to the issue: `/label <label>`
    * /**pending** : Add a REQUESTED_CHANGE type review to mark issue non-mergeable: `/pending <reason>`
    * /**ready** : Dismiss all the blocking reviews by github-actions bot
    * /**retest** : provide help on how to trigger new CI build




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411743155


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry count.
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    // Maps a transaction to the DNs which have committed it.
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The command was sent and is being executed by the DN.
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "CmdStatusData" +
+            "{dnId=" + dnId +
+            ", scmCmdId=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN has executed the DeleteBlocksCommand, it is removed
+          // from the record whether or not the execution succeeded.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to" +
+            " {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Removed timed-out SCM DeleteBlocksCommand {} for DN {} " +
+              "after {}ms without update", state, dnId, timeoutMs);
+        } else {
+          LOG.debug("SCM DeleteBlocksCommand {} for DN {} has not yet " +
+              "exceeded the {}ms timeout", scmCmdId, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int currentCount =
+          transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (currentCount > maxRetry) {
+        continue;
+      } else {
+        currentCount += 1;
+        if (currentCount > maxRetry) {
+          txIDsToUpdate.add(txID);
+        }
+        transactionToRetryCountMap.put(txID, currentCount);
+      }
+    }
+
+    if (!txIDsToUpdate.isEmpty()) {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+    }
+  }
+
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    for (Long txID: txIDs) {
+      transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+    }
+  }
+
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(

Review Comment:
   - `recordTransactionCreated`: records that `SCMBlockDeletingService` has created a transaction. At the time it is called, SCM has not yet sent the transaction to the DN.
   - `recordTransactionCommitted`: records that the DN has successfully executed the transaction. It is called after the DN has received the deletion transaction, executed it, and reported the result to SCM.
   
   So it's hard to merge the two.
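   To illustrate why the two hooks are hard to merge, here is a minimal sketch. It assumes a simplified, hypothetical model (`TxLifecycleSketch` and its parameters are illustrative and are not the PR's actual signatures). The two hooks run at different points in the lifecycle and record different things: "created" tracks which tx IDs a not-yet-sent command carries for a DN, while "committed" tracks which DNs have confirmed a tx.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of the two hooks; not the PR's actual code.
class TxLifecycleSketch {
  // Filled before sending: dnId -> (scmCmdId -> tx IDs carried by that command).
  private final Map<UUID, Map<Long, Set<Long>>> createdByDn = new HashMap<>();
  // Filled after the DN reports success: tx ID -> DNs that committed it.
  private final Map<Long, Set<UUID>> committedByTx = new HashMap<>();

  // Called when SCMBlockDeletingService builds a command; nothing is sent yet.
  void recordTransactionCreated(UUID dnId, long scmCmdId, Set<Long> txIds) {
    createdByDn.computeIfAbsent(dnId, k -> new HashMap<>())
        .put(scmCmdId, new HashSet<>(txIds));
  }

  // Called only after the DN has executed the transaction and reported it.
  void recordTransactionCommitted(UUID dnId, long txId) {
    committedByTx.computeIfAbsent(txId, k -> new HashSet<>()).add(dnId);
  }

  boolean isCreatedFor(UUID dnId, long txId) {
    Map<Long, Set<Long>> commands = createdByDn.get(dnId);
    if (commands == null) {
      return false;
    }
    for (Set<Long> txIds : commands.values()) {
      if (txIds.contains(txId)) {
        return true;
      }
    }
    return false;
  }

  boolean isCommittedBy(UUID dnId, long txId) {
    Set<UUID> dns = committedByTx.get(txId);
    return dns != null && dns.contains(dnId);
  }
}
```

   The inputs differ (a whole command's tx set versus a single confirmed tx), and so do the timing and the caller. Folding both into one method would force it to branch on lifecycle stage anyway.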



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1421502127


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java:
##########
@@ -88,14 +84,11 @@ void incrementCount(List<Long> txIDs)
   int resetCount(List<Long> txIDs) throws IOException;
 
   /**
-   * Commits a transaction means to delete all footprints of a transaction
-   * from the log. This method doesn't guarantee all transactions can be
-   * successfully deleted, it tolerate failures and tries best efforts to.
-   *  @param transactionResults - delete block transaction results.
-   * @param dnID - ID of datanode which acknowledges the delete block command.
+   * Get SCMDeletedBlockTransactionStatusManager.
+   * @return an Object of SCMDeletedBlockTransactionStatusManager
    */
-  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
-      UUID dnID);
+  SCMDeletedBlockTransactionStatusManager
+      getSCMDeletedBlockTransactionStatusManager();

Review Comment:
   Removed the `getSCMDeletedBlockTransactionStatusManager` method from `DeletedBlockLog` and instead added the `DeletedBlockTransactionStatusManager`-related operations to `DeletedBlockLog` directly.
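   The change above follows a plain delegation pattern: the log keeps the status manager private and forwards only the operations callers need, instead of handing out the manager through a getter. A minimal sketch, assuming hypothetical class names (`StatusManagerSketch`, `DeletedBlockLogSketch`) and a string status for brevity:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the status manager.
class StatusManagerSketch {
  private final Map<UUID, Map<Long, String>> statusByCmd = new HashMap<>();

  void onSent(UUID dnId, long scmCmdId) {
    statusByCmd.computeIfAbsent(dnId, k -> new HashMap<>()).put(scmCmdId, "SENT");
  }

  String statusOf(UUID dnId, long scmCmdId) {
    Map<Long, String> perDn = statusByCmd.get(dnId);
    return perDn == null ? null : perDn.get(scmCmdId);
  }
}

// The log owns the manager privately and forwards only the operations
// callers need, rather than returning the manager from a getter.
class DeletedBlockLogSketch {
  private final StatusManagerSketch statusManager = new StatusManagerSketch();

  void onSent(UUID dnId, long scmCmdId) {
    statusManager.onSent(dnId, scmCmdId);
  }

  String getCommandStatus(UUID dnId, long scmCmdId) {
    return statusManager.statusOf(dnId, scmCmdId);
  }
}
```

   Callers depend only on the log's interface, so the manager's internals can change without touching them.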





[GitHub] [ozone] xichen01 commented on a diff in pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1260094418


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =

Review Comment:
   Previously we had `transactionToRetryCountMap` and `transactionToDNsCommitMap` to manage `DeletedBlocksTransaction`s. This effectively gives a `DeletedBlocksTransaction` two statuses: Committed and Uncommitted.
   The `SCMDeleteBlocksCommandStatusManager` in this PR extends the status of a `DeletedBlocksTransaction`, so we can tell whether it is "not yet sent to the DN", "sent to the DN but not yet processed by the DN", and so on.
   Maybe `transactionToDNsCommitMap` and `transactionToRetryCountMap` could be merged into it as well, but I'm not sure that should be done in this PR.
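   The extra statuses let SCM skip transactions that are already in flight to a DN. Below is a minimal sketch of that filter over a per-DN `txId -> CmdStatus` map, shaped like one entry of `getCommandStatusByTxId`'s result. It assumes that a tx ID stays in the map only while its command is `TO_BE_SENT` or `SENT` and is removed once committed or timed out. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class InFlightFilterSketch {
  // Mirrors the two statuses kept in the later revision of the PR.
  enum CmdStatus { TO_BE_SENT, SENT }

  // Keeps only the candidate tx IDs that are not already in flight to this DN.
  // Assumption: a tx ID appears in dnStatus only while its command is
  // TO_BE_SENT or SENT; once committed (or timed out) it is removed.
  static List<Long> txToSend(List<Long> candidates, Map<Long, CmdStatus> dnStatus) {
    List<Long> toSend = new ArrayList<>();
    for (Long txId : candidates) {
      if (!dnStatus.containsKey(txId)) {
        toSend.add(txId);
      }
    }
    return toSend;
  }
}
```

   With candidates `[1, 2, 3]`, where tx 2 is `SENT` and tx 3 is `TO_BE_SENT` for the DN, only tx 1 would be sent.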



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java:
##########
@@ -78,9 +78,9 @@ protected CommandStatusReportsProto getReport() {
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
-        builder.addCmdStatus(cmdStatus.getProtoBufMessage());

Review Comment:
   If the report contains only a `PENDING` status for a `DeleteBlocksCommand`, then on the SCM side `DeletedBlockLogImpl#onMessage()` will execute `commitSCMCommandStatus` to update the `SCMDeleteBlocksCommandStatus`, but will not execute `commitTransactions`.
   So the only impact is that `DeletedBlockLogImpl#onMessage()` will be executed more often.
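   A minimal sketch of that handling, under simplifying assumptions: the report shape (`CmdReport`) and the counters are hypothetical, and only `EXECUTED` reports lead to a commit here (the real handling of `FAILED` results is not modelled). Every report refreshes the command's status, but a `PENDING` report stops before anything is committed.

```java
import java.util.ArrayList;
import java.util.List;

class OnMessageSketch {
  // Subset of the DN-side command statuses (illustrative).
  enum DnStatus { PENDING, EXECUTED, FAILED }

  static final class CmdReport {
    final long scmCmdId;
    final DnStatus status;
    final List<Long> txIds;

    CmdReport(long scmCmdId, DnStatus status, List<Long> txIds) {
      this.scmCmdId = scmCmdId;
      this.status = status;
      this.txIds = txIds;
    }
  }

  int statusUpdates = 0;                              // commitSCMCommandStatus-like calls
  final List<Long> committedTxIds = new ArrayList<>(); // commitTransactions-like results

  void onMessage(List<CmdReport> reports) {
    for (CmdReport r : reports) {
      statusUpdates++;                                // always refresh the command's status
      if (r.status == DnStatus.EXECUTED) {
        committedTxIds.addAll(r.txIds);               // only executed commands commit txs
      }
      // PENDING (and, in this sketch, FAILED) reports commit nothing.
    }
  }
}
```

   Reporting `PENDING` therefore costs one extra status update per heartbeat, not an extra commit.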



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+
+  /**
+   * Status of SCMDeleteBlocksCommand.
+   */
+  public enum CmdStatus {
+    // The DeleteBlocksCommand has not yet been sent.
+    // This is the initial status of the command after it's created.
+    TO_BE_SENT,
+    // This status indicates that the DeleteBlocksCommand has been sent
+    // to the DataNode, but the Datanode has not reported any new status
+    // for the DeleteBlocksCommand.
+    SENT,
+    // The DeleteBlocksCommand has been received by Datanode and
+    // is waiting to be executed.
+    PENDING_EXECUTED,
+    // The DeleteBlocksCommand was executed, and the execution was successful
+    EXECUTED,
+    // The DeleteBlocksCommand was executed but failed to execute,
+    // or was lost before it was executed.
+    NEED_RESEND
+  }
+
+  private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+
+  private final Set<CmdStatus> statusesRequiringTimeout = new HashSet<>(
+      Arrays.asList(SENT, PENDING_EXECUTED));
+  private final Set<CmdStatus> finialStatuses = new HashSet<>(
+      Arrays.asList(EXECUTED, NEED_RESEND));
+  private final Set<CmdStatus> failedStatuses = new HashSet<>(
+      Arrays.asList(NEED_RESEND));
+
+  private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+  public SCMDeleteBlocksCommandStatusManager() {
+    this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+  }
+
+  public static CmdStatusData createScmCmdStatusData(
+      UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+    return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+  }
+
+  protected static final class CmdStatusData {
+    private final UUID dnId;
+    private final long scmCmdId;
+    private final Set<Long> deletedBlocksTxIds;
+    private Instant updateTime;
+    private CmdStatus status;
+
+    private CmdStatusData(
+        UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+      this.dnId = dnId;
+      this.scmCmdId = scmTxID;
+      this.deletedBlocksTxIds = deletedBlocksTxIds;
+      setStatus(DEFAULT_STATUS);
+    }
+
+    public boolean isContainDeletedBlocksTx(long deletedBlocksTxId) {
+      return deletedBlocksTxIds.contains(deletedBlocksTxId);
+    }
+
+    public Set<Long> getDeletedBlocksTxIds() {
+      return deletedBlocksTxIds;
+    }
+
+    public UUID getDnId() {
+      return dnId;
+    }
+
+    public long getScmCmdId() {
+      return scmCmdId;
+    }
+
+    public CmdStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(CmdStatus status) {
+      this.updateTime = Instant.now();
+      this.status = status;
+    }
+
+    public Instant getUpdateTime() {
+      return updateTime;
+    }
+
+    @Override
+    public String toString() {
+      return "ScmTxStateMachine" +
+          "{dnId=" + dnId +
+          ", scmTxID=" + scmCmdId +
+          ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+          ", updateTime=" + updateTime +
+          ", status=" + status +
+          '}';
+    }
+  }
+
+  public void recordScmCommand(CmdStatusData statusData) {
+    LOG.debug("Record ScmCommand: {}", statusData);
+    scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+        new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    Map<UUID, Map<Long, CmdStatus>> result =
+        new HashMap<>(scmCmdStatusRecord.size());
+
+    for (UUID dnId : dnIds) {
+      if (scmCmdStatusRecord.get(dnId) == null) {
+        continue;
+      }
+      Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      for (CmdStatusData statusData : record.values()) {
+        CmdStatus status = statusData.getStatus();
+        for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+          dnStatusMap.put(deletedBlocksTxId, status);
+        }
+      }
+      result.put(dnId, dnStatusMap);
+    }
+
+    return result;
+  }
+
+  public void onSent(UUID dnId, long scmCmdId) {
+    updateStatus(dnId, scmCmdId, SENT);
+  }
+
+  public void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+      CommandStatus.Status newState) {
+    CmdStatus status = fromProtoCommandStatus(newState);
+    if (status != null) {
+      updateStatus(dnId, scmCmdId, status);
+    }
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    for (UUID dnId : scmCmdStatusRecord.keySet()) {

Review Comment:
   Okay, this will clean up a dead node's status faster. I will update this.
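   The two cleanup paths can be sketched as follows. A periodic sweep forgets commands that have stayed `SENT` longer than the timeout (in the later revision, only `SENT` requires a timeout), and a dead-node hook drops a DN's whole record at once. The sketch assumes only the send time is stored and that `now` is passed in explicitly for testability. The real method takes `timeoutMs`, and `CleanupSketch` is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class CleanupSketch {
  // dnId -> (scmCmdId -> time the command was marked SENT)
  final Map<UUID, Map<Long, Instant>> sentAt = new ConcurrentHashMap<>();

  void onSent(UUID dnId, long scmCmdId, Instant now) {
    sentAt.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>()).put(scmCmdId, now);
  }

  // Periodic sweep: forget commands that stayed SENT longer than the timeout,
  // so their transactions become eligible for resending.
  void cleanTimedOut(Duration timeout, Instant now) {
    for (Map<Long, Instant> perDn : sentAt.values()) {
      perDn.values().removeIf(t -> Duration.between(t, now).compareTo(timeout) > 0);
    }
  }

  // Dead DN: drop its whole record at once instead of waiting for timeouts.
  void onDatanodeDead(UUID dnId) {
    sentAt.remove(dnId);
  }
}
```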



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
@@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Update the SCMCommand of deleteBlocksCommand Status
+      for (SCMCommand command : commands) {

Review Comment:
   Do you mean something like `scmNodeEventPublisher.fireEvent(SCMEvents.TAKE_OUT_DATANODE_COMMAND, datanodeDetails);`?
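   The event-based alternative decouples `SCMNodeManager` from deletion bookkeeping. The node manager only announces which commands it handed to a DN, and a deletion-side handler marks the `DeleteBlocksCommand`s as sent. The sketch below uses a hypothetical minimal event bus and payload type (`EventBusSketch`, `CommandsTakenOut`, `SentMarker`), not SCM's actual event classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical minimal event bus; stands in for SCM's event publisher.
final class EventBusSketch<T> {
  private final List<Consumer<T>> handlers = new ArrayList<>();

  void register(Consumer<T> handler) {
    handlers.add(handler);
  }

  void fireEvent(T payload) {
    for (Consumer<T> handler : handlers) {
      handler.accept(payload);
    }
  }
}

// Hypothetical payload: which DeleteBlocksCommand IDs were handed to a DN.
final class CommandsTakenOut {
  final UUID dnId;
  final List<Long> deleteBlocksCmdIds;

  CommandsTakenOut(UUID dnId, List<Long> deleteBlocksCmdIds) {
    this.dnId = dnId;
    this.deleteBlocksCmdIds = deleteBlocksCmdIds;
  }
}

// Deletion-side handler: marks the commands as SENT without SCMNodeManager
// needing to know anything about block deletion.
final class SentMarker implements Consumer<CommandsTakenOut> {
  final Set<Long> sentCmdIds = new HashSet<>();

  @Override
  public void accept(CommandsTakenOut event) {
    sentCmdIds.addAll(event.deleteBlocksCmdIds);
  }
}
```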





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411768283


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The command was sent and is being executed by the DN
+      SENT,

Review Comment:
   Done.





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1836496613

   Thanks @xichen01 for updating the patch.  Can you please check `TestDeletedBlockLog` failures?
   
   https://github.com/xichen01/ozone/actions/runs/7057702518/job/19212125760#step:5:1833




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl merged PR #4988:
URL: https://github.com/apache/ozone/pull/4988




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411743155


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The command was sent and is being executed by the DN
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN executes DeleteBlocksCommands, regardless of whether
+          // DeleteBlocksCommands is executed successfully or not,
+          // it will be deleted from record.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to" +
+            " {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Remove Timeout SCM BlockDeletionCommand {} for DN {} " +
+              "after without update {}ms", state, dnId, timeoutMs);
+        } else {
+          LOG.warn("Timeout SCM scmCmdIds {} for DN {} " +
+              "after without update {}ms", scmCmdIds, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int currentCount =
+          transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (currentCount > maxRetry) {
+        continue;
+      } else {
+        currentCount += 1;
+        if (currentCount > maxRetry) {
+          txIDsToUpdate.add(txID);
+        }
+        transactionToRetryCountMap.put(txID, currentCount);
+      }
+    }
+
+    if (!txIDsToUpdate.isEmpty()) {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+    }
+  }
+
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    for (Long txID: txIDs) {
+      transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+    }
+  }
+
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(

Review Comment:
   - `recordTransactionCreated`: records that a transaction has been created by `SCMBlockDeletingService`; at the time it is called, SCM has not yet sent the transaction to the DN.
   - `recordTransactionCommitted`: records that the transaction has been executed successfully by the DN; it is called after the DN has received the deletion transaction and reported the result to SCM.
   
   So it is hard to merge the two.
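   To illustrate why the two calls belong to different stages, here is a minimal, hypothetical sketch (not the code in this PR). It models the stages with two separate maps: an in-flight record keyed by DN and command id, filled in when the transaction is created, and a commit map from txId to the DNs that reported success. Only the method names `recordTransactionCreated` and `recordTransactionCommitted` come from this thread; their signatures here, `onCommandFinished`, and the class itself are assumptions, and locking and persistence are left out.
   
   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical model of the two stages; not the real
   // SCMDeletedBlockTransactionStatusManager.
   class TxLifecycleSketch {
     // Stage 1: DN -> (command id -> txIds in that command), before execution.
     private final Map<UUID, Map<Long, Set<Long>>> inFlightByDn =
         new ConcurrentHashMap<>();
     // Stage 2: txId -> DNs that reported a successful commit.
     private final Map<Long, Set<UUID>> committedDnsByTx =
         new ConcurrentHashMap<>();
   
     // Called when a command is built for a DN, before it is sent.
     void recordTransactionCreated(UUID dnId, long scmCmdId, Set<Long> txIds) {
       inFlightByDn.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
           .put(scmCmdId, new HashSet<>(txIds));
     }
   
     // Called when the DN reports that it executed txId successfully.
     void recordTransactionCommitted(UUID dnId, long txId) {
       committedDnsByTx.computeIfAbsent(txId, k -> ConcurrentHashMap.newKeySet())
           .add(dnId);
     }
   
     // Hypothetical: the DN finished the command, so it is no longer in flight.
     void onCommandFinished(UUID dnId, long scmCmdId) {
       Map<Long, Set<Long>> cmds = inFlightByDn.get(dnId);
       if (cmds != null) {
         cmds.remove(scmCmdId);
       }
     }
   
     boolean isInFlight(UUID dnId, long txId) {
       Map<Long, Set<Long>> cmds = inFlightByDn.get(dnId);
       if (cmds == null) {
         return false;
       }
       for (Set<Long> txIds : cmds.values()) {
         if (txIds.contains(txId)) {
           return true;
         }
       }
       return false;
     }
   
     boolean isCommittedBy(UUID dnId, long txId) {
       Set<UUID> dns = committedDnsByTx.get(txId);
       return dns != null && dns.contains(dnId);
     }
   
     public static void main(String[] args) {
       TxLifecycleSketch s = new TxLifecycleSketch();
       UUID dn = UUID.randomUUID();
       s.recordTransactionCreated(dn, 1L, Set.of(100L, 101L));
       System.out.println(s.isInFlight(dn, 100L) + " " + s.isCommittedBy(dn, 100L));
       s.recordTransactionCommitted(dn, 100L);
       s.onCommandFinished(dn, 1L);
       System.out.println(s.isInFlight(dn, 100L) + " " + s.isCommittedBy(dn, 100L));
     }
   }
   ```
   
   Running `main` prints `true false` and then `false true`: a transaction is first tracked only as in flight and only later as committed, which is why a single "record" method would have to handle two different moments in the lifecycle.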





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1414964293


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -437,15 +350,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(

Review Comment:
   @xichen01 Yes, you are correct. Above is fine.





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411718829


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -437,15 +350,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(

Review Comment:
   @sumitagrawl Thank you for the further review.
   
   The gap between us may be in how often `SCMBlockDeletingService` runs: as I understand it, `SCMBlockDeletingService` is executed every `60s`.
   
   To summarize my understanding:
   - `SCMBlockDeletingService timeout` (`OZONE_BLOCK_DELETING_SERVICE_TIMEOUT`): this value is used to decide whether a run of the service has timed out; if it has, a warning is logged.
   `SCMBlockDeletingService timeout` is only used for logging and is not involved in any other decision:
   ```java
    if (endTime - startTime > serviceTimeoutInNanos) {
      LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
          serviceName, endTime - startTime, serviceTimeoutInNanos);
    }
   ```
   
   - `block.deleting.service.interval` determines the period at which `SCMBlockDeletingService` runs; it defaults to 60s.
   - `scmCommandTimeoutMs` is 300s, which means that if the DN has not responded to a delete transaction that SCM already sent, SCM will not send that transaction again until `SCMBlockDeletingService` has run about 5 times (300s / 60s).
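   A rough, idealized version of that arithmetic, assuming the service runs exactly every interval starting from the moment the command was marked `SENT`, and using the same strict `>` comparison as `removeTimeoutScmCommand` in the code quoted in this thread. `runsUntilResend` is a hypothetical helper, not part of Ozone.
   
   ```java
   import java.time.Duration;
   
   // Idealized model (assumption): runs happen exactly `interval` apart,
   // starting when the command was marked SENT; scheduling jitter is ignored.
   class TimeoutRunsSketch {
     // Smallest n such that n * interval > timeout, i.e. the first run after
     // sending at which the SENT command counts as timed out and its
     // transactions become eligible to be sent again.
     static long runsUntilResend(Duration interval, Duration timeout) {
       long intervalMs = interval.toMillis();
       if (intervalMs <= 0) {
         throw new IllegalArgumentException("interval must be positive");
       }
       return timeout.toMillis() / intervalMs + 1;
     }
   
     public static void main(String[] args) {
       // 60s service interval, 300s scmCommandTimeoutMs
       System.out.println(
           runsUntilResend(Duration.ofSeconds(60), Duration.ofSeconds(300)));
     }
   }
   ```
   
   With these values it prints `6`: runs 1 to 5 skip the transaction (at run 5 the elapsed time is exactly 300s, which is not greater than the timeout) and run 6 may resend it. Real scheduling jitter can shift this by a run.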





Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1794629250

   > @xichen01, Thanks for working on this. Have the latest comments given by @sumitagrawl been addressed?
   
   Yes, `CmdStatus` has been reduced to two states, and `transactionToRetryCountMap` has been moved into `SCMDeletedBlockTransactionStatusManager`.
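   For reference, a minimal self-contained sketch of the reduced two-state lifecycle, modelled on the `SCMDeleteBlocksCommandStatusManager` code quoted in this thread: a command starts as `TO_BE_SENT`, moves to `SENT` when it goes out (or the DN reports it as pending), is dropped from the record once the DN reports it executed or failed, and is also dropped if it stays `SENT` longer than the timeout. `TwoStateCmdTracker` and `DnReport` are hypothetical (`DnReport` stands in for the protobuf `CommandStatus.Status` values); timestamps are passed in explicitly to keep it testable, and locking and metrics are left out.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical, simplified two-state tracker; not the PR's actual class.
   class TwoStateCmdTracker {
     enum CmdStatus { TO_BE_SENT, SENT }
   
     // Stand-in for the DN-reported CommandStatus.Status values.
     enum DnReport { PENDING, EXECUTED, FAILED }
   
     private static final class Entry {
       private CmdStatus status = CmdStatus.TO_BE_SENT;
       private Instant updateTime;
   
       private Entry(Instant now) {
         this.updateTime = now;
       }
     }
   
     // DN -> (command id -> entry)
     private final Map<UUID, Map<Long, Entry>> cmdsByDn = new ConcurrentHashMap<>();
   
     void recordCommand(UUID dn, long cmdId, Instant now) {
       cmdsByDn.computeIfAbsent(dn, k -> new ConcurrentHashMap<>())
           .put(cmdId, new Entry(now));
     }
   
     // Sending is treated like a PENDING report, as onSent does in the diff.
     void onSent(UUID dn, long cmdId, Instant now) {
       onReport(dn, cmdId, DnReport.PENDING, now);
     }
   
     void onReport(UUID dn, long cmdId, DnReport report, Instant now) {
       Map<Long, Entry> cmds = cmdsByDn.get(dn);
       Entry e = cmds == null ? null : cmds.get(cmdId);
       if (e == null) {
         return; // unknown DN or command: ignore
       }
       switch (report) {
         case PENDING:
           // TO_BE_SENT -> SENT, or SENT -> SENT (refreshes updateTime)
           e.status = CmdStatus.SENT;
           e.updateTime = now;
           break;
         case EXECUTED:
         case FAILED:
           // Finished either way: stop tracking it. Only legal from SENT.
           if (e.status == CmdStatus.SENT) {
             cmds.remove(cmdId);
           }
           break;
         default:
           break;
       }
     }
   
     // Drops SENT commands whose last update is older than timeoutMs.
     void cleanTimeouts(long timeoutMs, Instant now) {
       for (Map<Long, Entry> cmds : cmdsByDn.values()) {
         cmds.values().removeIf(e -> e.status == CmdStatus.SENT
             && Duration.between(e.updateTime, now).toMillis() > timeoutMs);
       }
     }
   
     // Returns null once the command is no longer tracked.
     CmdStatus statusOf(UUID dn, long cmdId) {
       Map<Long, Entry> cmds = cmdsByDn.get(dn);
       Entry e = cmds == null ? null : cmds.get(cmdId);
       return e == null ? null : e.status;
     }
   }
   ```
   
   In this model a transaction is considered in flight for a DN exactly while some tracked command for that DN contains it; once the command is removed, either by a DN report or by the timeout, the transaction can be picked up again on the next run.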




[GitHub] [ozone] xichen01 commented on a diff in pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1265442916


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =

Review Comment:
   Done.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
@@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Update the SCMCommand of deleteBlocksCommand Status
+      for (SCMCommand command : commands) {

Review Comment:
   Done.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+
+  /**
+   * Status of SCMDeleteBlocksCommand.
+   */
+  public enum CmdStatus {
+    // The DeleteBlocksCommand has not yet been sent.
+    // This is the initial status of the command after it's created.
+    TO_BE_SENT,
+    // This status indicates that the DeleteBlocksCommand has been sent
+    // to the DataNode, but the Datanode has not reported any new status
+    // for the DeleteBlocksCommand.
+    SENT,
+    // The DeleteBlocksCommand has been received by Datanode and
+    // is waiting for executed.
+    PENDING_EXECUTED,
+    // The DeleteBlocksCommand was executed, and the execution was successful
+    EXECUTED,
+    // The DeleteBlocksCommand was executed but failed to execute,
+    // or was lost before it was executed.
+    NEED_RESEND
+  }
+
+  private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+
+  private final Set<CmdStatus> statusesRequiringTimeout = new HashSet<>(
+      Arrays.asList(SENT, PENDING_EXECUTED));
+  private final Set<CmdStatus> finialStatuses = new HashSet<>(
+      Arrays.asList(EXECUTED, NEED_RESEND));
+  private final Set<CmdStatus> failedStatuses = new HashSet<>(
+      Arrays.asList(NEED_RESEND));
+
+  private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+  public SCMDeleteBlocksCommandStatusManager() {
+    this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+  }
+
+  public static CmdStatusData createScmCmdStatusData(
+      UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+    return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+  }
+
+  protected static final class CmdStatusData {
+    private final UUID dnId;
+    private final long scmCmdId;
+    private final Set<Long> deletedBlocksTxIds;
+    private Instant updateTime;
+    private CmdStatus status;
+
+    private CmdStatusData(
+        UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+      this.dnId = dnId;
+      this.scmCmdId = scmTxID;
+      this.deletedBlocksTxIds = deletedBlocksTxIds;
+      setStatus(DEFAULT_STATUS);
+    }
+
+    public boolean isContainDeletedBlocksTx(long deletedBlocksTxId) {
+      return deletedBlocksTxIds.contains(deletedBlocksTxId);
+    }
+
+    public Set<Long> getDeletedBlocksTxIds() {
+      return deletedBlocksTxIds;
+    }
+
+    public UUID getDnId() {
+      return dnId;
+    }
+
+    public long getScmCmdId() {
+      return scmCmdId;
+    }
+
+    public CmdStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(CmdStatus status) {
+      this.updateTime = Instant.now();
+      this.status = status;
+    }
+
+    public Instant getUpdateTime() {
+      return updateTime;
+    }
+
+    @Override
+    public String toString() {
+      return "ScmTxStateMachine" +
+          "{dnId=" + dnId +
+          ", scmTxID=" + scmCmdId +
+          ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+          ", updateTime=" + updateTime +
+          ", status=" + status +
+          '}';
+    }
+  }
+
+  public void recordScmCommand(CmdStatusData statusData) {
+    LOG.debug("Record ScmCommand: {}", statusData);
+    scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+        new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    Map<UUID, Map<Long, CmdStatus>> result =
+        new HashMap<>(scmCmdStatusRecord.size());
+
+    for (UUID dnId : dnIds) {
+      if (scmCmdStatusRecord.get(dnId) == null) {
+        continue;
+      }
+      Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      for (CmdStatusData statusData : record.values()) {
+        CmdStatus status = statusData.getStatus();
+        for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+          dnStatusMap.put(deletedBlocksTxId, status);
+        }
+      }
+      result.put(dnId, dnStatusMap);
+    }
+
+    return result;
+  }
+
+  public void onSent(UUID dnId, long scmCmdId) {
+    updateStatus(dnId, scmCmdId, SENT);
+  }
+
+  public void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+      CommandStatus.Status newState) {
+    CmdStatus status = fromProtoCommandStatus(newState);
+    if (status != null) {
+      updateStatus(dnId, scmCmdId, status);
+    }
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    for (UUID dnId : scmCmdStatusRecord.keySet()) {

Review Comment:
   Done.





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1867825576

   > @xichen01 Could you please check [HDDS-9962](https://issues.apache.org/jira/browse/HDDS-9962), intermittent failure in `TestBlockDeletion`? It started happening after this PR was merged.
   
   OK, I will check this




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1867597764

   @xichen01 Could you please check HDDS-9962, intermittent failure in `TestBlockDeletion`?  It started happening after this PR was merged.




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1411732223


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The Command sent and being executed by DN
+      SENT,
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {

Review Comment:
   Theoretically, this cannot happen: every transaction has a unique `scmCmdId`, and the updates here use the `scmCmdId` to identify the transaction and update it.
   The DN cannot reply to a transaction with `scmCmdId XX` unless the SCM has sent the transaction with `scmCmdId XX` to that DN.
   But I think some additional handling logic could be added here to make the program more robust.
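One possible shape for that extra robustness: accept a reported status only when the command is in an expected previous status, and otherwise log the report and keep the record unchanged. This is a minimal standalone sketch, not the PR's `updateStatus`. `CmdStatusTransitions` and `apply` are hypothetical names. The sketch also assumes that `EXECUTED` and `NEED_RESEND` may be entered from either `SENT` or `PENDING_EXECUTED`, because the quoted hunk is cut off before those cases.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Standalone model of the DeleteBlocksCommand statuses discussed in this PR.
enum CmdStatus { TO_BE_SENT, SENT, PENDING_EXECUTED, EXECUTED, NEED_RESEND }

// Hypothetical helper, not the PR's updateStatus().
final class CmdStatusTransitions {
  // For each target status, the statuses it may legally be entered from.
  private static final Map<CmdStatus, Set<CmdStatus>> ALLOWED_FROM =
      new EnumMap<>(CmdStatus.class);

  static {
    ALLOWED_FROM.put(CmdStatus.SENT, EnumSet.of(CmdStatus.TO_BE_SENT));
    // Repeated PENDING_EXECUTED reports keep the command pending.
    ALLOWED_FROM.put(CmdStatus.PENDING_EXECUTED,
        EnumSet.of(CmdStatus.SENT, CmdStatus.PENDING_EXECUTED));
    // Assumption: a final result may arrive with or without a prior
    // PENDING_EXECUTED report.
    ALLOWED_FROM.put(CmdStatus.EXECUTED,
        EnumSet.of(CmdStatus.SENT, CmdStatus.PENDING_EXECUTED));
    ALLOWED_FROM.put(CmdStatus.NEED_RESEND,
        EnumSet.of(CmdStatus.SENT, CmdStatus.PENDING_EXECUTED));
  }

  private CmdStatusTransitions() { }

  /**
   * Returns the status after a report of {@code reported}. An unexpected
   * report (e.g. EXECUTED for a command that was never sent) is logged and
   * ignored, leaving {@code current} unchanged.
   */
  static CmdStatus apply(CmdStatus current, CmdStatus reported) {
    Set<CmdStatus> allowed = ALLOWED_FROM.get(reported);
    if (allowed == null || !allowed.contains(current)) {
      // Real code would use the class's SLF4J LOG.warn here.
      System.err.printf("Ignoring unexpected transition %s -> %s%n",
          current, reported);
      return current;
    }
    return reported;
  }
}
```

Ignoring the report instead of throwing means one stray status in a heartbeat cannot abort processing of the rest of the command-status report.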



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] xichen01 commented on pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1655962152

   @sumitagrawl PTAL, thanks.


[GitHub] [ozone] sumitagrawl commented on a diff in pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1260495240


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =

Review Comment:
   IMO, we should do it as part of this; with new changes being added, it's becoming complicated.



[GitHub] [ozone] github-actions[bot] commented on pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1609915636

   No such command. `/` Available commands:
    * /**close** : Close pending pull request temporary
    * /**help** : Show all the available comment commands
    * /**label** : add new label to the issue: `/label <label>`
    * /**pending** : Add a REQUESTED_CHANGE type review to mark issue non-mergeable: `/pending <reason>`
    * /**ready** : Dismiss all the blocking reviews by github-actions bot
    * /**retest** : provide help on how to trigger new CI build


[GitHub] [ozone] xichen01 commented on pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1609915279

   // TODO: implement unit tests and integration tests.


Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1346943938


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -437,15 +350,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(

Review Comment:
   scmCommandTimeoutMs and the SCMBlockDeletingService timeout are the same, i.e. 300s, so the same set of transactions may be sent again every time. So scmCommandTimeoutMs has no meaning here.
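To make the reviewer's point concrete: suppose a command counts as expired once the timeout has elapsed since its last status update, and the service runs every 300s. Then a 300s timeout means every command sent in one run has already expired by the next run. The helper below is hypothetical. The real check, `removeTimeoutScmCommand`, is not shown in this hunk, and treating the boundary as inclusive is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not the PR's removeTimeoutScmCommand: it only shows
// why a command timeout equal to the service interval has no effect.
final class CommandTimeoutCheck {
  private CommandTimeoutCheck() { }

  /**
   * Returns true once at least {@code timeout} has elapsed since
   * {@code updateTime}. The inclusive boundary is an assumption.
   */
  static boolean isTimedOut(Instant updateTime, Instant now,
      Duration timeout) {
    return Duration.between(updateTime, now).compareTo(timeout) >= 0;
  }

  public static void main(String[] args) {
    Duration interval = Duration.ofSeconds(300); // service run interval
    Instant sentAt = Instant.parse("2023-07-01T00:00:00Z");
    Instant nextRun = sentAt.plus(interval);
    // Timeout == interval: the command has already expired at the next run,
    // so its transactions become eligible to be sent again.
    System.out.println(isTimedOut(sentAt, nextRun, interval));
    // Timeout == 2 * interval: the command survives one more run.
    System.out.println(
        isTimedOut(sentAt, nextRun, interval.multipliedBy(2)));
  }
}
```

Running `main` prints `true` and then `false`. Under a strict `>` comparison the first case sits exactly on the boundary, but any real run takes some time to start, so in practice it would still expire.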



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -107,11 +100,6 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
     this.containerManager = containerManager;
     this.lock = new ReentrantLock();
 
-    // transactionToDNsCommitMap is updated only when
-    // transaction is added to the log and when it is removed.
-
-    // maps transaction to dns which have committed it.
-    transactionToDNsCommitMap = new ConcurrentHashMap<>();
     transactionToRetryCountMap = new ConcurrentHashMap<>();

Review Comment:
   transactionToRetryCountMap can also be moved inside SCMDeletedBlockTransactionStatusManager.
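Here is a minimal sketch of what it could look like for the status manager to own the retry counts, instead of receiving the map from `DeletedBlockLogImpl` through its constructor. `RetryCountTracker` and its method names are hypothetical. `ConcurrentHashMap.merge` is used so that concurrent increments for the same txId are applied atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: retry-count bookkeeping owned by the status manager.
// Method names are illustrative, not the PR's API.
final class RetryCountTracker {
  // Maps txId to the number of times the transaction has been retried.
  private final Map<Long, Integer> transactionToRetryCountMap =
      new ConcurrentHashMap<>();

  /** Atomically increments and returns the retry count for {@code txId}. */
  int incrementRetryCount(long txId) {
    return transactionToRetryCountMap.merge(txId, 1, Integer::sum);
  }

  /** Returns the current retry count, or 0 if none is recorded. */
  int getRetryCount(long txId) {
    return transactionToRetryCountMap.getOrDefault(txId, 0);
  }

  /** Forgets {@code txId}, e.g. once the transaction is committed or reset. */
  void removeRetryCount(long txId) {
    transactionToRetryCountMap.remove(txId);
  }
}
```

Keeping the map private means callers can only change retry counts through these methods. That is one way to act on the suggestion. It is not how the PR currently wires the map.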



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   *
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      Map<Long, Integer> transactionToRetryCountMap,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = transactionToRetryCountMap;
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT, PENDING_EXECUTED));
+    private static final Set<CmdStatus> FINIAL_STATUSES = new HashSet<>(
+        Arrays.asList(EXECUTED, NEED_RESEND));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // This status indicates that the DeleteBlocksCommand has been sent
+      // to the DataNode, but the Datanode has not reported any new status
+      // for the DeleteBlocksCommand.
+      SENT,
+      // The DeleteBlocksCommand has been received by Datanode and
+      // is waiting for executed.
+      PENDING_EXECUTED,
+      // The DeleteBlocksCommand was executed, and the execution was successful
+      EXECUTED,
+      // The DeleteBlocksCommand was executed but failed to execute,
+      // or was lost before it was executed.
+      NEED_RESEND
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, SENT);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      CmdStatus status = fromProtoCommandStatus(newState);
+      if (status != null) {
+        updateStatus(dnId, scmCmdId, status);
+      }
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    protected void cleanSCMCommandForDn(UUID dnId, long timeoutMs) {
+      cleanTimeoutSCMCommand(dnId, timeoutMs);
+      cleanFinalStatusSCMCommand(dnId);
+    }
+
+    private void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private void cleanFinalStatusSCMCommand(UUID dnId) {
+      for (CmdStatus status : FINIAL_STATUSES) {
+        for (Long scmCmdId : getScmCommandIds(dnId, status)) {
+          CmdStatusData stateData = removeScmCommand(dnId, scmCmdId);
+          LOG.debug("Clean SCMCommand status: {} for DN: {}, stateData: {}",
+              status, dnId, stateData);
+        }
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId, CmdStatus newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+
+      switch (newStatus) {
+      case SENT:
+        if (oldStatus == TO_BE_SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case PENDING_EXECUTED:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and
+          // received by the Datanode, but the command is not executed by the
+          // Datanode, the command is waiting to be executed.
+
+          // PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand
+          // continues to wait to be executed by Datanode.
+          statusData.setStatus(PENDING_EXECUTED);
+          changed = true;
+        }
+        break;
+      case NEED_RESEND:

Review Comment:
   NEED_RESEND should be renamed to FAIL_RESEND; the name NEED_RESEND is confusing, as this status represents a failed task.
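Below is a sketch of the status enum with the suggested rename applied. `DeleteCmdStatus` is a hypothetical stand-in for the nested `CmdStatus`. The sketch also spells the final-status set as `FINAL_STATUSES` (the diff has `FINIAL_STATUSES`). Using `EnumSet` instead of `HashSet` for the status sets is a design choice made here, not something taken from the PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch of the enum after the rename NEED_RESEND -> FAIL_RESEND.
// Not the merged code.
enum DeleteCmdStatus {
  // Created but not yet sent; the initial status.
  TO_BE_SENT,
  // Sent to the DN; the DN has not reported any new status yet.
  SENT,
  // Received by the DN and waiting to be executed.
  PENDING_EXECUTED,
  // Executed successfully.
  EXECUTED,
  // Execution failed, or the command was lost before execution;
  // the transactions must be sent again.
  FAIL_RESEND;

  // Statuses after which SCM expects no further report for the command.
  static final Set<DeleteCmdStatus> FINAL_STATUSES =
      EnumSet.of(EXECUTED, FAIL_RESEND);
  // Statuses subject to the command timeout.
  static final Set<DeleteCmdStatus> STATUSES_REQUIRING_TIMEOUT =
      EnumSet.of(SENT, PENDING_EXECUTED);

  boolean isFinal() {
    return FINAL_STATUSES.contains(this);
  }
}
```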



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * Manages the status of DeletedBlockTransactions in order to reduce the
+ * number of duplicate DeletedBlockTransactions sent to Datanodes.
+ */
+public class SCMDeletedBlockTransactionStatusManager
+    implements EventHandler<DeleteBlockStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry count.
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final Lock lock;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   *
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      Map<Long, Integer> transactionToRetryCountMap,
+      ScmBlockDeletingServiceMetrics metrics,
+      Lock lock, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.lock = lock;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = transactionToRetryCountMap;
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT, PENDING_EXECUTED));
+    private static final Set<CmdStatus> FINIAL_STATUSES = new HashSet<>(
+        Arrays.asList(EXECUTED, NEED_RESEND));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // This status indicates that the DeleteBlocksCommand has been sent
+      // to the DataNode, but the Datanode has not reported any new status
+      // for the DeleteBlocksCommand.
+      SENT,
+      // The DeleteBlocksCommand has been received by the Datanode and
+      // is waiting to be executed.
+      PENDING_EXECUTED,
+      // The DeleteBlocksCommand was executed, and the execution was successful
+      EXECUTED,
+      // The DeleteBlocksCommand was executed but failed to execute,
+      // or was lost before it was executed.
+      NEED_RESEND
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine" +
+            "{dnId=" + dnId +
+            ", scmTxID=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, SENT);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      CmdStatus status = fromProtoCommandStatus(newState);
+      if (status != null) {
+        updateStatus(dnId, scmCmdId, status);
+      }
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    protected void cleanSCMCommandForDn(UUID dnId, long timeoutMs) {
+      cleanTimeoutSCMCommand(dnId, timeoutMs);
+      cleanFinalStatusSCMCommand(dnId);
+    }
+
+    private void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private void cleanFinalStatusSCMCommand(UUID dnId) {
+      for (CmdStatus status : FINIAL_STATUSES) {
+        for (Long scmCmdId : getScmCommandIds(dnId, status)) {
+          CmdStatusData stateData = removeScmCommand(dnId, scmCmdId);
+          LOG.debug("Clean SCMCommand status: {} for DN: {}, stateData: {}",
+              status, dnId, stateData);
+        }
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId, CmdStatus newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+
+      switch (newStatus) {
+      case SENT:
+        if (oldStatus == TO_BE_SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case PENDING_EXECUTED:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and
+          // received by the Datanode, but the command is not executed by the
+          // Datanode, the command is waiting to be executed.
+
+          // PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand
+          // continues to wait to be executed by Datanode.
+          statusData.setStatus(PENDING_EXECUTED);
+          changed = true;
+        }
+        break;
+      case NEED_RESEND:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // SENT -> NEED_RESEND: The DeleteBlocksCommand is sent and lost
+          // before it is received by the DN.
+
+          // PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for
+          // a while and was executed, but the execution failed, or the
+          // DeleteBlocksCommand was lost while waiting (e.g. the Datanode
+          // restarted).
+          statusData.setStatus(NEED_RESEND);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+        if (oldStatus == SENT || oldStatus == PENDING_EXECUTED) {
+          // PENDING_EXECUTED -> EXECUTED: The Command waits for a period of
+          // time on the DN and is executed successfully.
+
+          // SENT -> EXECUTED: The DeleteBlocksCommand has been sent to
+          // Datanode, executed by DN, and executed successfully.
+          statusData.setStatus(EXECUTED);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Cannot update to unknown status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Illegal status transition for DN: {} ScmCommandId {}: " +
+            "from {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successfully updated DN: {} ScmCommandId {} status from " +
+            "{} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          updateStatus(dnId, scmCmdId, NEED_RESEND);
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Remove Timeout SCM BlockDeletionCommand {} for DN {} " +
+              "after {}ms without update", state, dnId, timeoutMs);
+        } else {
+          LOG.warn("Timeout SCM scmCmdIds {} for DN {} " +
+              "after {}ms without update", scmCmdIds, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+
+      CmdStatus status = record.get(scmCmdId).getStatus();
+      if (!FINIAL_STATUSES.contains(status)) {
+        LOG.error("Cannot Remove ScmCommand {} Non-final Status {} for DN: {}" +
+            ". final Status {}", scmCmdId, status, dnId, FINIAL_STATUSES);
+        return null;
+      }
+
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    private static CmdStatus fromProtoCommandStatus(
+        CommandStatus.Status protoCmdStatus) {
+      switch (protoCmdStatus) {
+      case PENDING:
+        return CmdStatus.PENDING_EXECUTED;
+      case EXECUTED:
+        return CmdStatus.EXECUTED;
+      case FAILED:
+        return CmdStatus.NEED_RESEND;
+      default:
+        LOG.error("Unknown protoCmdStatus: {} cannot convert " +
+            "to ScmDeleteBlockCommandStatus", protoCmdStatus);
+        return null;
+      }
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.updateStatus(
+        dnId.getUuid(), scmCommand.getId(), SENT);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+    scmDeleteBlocksCommandStatusManager.recordScmCommand(
+        SCMDeleteBlocksCommandStatusManager
+            .createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
+  }
+
+  public void recordTransactionCommitted(long txId) {
+    transactionToDNsCommitMap
+        .putIfAbsent(txId, new LinkedHashSet<>());
+  }
+
+  public void clear() {
+    scmDeleteBlocksCommandStatusManager.clear();
+    transactionToDNsCommitMap.clear();
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
+  }
+
+  public void onDatanodeDead(UUID dnId) {
+    scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
+  }
+
+  public boolean isDuplication(DatanodeDetails dnDetail, long tx,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    if (alreadyExecuted(dnDetail.getUuid(), tx)) {
+      return true;
+    }
+    return inProcessing(dnDetail.getUuid(), tx, commandStatus);

Review Comment:
   There is no need to compute `commandStatus` and pass it into `isDuplication`; this class can obtain it itself
   (`scmCmdStatusRecord.get(uuid).get(tx)`) and check the status directly, so the extra parameter can be avoided.
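   
   A minimal sketch of that suggestion, using hypothetical, simplified names (`CommandStatusLookup`, `inProcessing`) rather than the PR's classes. One caveat, as far as the quoted diff shows: `recordScmCommand` keys the per-DN map by SCM command ID, not by transaction ID, so a literal `get(uuid).get(tx)` would look up a transaction ID as a command ID. The sketch therefore scans that DN's commands for the transaction. Which statuses count as "in processing" (here, everything except `NEED_RESEND`) is also an assumption.
   
   ```java
   import java.util.EnumSet;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical, simplified stand-in for SCMDeleteBlocksCommandStatusManager.
   class CommandStatusLookup {
     enum CmdStatus { TO_BE_SENT, SENT, PENDING_EXECUTED, EXECUTED, NEED_RESEND }
   
     // Assumption: every status except NEED_RESEND means the tx is still owned
     // by an outstanding command and must not be sent again.
     private static final Set<CmdStatus> IN_FLIGHT = EnumSet.of(
         CmdStatus.TO_BE_SENT, CmdStatus.SENT,
         CmdStatus.PENDING_EXECUTED, CmdStatus.EXECUTED);
   
     // One command: the delete-block txIds it carries and its current status.
     static final class Cmd {
       final Set<Long> txIds = new HashSet<>();
       volatile CmdStatus status = CmdStatus.TO_BE_SENT;
     }
   
     // dnId -> (scmCmdId -> Cmd); keyed by SCM command ID, as in the diff.
     private final Map<UUID, Map<Long, Cmd>> commands = new ConcurrentHashMap<>();
   
     void recordCommand(UUID dnId, long scmCmdId, long... txIds) {
       Cmd cmd = new Cmd();
       for (long txId : txIds) {
         cmd.txIds.add(txId);
       }
       commands.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
           .put(scmCmdId, cmd);
     }
   
     void setStatus(UUID dnId, long scmCmdId, CmdStatus status) {
       Map<Long, Cmd> forDn = commands.get(dnId);
       Cmd cmd = forDn == null ? null : forDn.get(scmCmdId);
       if (cmd != null) {
         cmd.status = status;
       }
     }
   
     // Answers from this class's own state, so callers no longer build and
     // pass a Map<UUID, Map<Long, CmdStatus>> snapshot.
     boolean inProcessing(UUID dnId, long txId) {
       Map<Long, Cmd> forDn = commands.get(dnId);
       if (forDn == null) {
         return false;
       }
       for (Cmd cmd : forDn.values()) {
         if (cmd.txIds.contains(txId) && IN_FLIGHT.contains(cmd.status)) {
           return true;
         }
       }
       return false;
     }
   }
   ```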



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1421434493


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java:
##########
@@ -88,14 +84,11 @@ void incrementCount(List<Long> txIDs)
   int resetCount(List<Long> txIDs) throws IOException;
 
   /**
-   * Commits a transaction means to delete all footprints of a transaction
-   * from the log. This method doesn't guarantee all transactions can be
-   * successfully deleted, it tolerate failures and tries best efforts to.
-   *  @param transactionResults - delete block transaction results.
-   * @param dnID - ID of datanode which acknowledges the delete block command.
+   * Get SCMDeletedBlockTransactionStatusManager.
+   * @return an Object of SCMDeletedBlockTransactionStatusManager
    */
-  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
-      UUID dnID);
+  SCMDeletedBlockTransactionStatusManager
+      getSCMDeletedBlockTransactionStatusManager();

Review Comment:
   The `DeletedBlockLog` interface is defined in terms of operations. I don't think exposing a manager object is appropriate for the interface; it should be an implementation detail. Similarly, sharing the same lock between the two objects does not seem right.
   
   Maybe the interface should define operations that the implementation passes through to the manager.  Alternatively the manager object should have an interface defined separately, and act as a way to manipulate the `DeletedBlockLog`.
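   
   A minimal sketch of the first option, assuming the interface exposes only operations and the implementation passes each one through to a manager it owns privately. All names are hypothetical and far simpler than `DeletedBlockLog` / `SCMDeletedBlockTransactionStatusManager`. Locking stays inside the implementation and is not modeled.
   
   ```java
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical operations-only interface: callers see operations,
   // not the manager object behind them.
   interface DeletedBlockLogOps {
     void recordCommandCreated(UUID dnId, long scmCmdId);
     void onSent(UUID dnId, long scmCmdId);
     void onDatanodeDead(UUID dnId);
     int commandCount(UUID dnId);
     int sentCommandCount(UUID dnId);
   }
   
   // Hypothetical status manager; package-private and never returned to callers.
   final class CommandStatusManager {
     // dnId -> (scmCmdId -> whether the command has been sent)
     private final Map<UUID, Map<Long, Boolean>> commands =
         new ConcurrentHashMap<>();
   
     void add(UUID dnId, long scmCmdId) {
       commands.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
           .put(scmCmdId, Boolean.FALSE);
     }
   
     void markSent(UUID dnId, long scmCmdId) {
       Map<Long, Boolean> forDn = commands.get(dnId);
       if (forDn != null) {
         // Unknown command IDs are ignored rather than created.
         forDn.computeIfPresent(scmCmdId, (id, sent) -> Boolean.TRUE);
       }
     }
   
     void forget(UUID dnId) {
       commands.remove(dnId);
     }
   
     int count(UUID dnId, boolean sentOnly) {
       Map<Long, Boolean> forDn = commands.get(dnId);
       if (forDn == null) {
         return 0;
       }
       return (int) forDn.values().stream()
           .filter(sent -> !sentOnly || sent)
           .count();
     }
   }
   
   // The log implementation owns the manager and passes each operation through.
   final class DeletedBlockLogOpsImpl implements DeletedBlockLogOps {
     private final CommandStatusManager manager = new CommandStatusManager();
   
     @Override
     public void recordCommandCreated(UUID dnId, long scmCmdId) {
       manager.add(dnId, scmCmdId);
     }
   
     @Override
     public void onSent(UUID dnId, long scmCmdId) {
       manager.markSent(dnId, scmCmdId);
     }
   
     @Override
     public void onDatanodeDead(UUID dnId) {
       manager.forget(dnId);
     }
   
     @Override
     public int commandCount(UUID dnId) {
       return manager.count(dnId, false);
     }
   
     @Override
     public int sentCommandCount(UUID dnId) {
       return manager.count(dnId, true);
     }
   }
   ```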





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1259979240


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java:
##########
@@ -78,9 +78,9 @@ protected CommandStatusReportsProto getReport() {
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
-        builder.addCmdStatus(cmdStatus.getProtoBufMessage());

Review Comment:
   What is the impact of this change on the DN / SCM side?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
@@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Update the SCMCommand of deleteBlocksCommand Status
+      for (SCMCommand command : commands) {

Review Comment:
   IMO, we could register a listener that is notified when a command is sent, so that the same notification can easily be added for other commands if required.
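   
   A minimal sketch of a register-to-notify approach, assuming a per-command-type listener registry that heartbeat processing calls with the commands it returns to a Datanode. `Command`, `CommandSentListener`, `CommandSentNotifier` and the type strings are hypothetical placeholders, not Ozone APIs.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CopyOnWriteArrayList;
   
   // Hypothetical minimal command; SCM's real SCMCommand type is not modeled.
   final class Command {
     final String type;
     final long id;
   
     Command(String type, long id) {
       this.type = type;
       this.id = id;
     }
   }
   
   // Hypothetical callback fired once a command is handed to a Datanode.
   interface CommandSentListener {
     void onSent(UUID dnId, Command command);
   }
   
   // Registry keyed by command type, so other command types can opt in
   // later without touching heartbeat processing.
   final class CommandSentNotifier {
     private final Map<String, List<CommandSentListener>> listeners =
         new ConcurrentHashMap<>();
   
     void register(String commandType, CommandSentListener listener) {
       listeners.computeIfAbsent(commandType, k -> new CopyOnWriteArrayList<>())
           .add(listener);
     }
   
     // Would be called from heartbeat processing with the commands being
     // returned to that Datanode.
     void notifySent(UUID dnId, List<Command> commands) {
       for (Command command : commands) {
         List<CommandSentListener> forType = listeners.getOrDefault(
             command.type, Collections.emptyList());
         for (CommandSentListener listener : forType) {
           listener.onSent(dnId, command);
         }
       }
     }
   }
   ```
   
   With this shape, supporting send notifications for another command type needs only one more `register` call; the heartbeat path itself does not change.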



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =

Review Comment:
   For handling deleted block status:
   - `DeletedBlockLogStateManager` already exists, so another class for the same purpose may not be required.
   - Command status is already managed using `transactionToRetryCountMap` and `transactionToDNsCommitMap`.
   IMO, we need to combine this logic to provide a single status manager for deleted blocks.
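   
   A rough sketch of what a single manager might look like, assuming one record per delete transaction that merges the retry count, the set of DNs that committed it, and the per-DN command status. `UnifiedTxStatusManager` and its methods are hypothetical. The rule of dropping a transaction once every replica DN has committed it is an assumption about how the commit map is used.
   
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical unified manager: one record per delete transaction holds
   // the retry count, the DNs that committed it, and per-DN command status.
   final class UnifiedTxStatusManager {
     enum CmdStatus { TO_BE_SENT, SENT, PENDING_EXECUTED, EXECUTED, NEED_RESEND }
   
     static final class TxState {
       final AtomicInteger retryCount = new AtomicInteger();
       final Set<UUID> committedDns = ConcurrentHashMap.newKeySet();
       final Map<UUID, CmdStatus> statusByDn = new ConcurrentHashMap<>();
     }
   
     private final Map<Long, TxState> txStates = new ConcurrentHashMap<>();
   
     private TxState state(long txId) {
       return txStates.computeIfAbsent(txId, k -> new TxState());
     }
   
     void onStatus(long txId, UUID dnId, CmdStatus status) {
       state(txId).statusByDn.put(dnId, status);
     }
   
     int incrementRetry(long txId) {
       return state(txId).retryCount.incrementAndGet();
     }
   
     // A commit supersedes the in-flight status for that DN.
     void onCommitted(long txId, UUID dnId) {
       TxState s = state(txId);
       s.committedDns.add(dnId);
       s.statusByDn.remove(dnId);
     }
   
     // One lookup answers both "already executed?" and "still in flight?".
     boolean isDuplicate(long txId, UUID dnId) {
       TxState s = txStates.get(txId);
       if (s == null) {
         return false;
       }
       if (s.committedDns.contains(dnId)) {
         return true;
       }
       CmdStatus status = s.statusByDn.get(dnId);
       return status != null && status != CmdStatus.NEED_RESEND;
     }
   
     // Once every replica DN has committed, all state for the tx goes at once.
     boolean purgeIfFullyCommitted(long txId, Set<UUID> replicaDns) {
       TxState s = txStates.get(txId);
       if (s != null && s.committedDns.containsAll(replicaDns)) {
         txStates.remove(txId);
         return true;
       }
       return false;
     }
   }
   ```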



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+
+  /**
+   * Status of SCMDeleteBlocksCommand.
+   */
+  public enum CmdStatus {
+    // The DeleteBlocksCommand has not yet been sent.
+    // This is the initial status of the command after it's created.
+    TO_BE_SENT,
+    // This status indicates that the DeleteBlocksCommand has been sent
+    // to the DataNode, but the Datanode has not reported any new status
+    // for the DeleteBlocksCommand.
+    SENT,
+    // The DeleteBlocksCommand has been received by the Datanode and
+    // is waiting to be executed.
+    PENDING_EXECUTED,
+    // The DeleteBlocksCommand was executed, and the execution was successful
+    EXECUTED,
+    // The DeleteBlocksCommand was executed but failed to execute,
+    // or was lost before it was executed.
+    NEED_RESEND
+  }
+
+  private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+
+  private final Set<CmdStatus> statusesRequiringTimeout = new HashSet<>(
+      Arrays.asList(SENT, PENDING_EXECUTED));
+  private final Set<CmdStatus> finialStatuses = new HashSet<>(
+      Arrays.asList(EXECUTED, NEED_RESEND));
+  private final Set<CmdStatus> failedStatuses = new HashSet<>(
+      Arrays.asList(NEED_RESEND));
+
+  private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+  public SCMDeleteBlocksCommandStatusManager() {
+    this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+  }
+
+  public static CmdStatusData createScmCmdStatusData(
+      UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+    return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+  }
+
+  protected static final class CmdStatusData {
+    private final UUID dnId;
+    private final long scmCmdId;
+    private final Set<Long> deletedBlocksTxIds;
+    private Instant updateTime;
+    private CmdStatus status;
+
+    private CmdStatusData(
+        UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+      this.dnId = dnId;
+      this.scmCmdId = scmTxID;
+      this.deletedBlocksTxIds = deletedBlocksTxIds;
+      setStatus(DEFAULT_STATUS);
+    }
+
+    public boolean isContainDeletedBlocksTx(long deletedBlocksTxId) {
+      return deletedBlocksTxIds.contains(deletedBlocksTxId);
+    }
+
+    public Set<Long> getDeletedBlocksTxIds() {
+      return deletedBlocksTxIds;
+    }
+
+    public UUID getDnId() {
+      return dnId;
+    }
+
+    public long getScmCmdId() {
+      return scmCmdId;
+    }
+
+    public CmdStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(CmdStatus status) {
+      this.updateTime = Instant.now();
+      this.status = status;
+    }
+
+    public Instant getUpdateTime() {
+      return updateTime;
+    }
+
+    @Override
+    public String toString() {
+      return "ScmTxStateMachine" +
+          "{dnId=" + dnId +
+          ", scmTxID=" + scmCmdId +
+          ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+          ", updateTime=" + updateTime +
+          ", status=" + status +
+          '}';
+    }
+  }
+
+  public void recordScmCommand(CmdStatusData statusData) {
+    LOG.debug("Record ScmCommand: {}", statusData);
+    scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+        new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    Map<UUID, Map<Long, CmdStatus>> result =
+        new HashMap<>(scmCmdStatusRecord.size());
+
+    for (UUID dnId : dnIds) {
+      if (scmCmdStatusRecord.get(dnId) == null) {
+        continue;
+      }
+      Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      for (CmdStatusData statusData : record.values()) {
+        CmdStatus status = statusData.getStatus();
+        for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+          dnStatusMap.put(deletedBlocksTxId, status);
+        }
+      }
+      result.put(dnId, dnStatusMap);
+    }
+
+    return result;
+  }
+
+  public void onSent(UUID dnId, long scmCmdId) {
+    updateStatus(dnId, scmCmdId, SENT);
+  }
+
+  public void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+      CommandStatus.Status newState) {
+    CmdStatus status = fromProtoCommandStatus(newState);
+    if (status != null) {
+      updateStatus(dnId, scmCmdId, status);
+    }
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    for (UUID dnId : scmCmdStatusRecord.keySet()) {

Review Comment:
   For a dead node, this cache needs to be cleaned up for that DN; the cleanup should be triggered from the DeadNodeHandler.
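   
   A minimal sketch of hooking the cleanup into dead-node handling. It assumes cleanup actions register once with a hypothetical `DeadNodeCleanup` and run for each DN that is declared dead. As far as the quoted diff shows, the timeout cleanup only covers `SENT` and `PENDING_EXECUTED`, so without such a hook other entries for a dead DN (for example `TO_BE_SENT`) could linger. `DnCommandCache` is a hypothetical stand-in for `scmCmdStatusRecord`.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.function.Consumer;
   
   // Hypothetical per-DN command cache, standing in for scmCmdStatusRecord.
   final class DnCommandCache {
     private final Map<UUID, Map<Long, String>> byDn = new ConcurrentHashMap<>();
   
     void put(UUID dnId, long scmCmdId, String status) {
       byDn.computeIfAbsent(dnId, k -> new ConcurrentHashMap<>())
           .put(scmCmdId, status);
     }
   
     // Drops every record for the DN, whatever its status.
     void onDatanodeDead(UUID dnId) {
       byDn.remove(dnId);
     }
   
     int size(UUID dnId) {
       Map<Long, String> forDn = byDn.get(dnId);
       return forDn == null ? 0 : forDn.size();
     }
   }
   
   // Hypothetical dead-node hook: cleanup actions register once and run
   // for every DN declared dead.
   final class DeadNodeCleanup {
     private final List<Consumer<UUID>> hooks = new CopyOnWriteArrayList<>();
   
     void register(Consumer<UUID> hook) {
       hooks.add(hook);
     }
   
     void onDeadNode(UUID dnId) {
       for (Consumer<UUID> hook : hooks) {
         hook.accept(dnId);
       }
     }
   }
   ```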





Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1838426459

   @adoroszlai @sumitagrawl 
   All tests have passed. PTAL, thanks
   https://github.com/xichen01/ozone/actions/runs/7071798914/job/19258175391




Re: [PR] HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1409485378


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -437,15 +350,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(

Review Comment:
   @xichen01 
   "SCMBlockDeletingService timeout": this is the interval after which the service builds the next set of blocks for deletion, so it runs every 5 min.
   scmCommandTimeoutMs: this is how long SCM waits, after the command is sent in the heartbeat (HB), before retrying.
   So normally the DN is expected to finish deleting the blocks within 5 minutes.
   
   With both set to 5 min, and scmCommandTimeoutMs starting to count only some time later, SCM avoids resending the same command for one more cycle, so the DN effectively gets 10 min to delete the blocks.
   
   This is more useful for aggressive deletion: with "SCMBlockDeletingService timeout" set to 1 min, new blocks are prepared quickly in the background while deletion of the old blocks is still in progress on the DN. It also avoids duplicate blocks to that extent.
   
   IMO, scmCommandTimeoutMs could default to 20 min, relative to the default "SCMBlockDeletingService timeout" of 5 min, i.e. SCM would wait 4-5 cycles before sending the same blocks again.
   Alternatively, it could be a relative configuration: scmCommandTimeoutMs = 4 * "SCMBlockDeletingService timeout".
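   The timing argument above can be checked with a little arithmetic. This sketch assumes the proposed relative configuration (scmCommandTimeoutMs = multiplier * SCMBlockDeletingService interval). It also assumes that a command is sent during a service run and is treated as timed out only once the elapsed time strictly exceeds the timeout. `DeleteCmdTimeoutSketch`, `relativeTimeoutMs` and `runsUntilResend` are hypothetical helpers, not Ozone configuration keys or APIs.

```java
import java.time.Duration;

// Hypothetical helper illustrating the proposed relative timeout:
// scmCommandTimeoutMs = multiplier * SCMBlockDeletingService interval.
final class DeleteCmdTimeoutSketch {
  private DeleteCmdTimeoutSketch() { }

  static long relativeTimeoutMs(Duration serviceInterval, int multiplier) {
    if (multiplier <= 0) {
      throw new IllegalArgumentException("multiplier must be positive");
    }
    // multiplyExact throws on overflow instead of silently wrapping.
    return Math.multiplyExact(serviceInterval.toMillis(), (long) multiplier);
  }

  // Smallest number of service runs k after the send such that
  // k * interval > timeoutMs, i.e. the first run at which the same blocks
  // would be considered for resending (strictly-greater timeout check).
  static long runsUntilResend(Duration serviceInterval, long timeoutMs) {
    long intervalMs = serviceInterval.toMillis();
    if (intervalMs <= 0) {
      throw new IllegalArgumentException("interval must be positive");
    }
    return timeoutMs / intervalMs + 1;
  }
}
```

   With a 5-minute interval and a multiplier of 4, `relativeTimeoutMs` gives 1,200,000 ms (20 min), and `runsUntilResend` returns 5, consistent with the "4-5 cycle" estimate. With the timeout equal to the interval, it returns 2, i.e. roughly the 10 minutes described above.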
   
   
   





[GitHub] [ozone] xichen01 commented on pull request #4988: HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #4988:
URL: https://github.com/apache/ozone/pull/4988#issuecomment-1625700346

   @adoroszlai PTAL Thanks.




Re: [PR] HDDS-8882. Add status management of SCM's DeleteBlocksCommand to avoid sending duplicate delete transactions to the DN [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1383177051


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -107,11 +100,6 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
     this.containerManager = containerManager;
     this.lock = new ReentrantLock();
 
-    // transactionToDNsCommitMap is updated only when
-    // transaction is added to the log and when it is removed.
-
-    // maps transaction to dns which have committed it.
-    transactionToDNsCommitMap = new ConcurrentHashMap<>();
     transactionToRetryCountMap = new ConcurrentHashMap<>();

Review Comment:
   Done.


