You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ck...@apache.org on 2022/10/26 02:40:04 UTC

[ozone] 02/03: HDDS-7258. Cleanup the allocated but uncommitted blocks (#3778)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch ozone-1.3
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b2ff74f7f5ed71faca9c7bdcccdb18b5bb27f050
Author: Nibiru <ax...@qq.com>
AuthorDate: Wed Oct 26 01:00:02 2022 +0800

    HDDS-7258. Cleanup the allocated but uncommitted blocks (#3778)
---
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  76 ++++++++----
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  19 ++-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  23 +++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  24 ++++
 .../ozone/om/response/key/OMKeyCommitResponse.java |   6 +
 .../om/request/key/TestOMKeyCommitRequest.java     |  96 ++++++++++++++-
 .../ozone/om/service/TestKeyDeletingService.java   | 136 +++++++++++++++++++--
 7 files changed, 333 insertions(+), 47 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 9b2014dd7b..f8f589af2e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -26,9 +26,9 @@ import java.util.Map;
 import java.util.Objects;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -180,25 +180,30 @@ public final class OmKeyInfo extends WithParentObjectId {
   /**
    * updates the length of the each block in the list given.
    * This will be called when the key is being committed to OzoneManager.
+   * Return the uncommitted locationInfo to be deleted.
    *
    * @param locationInfoList list of locationInfo
+   * @return allocated but uncommitted locationInfos
    */
-  public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
-      boolean isMpu) {
-    updateLocationInfoList(locationInfoList, isMpu, false);
+  public List<OmKeyLocationInfo> updateLocationInfoList(
+      List<OmKeyLocationInfo> locationInfoList, boolean isMpu) {
+    return updateLocationInfoList(locationInfoList, isMpu, false);
   }
 
   /**
    * updates the length of the each block in the list given.
    * This will be called when the key is being committed to OzoneManager.
+   * Return the uncommitted locationInfo to be deleted.
    *
    * @param locationInfoList list of locationInfo
    * @param isMpu a true represents multi part key, false otherwise
    * @param skipBlockIDCheck a true represents that the blockId verification
    *                         check should be skipped, false represents that
    *                         the blockId verification will be required
+   * @return allocated but uncommitted locationInfos
    */
-  public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
+  public List<OmKeyLocationInfo> updateLocationInfoList(
+      List<OmKeyLocationInfo> locationInfoList,
       boolean isMpu, boolean skipBlockIDCheck) {
     long latestVersion = getLatestVersionLocations().getVersion();
     OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
@@ -207,51 +212,68 @@ public final class OmKeyInfo extends WithParentObjectId {
 
     // Compare user given block location against allocatedBlockLocations
     // present in OmKeyInfo.
+    List<OmKeyLocationInfo> uncommittedBlocks;
     List<OmKeyLocationInfo> updatedBlockLocations;
     if (skipBlockIDCheck) {
       updatedBlockLocations = locationInfoList;
+      uncommittedBlocks = new ArrayList<>();
     } else {
-      updatedBlockLocations =
+      Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>> verifiedResult =
           verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup);
+      updatedBlockLocations = verifiedResult.getLeft();
+      uncommittedBlocks = verifiedResult.getRight();
     }
-    // Updates the latest locationList in the latest version only with
-    // given locationInfoList here.
-    // TODO : The original allocated list and the updated list here may vary
-    // as the containers on the Datanode on which the blocks were pre allocated
-    // might get closed. The diff of blocks between these two lists here
-    // need to be garbage collected in case the ozone client dies.
+
     keyLocationInfoGroup.removeBlocks(latestVersion);
     // set each of the locationInfo object to the latest version
     updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo
         .setCreateVersion(latestVersion));
     keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations);
-  }
 
-  private List<OmKeyLocationInfo> verifyAndGetKeyLocations(
-      List<OmKeyLocationInfo> locationInfoList,
-      OmKeyLocationInfoGroup keyLocationInfoGroup) {
-
-    List<OmKeyLocationInfo> allocatedBlockLocations =
-        keyLocationInfoGroup.getBlocksLatestVersionOnly();
-    List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
+    return uncommittedBlocks;
+  }
 
-    List<ContainerBlockID> existingBlockIDs = new ArrayList<>();
-    for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) {
-      BlockID existingBlockID = existingLocationInfo.getBlockID();
-      existingBlockIDs.add(existingBlockID.getContainerBlockID());
+  /**
+   *  1. Verify committed KeyLocationInfos
+   *  2. Find out the allocated but uncommitted KeyLocationInfos.
+   *
+   * @param locationInfoList committed KeyLocationInfos
+   * @param keyLocationInfoGroup allocated KeyLocationInfoGroup
+   * @return Pair of updatedOmKeyLocationInfo and uncommittedOmKeyLocationInfo
+   */
+  private Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>>
+      verifyAndGetKeyLocations(
+          List<OmKeyLocationInfo> locationInfoList,
+          OmKeyLocationInfoGroup keyLocationInfoGroup) {
+    // Only check ContainerBlockID here to avoid the mismatch of the pipeline
+    // field and BcsId in the OmKeyLocationInfo, as the OmKeyInfoCodec ignores
+    // the pipeline field by default and bcsId would be updated in Ratis mode.
+    Map<ContainerBlockID, OmKeyLocationInfo> allocatedBlockLocations =
+        new HashMap<>();
+    for (OmKeyLocationInfo existingLocationInfo : keyLocationInfoGroup.
+        getLocationList()) {
+      ContainerBlockID existingBlockID = existingLocationInfo.getBlockID().
+          getContainerBlockID();
+      // Overwriting an existing entry should never happen here.
+      allocatedBlockLocations.put(existingBlockID, existingLocationInfo);
     }
 
+    List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
     for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) {
-      BlockID modifiedBlockID = modifiedLocationInfo.getBlockID();
-      if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) {
+      ContainerBlockID modifiedContainerBlockId =
+          modifiedLocationInfo.getBlockID().getContainerBlockID();
+      if (allocatedBlockLocations.containsKey(modifiedContainerBlockId)) {
         updatedBlockLocations.add(modifiedLocationInfo);
+        allocatedBlockLocations.remove(modifiedContainerBlockId);
       } else {
         LOG.warn("Unknown BlockLocation:{}, where the blockID of given "
             + "location doesn't match with the stored/allocated block of"
             + " keyName:{}", modifiedLocationInfo, keyName);
       }
     }
-    return updatedBlockLocations;
+    List<OmKeyLocationInfo> uncommittedLocationInfos = new ArrayList<>(
+        allocatedBlockLocations.values());
+    return Pair.of(updatedBlockLocations, uncommittedLocationInfos);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index ebff5416f3..bb070b6ba1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -218,8 +218,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
-      // Update the block length for each block
-      omKeyInfo.updateLocationInfoList(locationInfoList, false);
+      // Update the block length for each block, return the allocated but
+      // uncommitted blocks
+      List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
+          locationInfoList, false);
+
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
@@ -239,6 +242,18 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         omBucketInfo.incrUsedNamespace(1L);
       }
 
+      // Treat the uncommitted blocks as if they were an old version of the
+      // key, so that they are deleted via RepeatedOmKeyInfo.
+      OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
+          omKeyInfo);
+      if (pseudoKeyInfo != null) {
+        if (oldKeyVersionsToDelete != null) {
+          oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
+        } else {
+          oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+        }
+      }
+
       // Add to cache of open key table and key table.
       omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
           new CacheKey<>(dbOpenKey),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index b9239c9d86..5c7ac450b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLoca
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -58,6 +60,9 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
  */
 public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class);
+
   public OMKeyCommitRequestWithFSO(OMRequest omRequest,
       BucketLayout bucketLayout) {
     super(omRequest, bucketLayout);
@@ -144,10 +149,8 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
 
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
 
-      // Update the block length for each block
-      List<OmKeyLocationInfo> allocatedLocationInfoList =
-          omKeyInfo.getLatestVersionLocations().getLocationList();
-      omKeyInfo.updateLocationInfoList(locationInfoList, false);
+      List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
+          locationInfoList, false);
 
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -177,6 +180,18 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
         omBucketInfo.incrUsedNamespace(1L);
       }
 
+      // Treat the uncommitted blocks as if they were an old version of the
+      // key, so that they are deleted via RepeatedOmKeyInfo.
+      OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
+          omKeyInfo);
+      if (pseudoKeyInfo != null) {
+        if (oldKeyVersionsToDelete != null) {
+          oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
+        } else {
+          oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+        }
+      }
+
       // Add to cache of open key table and key table.
       OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey,
               null, fileName, trxnLogIndex);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 904b5c0005..8c79e16dcd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -816,4 +816,28 @@ public abstract class OMKeyRequest extends OMClientRequest {
     return ozoneManager.getOzoneLockProvider()
         .createLockStrategy(getBucketLayout());
   }
+
+  /**
+   * Wrap the uncommitted blocks as pseudoKeyInfo.
+   *
+   * @param uncommitted Uncommitted OmKeyLocationInfo
+   * @param omKeyInfo   Args for key block
+   * @return pseudoKeyInfo
+   */
+  protected OmKeyInfo wrapUncommittedBlocksAsPseudoKey(
+      List<OmKeyLocationInfo> uncommitted, OmKeyInfo omKeyInfo) {
+    if (uncommitted.isEmpty()) {
+      return null;
+    }
+    LOG.info("Detect allocated but uncommitted blocks {} in key {}.",
+        uncommitted, omKeyInfo.getKeyName());
+    OmKeyInfo pseudoKeyInfo = omKeyInfo.copyObject();
+    // TODO: the dataSize of pseudoKeyInfo is not its real size here
+    List<OmKeyLocationInfoGroup> uncommittedGroups = new ArrayList<>();
+    // The version does not matter to the current keyDeletingService logic:
+    // blocks of all versions will be deleted.
+    uncommittedGroups.add(new OmKeyLocationInfoGroup(0, uncommitted));
+    pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups);
+    return pseudoKeyInfo;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index 513e10cba4..911a61cce6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -104,6 +105,11 @@ public class OMKeyCommitResponse extends OmKeyResponse {
     return ozoneKeyName;
   }
 
+  @VisibleForTesting
+  public RepeatedOmKeyInfo getKeysToDelete() {
+    return keysToDelete;
+  }
+
   protected void updateDeletedTable(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
     if (this.keysToDelete != null) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index bdf16fef42..10552e380d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
 import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
@@ -190,6 +191,93 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         omKeyInfo.getLatestVersionLocations().getLocationList());
   }
 
+  @Test
+  public void testValidateAndUpdateCacheWithUncommittedBlocks()
+      throws Exception {
+
+    // allocated block list
+    List<KeyLocation> allocatedKeyLocationList = getKeyLocation(5);
+
+    List<OmKeyLocationInfo> allocatedBlockList = allocatedKeyLocationList
+        .stream().map(OmKeyLocationInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+    // committed block list: only three of the five allocated blocks
+    List<KeyLocation> committedKeyLocationList = getKeyLocation(3);
+
+    OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest(
+        committedKeyLocationList));
+
+    OMKeyCommitRequest omKeyCommitRequest =
+        getOmKeyCommitRequest(modifiedOmRequest);
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, omKeyCommitRequest.getBucketLayout());
+
+    String ozoneKey = addKeyToOpenKeyTable(allocatedBlockList);
+
+    // Key should not be there in key table, as validateAndUpdateCache is
+    // still not called.
+    OmKeyInfo omKeyInfo =
+        omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
+            .get(ozoneKey);
+
+    Assert.assertNull(omKeyInfo);
+
+    OMClientResponse omClientResponse =
+        omKeyCommitRequest.validateAndUpdateCache(ozoneManager,
+            100L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    List<OmKeyInfo> toDeleteKeyList = ((OMKeyCommitResponse) omClientResponse).
+        getKeysToDelete().cloneOmKeyInfoList();
+
+    // This is the first time to commit key, only the allocated but uncommitted
+    // blocks should be deleted.
+    Assert.assertEquals(1, toDeleteKeyList.size());
+    Assert.assertEquals(2, toDeleteKeyList.get(0).
+        getKeyLocationVersions().get(0).getLocationList().size());
+
+    // Entry should be deleted from openKey Table.
+    omKeyInfo =
+        omMetadataManager.getOpenKeyTable(omKeyCommitRequest.getBucketLayout())
+            .get(ozoneKey);
+    Assert.assertNull(omKeyInfo);
+
+    // Now entry should be created in key Table.
+    omKeyInfo =
+        omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
+            .get(ozoneKey);
+
+    Assert.assertNotNull(omKeyInfo);
+
+    // DB keyInfo format
+    verifyKeyName(omKeyInfo);
+
+    // Check modification time
+    CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest();
+    Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(),
+        omKeyInfo.getModificationTime());
+
+    // Check block location.
+    List<OmKeyLocationInfo> locationInfoListFromCommitKeyRequest =
+        commitKeyRequest.getKeyArgs()
+            .getKeyLocationsList().stream()
+            .map(OmKeyLocationInfo::getFromProtobuf)
+            .collect(Collectors.toList());
+
+    List<OmKeyLocationInfo> intersection = new ArrayList<>(allocatedBlockList);
+    intersection.retainAll(locationInfoListFromCommitKeyRequest);
+
+    // Key table should have three blocks.
+    Assert.assertEquals(intersection,
+        omKeyInfo.getLatestVersionLocations().getLocationList());
+    Assert.assertEquals(3, intersection.size());
+
+  }
+
   @Test
   public void testValidateAndUpdateCacheWithSubDirs() throws Exception {
     parentDir = "dir1/dir2/dir3/";
@@ -466,15 +554,19 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         modifiedKeyArgs.getFactor());
   }
 
+  private OMRequest createCommitKeyRequest() {
+    return createCommitKeyRequest(getKeyLocation(5));
+  }
+
   /**
    * Create OMRequest which encapsulates CommitKeyRequest.
    */
-  private OMRequest createCommitKeyRequest() {
+  private OMRequest createCommitKeyRequest(List<KeyLocation> keyLocations) {
     KeyArgs keyArgs =
         KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName)
             .setKeyName(keyName).setBucketName(bucketName)
             .setType(replicationType).setFactor(replicationFactor)
-            .addAllKeyLocations(getKeyLocation(5)).build();
+            .addAllKeyLocations(keyLocations).build();
 
     CommitKeyRequest commitKeyRequest =
         CommitKeyRequest.newBuilder().setKeyArgs(keyArgs)
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index a24a72ac80..3bc35c8b3b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -22,14 +22,21 @@ package org.apache.hadoop.ozone.om.service;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.BeforeClass;
@@ -133,8 +140,8 @@ public class TestKeyDeletingService {
         () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
         1000, 10000);
     Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
-    Assert.assertEquals(
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0);
+    Assert.assertEquals(0,
+        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
   }
 
   @Test(timeout = 40000)
@@ -176,9 +183,9 @@ public class TestKeyDeletingService {
         () -> keyDeletingService.getRunCount().get() >= 5,
         100, 1000);
     // Since SCM calls are failing, deletedKeyCount should be zero.
-    Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
-    Assert.assertEquals(
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), keyCount);
+    Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
+    Assert.assertEquals(keyCount,
+        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
   }
 
   @Test(timeout = 30000)
@@ -200,15 +207,86 @@ public class TestKeyDeletingService {
     KeyDeletingService keyDeletingService =
         (KeyDeletingService) keyManager.getDeletingService();
 
-    // Since empty keys are directly deleted from db there should be no
-    // pending deletion keys. Also deletedKeyCount should be zero.
-    Assert.assertEquals(
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0);
+    // the pre-allocated blocks are not committed, hence they will be deleted.
+    Assert.assertEquals(100,
+        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
     // Make sure that we have run the background thread 2 times or more
     GenericTestUtils.waitFor(
         () -> keyDeletingService.getRunCount().get() >= 2,
         100, 1000);
-    Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
+    // the blockClient is set to fail the deletion of key blocks, hence no keys
+    // will be deleted
+    Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
+  }
+
+  @Test(timeout = 30000)
+  public void checkDeletionForPartiallyCommitKey()
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
+    OzoneConfiguration conf = createConfAndInitValues();
+    ScmBlockLocationProtocol blockClient =
+        // failCallsFrequency = 1 means all calls fail.
+        new ScmBlockLocationTestingClient(null, null, 1);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf, blockClient, null);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+
+    String volumeName = String.format("volume%s",
+        RandomStringUtils.randomAlphanumeric(5));
+    String bucketName = String.format("bucket%s",
+        RandomStringUtils.randomAlphanumeric(5));
+    String keyName = String.format("key%s",
+        RandomStringUtils.randomAlphanumeric(5));
+
+    // Create Volume and Bucket
+    createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+
+    OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
+        keyName, 3, 1);
+
+    // Only the uncommitted block should be pending to be deleted.
+    GenericTestUtils.waitFor(
+        () -> {
+          try {
+            return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                .stream()
+                .map(BlockGroup::getBlockIDList)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()).size() == 1;
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          return false;
+        },
+        500, 3000);
+
+    // Delete the key
+    writeClient.deleteKey(keyArg);
+
+    KeyDeletingService keyDeletingService =
+        (KeyDeletingService) keyManager.getDeletingService();
+
+    // All blocks should be pending to be deleted.
+    GenericTestUtils.waitFor(
+        () -> {
+          try {
+            return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                .stream()
+                .map(BlockGroup::getBlockIDList)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()).size() == 3;
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          return false;
+        },
+        500, 3000);
+
+    // the blockClient is set to fail the deletion of key blocks, hence no keys
+    // will be deleted
+    Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
   }
 
   @Test(timeout = 30000)
@@ -299,6 +377,15 @@ public class TestKeyDeletingService {
 
   private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName,
       String bucketName, String keyName, int numBlocks) throws IOException {
+    return createAndCommitKey(keyManager, volumeName, bucketName, keyName,
+        numBlocks, 0);
+  }
+
+  private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName,
+      String bucketName, String keyName, int numBlocks, int numUncommitted)
+      throws IOException {
+    // Even if no key size is specified, at least one block is
+    // pre-allocated when the key is created.
     OmKeyArgs keyArg =
         new OmKeyArgs.Builder()
             .setVolumeName(volumeName)
@@ -311,10 +398,35 @@ public class TestKeyDeletingService {
             .build();
     //Open and Commit the Key in the Key Manager.
     OpenKeySession session = writeClient.openKey(keyArg);
-    for (int i = 0; i < numBlocks; i++) {
-      keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
+
+    // add pre-allocated blocks into args and avoid creating excessive block
+    OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo().
+        getLatestVersionLocations();
+    assert keyLocationVersions != null;
+    List<OmKeyLocationInfo> latestBlocks = keyLocationVersions.
+        getBlocksLatestVersionOnly();
+    int preAllocatedSize = latestBlocks.size();
+    for (OmKeyLocationInfo block : latestBlocks) {
+      keyArg.addLocationInfo(block);
+    }
+
+    // allocate blocks until the number of blocks equals numBlocks
+    LinkedList<OmKeyLocationInfo> allocated = new LinkedList<>();
+    for (int i = 0; i < numBlocks - preAllocatedSize; i++) {
+      allocated.add(writeClient.allocateBlock(keyArg, session.getId(),
           new ExcludeList()));
     }
+
+    // remove the blocks not to be committed
+    for (int i = 0; i < numUncommitted; i++) {
+      allocated.removeFirst();
+    }
+
+    // add the blocks to be committed
+    for (OmKeyLocationInfo block: allocated) {
+      keyArg.addLocationInfo(block);
+    }
+
     writeClient.commitKey(keyArg, session.getId());
     return keyArg;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org