You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ck...@apache.org on 2022/10/26 02:40:04 UTC
[ozone] 02/03: HDDS-7258. Cleanup the allocated but uncommitted blocks (#3778)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch ozone-1.3
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit b2ff74f7f5ed71faca9c7bdcccdb18b5bb27f050
Author: Nibiru <ax...@qq.com>
AuthorDate: Wed Oct 26 01:00:02 2022 +0800
HDDS-7258. Cleanup the allocated but uncommitted blocks (#3778)
---
.../apache/hadoop/ozone/om/helpers/OmKeyInfo.java | 76 ++++++++----
.../ozone/om/request/key/OMKeyCommitRequest.java | 19 ++-
.../om/request/key/OMKeyCommitRequestWithFSO.java | 23 +++-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 24 ++++
.../ozone/om/response/key/OMKeyCommitResponse.java | 6 +
.../om/request/key/TestOMKeyCommitRequest.java | 96 ++++++++++++++-
.../ozone/om/service/TestKeyDeletingService.java | 136 +++++++++++++++++++--
7 files changed, 333 insertions(+), 47 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 9b2014dd7b..f8f589af2e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -26,9 +26,9 @@ import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -180,25 +180,30 @@ public final class OmKeyInfo extends WithParentObjectId {
/**
* updates the length of the each block in the list given.
* This will be called when the key is being committed to OzoneManager.
+ * Return the uncommitted locationInfo to be deleted.
*
* @param locationInfoList list of locationInfo
+ * @return allocated but uncommitted locationInfos
*/
- public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
- boolean isMpu) {
- updateLocationInfoList(locationInfoList, isMpu, false);
+ public List<OmKeyLocationInfo> updateLocationInfoList(
+ List<OmKeyLocationInfo> locationInfoList, boolean isMpu) {
+ return updateLocationInfoList(locationInfoList, isMpu, false);
}
/**
* updates the length of the each block in the list given.
* This will be called when the key is being committed to OzoneManager.
+ * Return the uncommitted locationInfo to be deleted.
*
* @param locationInfoList list of locationInfo
* @param isMpu a true represents multi part key, false otherwise
* @param skipBlockIDCheck a true represents that the blockId verification
* check should be skipped, false represents that
* the blockId verification will be required
+ * @return allocated but uncommitted locationInfos
*/
- public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
+ public List<OmKeyLocationInfo> updateLocationInfoList(
+ List<OmKeyLocationInfo> locationInfoList,
boolean isMpu, boolean skipBlockIDCheck) {
long latestVersion = getLatestVersionLocations().getVersion();
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
@@ -207,51 +212,68 @@ public final class OmKeyInfo extends WithParentObjectId {
// Compare user given block location against allocatedBlockLocations
// present in OmKeyInfo.
+ List<OmKeyLocationInfo> uncommittedBlocks;
List<OmKeyLocationInfo> updatedBlockLocations;
if (skipBlockIDCheck) {
updatedBlockLocations = locationInfoList;
+ uncommittedBlocks = new ArrayList<>();
} else {
- updatedBlockLocations =
+ Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>> verifiedResult =
verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup);
+ updatedBlockLocations = verifiedResult.getLeft();
+ uncommittedBlocks = verifiedResult.getRight();
}
- // Updates the latest locationList in the latest version only with
- // given locationInfoList here.
- // TODO : The original allocated list and the updated list here may vary
- // as the containers on the Datanode on which the blocks were pre allocated
- // might get closed. The diff of blocks between these two lists here
- // need to be garbage collected in case the ozone client dies.
+
keyLocationInfoGroup.removeBlocks(latestVersion);
// set each of the locationInfo object to the latest version
updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo
.setCreateVersion(latestVersion));
keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations);
- }
- private List<OmKeyLocationInfo> verifyAndGetKeyLocations(
- List<OmKeyLocationInfo> locationInfoList,
- OmKeyLocationInfoGroup keyLocationInfoGroup) {
-
- List<OmKeyLocationInfo> allocatedBlockLocations =
- keyLocationInfoGroup.getBlocksLatestVersionOnly();
- List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
+ return uncommittedBlocks;
+ }
- List<ContainerBlockID> existingBlockIDs = new ArrayList<>();
- for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) {
- BlockID existingBlockID = existingLocationInfo.getBlockID();
- existingBlockIDs.add(existingBlockID.getContainerBlockID());
+ /**
+ * 1. Verify committed KeyLocationInfos
+ * 2. Find out the allocated but uncommitted KeyLocationInfos.
+ *
+ * @param locationInfoList committed KeyLocationInfos
+ * @param keyLocationInfoGroup allocated KeyLocationInfoGroup
+ * @return Pair of updatedOmKeyLocationInfo and uncommittedOmKeyLocationInfo
+ */
+ private Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>>
+ verifyAndGetKeyLocations(
+ List<OmKeyLocationInfo> locationInfoList,
+ OmKeyLocationInfoGroup keyLocationInfoGroup) {
+ // Only check ContainerBlockID here to avoid the mismatch of the pipeline
+ // field and BcsId in the OmKeyLocationInfo, as the OmKeyInfoCodec ignores
+ // the pipeline field by default and bcsId would be updated in Ratis mode.
+ Map<ContainerBlockID, OmKeyLocationInfo> allocatedBlockLocations =
+ new HashMap<>();
+ for (OmKeyLocationInfo existingLocationInfo : keyLocationInfoGroup.
+ getLocationList()) {
+ ContainerBlockID existingBlockID = existingLocationInfo.getBlockID().
+ getContainerBlockID();
+ // Overwriting an existing entry should never happen here
+ allocatedBlockLocations.put(existingBlockID, existingLocationInfo);
}
+ List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) {
- BlockID modifiedBlockID = modifiedLocationInfo.getBlockID();
- if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) {
+ ContainerBlockID modifiedContainerBlockId =
+ modifiedLocationInfo.getBlockID().getContainerBlockID();
+ if (allocatedBlockLocations.containsKey(modifiedContainerBlockId)) {
updatedBlockLocations.add(modifiedLocationInfo);
+ allocatedBlockLocations.remove(modifiedContainerBlockId);
} else {
LOG.warn("Unknown BlockLocation:{}, where the blockID of given "
+ "location doesn't match with the stored/allocated block of"
+ " keyName:{}", modifiedLocationInfo, keyName);
}
}
- return updatedBlockLocations;
+ List<OmKeyLocationInfo> uncommittedLocationInfos = new ArrayList<>(
+ allocatedBlockLocations.values());
+ return Pair.of(updatedBlockLocations, uncommittedLocationInfos);
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index ebff5416f3..bb070b6ba1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -218,8 +218,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
- // Update the block length for each block
- omKeyInfo.updateLocationInfoList(locationInfoList, false);
+ // Update the block length for each block, return the allocated but
+ // uncommitted blocks
+ List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
+ locationInfoList, false);
+
// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -239,6 +242,18 @@ public class OMKeyCommitRequest extends OMKeyRequest {
omBucketInfo.incrUsedNamespace(1L);
}
+ // Treat the uncommitted blocks as if they were old-version blocks of the
+ // key, so that they are queued for deletion in the RepeatedOmKeyInfo
+ OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
+ omKeyInfo);
+ if (pseudoKeyInfo != null) {
+ if (oldKeyVersionsToDelete != null) {
+ oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
+ } else {
+ oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+ }
+ }
+
// Add to cache of open key table and key table.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenKey),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index b9239c9d86..5c7ac450b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLoca
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
@@ -58,6 +60,9 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
*/
public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class);
+
public OMKeyCommitRequestWithFSO(OMRequest omRequest,
BucketLayout bucketLayout) {
super(omRequest, bucketLayout);
@@ -144,10 +149,8 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
- // Update the block length for each block
- List<OmKeyLocationInfo> allocatedLocationInfoList =
- omKeyInfo.getLatestVersionLocations().getLocationList();
- omKeyInfo.updateLocationInfoList(locationInfoList, false);
+ List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
+ locationInfoList, false);
// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -177,6 +180,18 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
omBucketInfo.incrUsedNamespace(1L);
}
+ // Treat the uncommitted blocks as if they were old-version blocks of the
+ // key, so that they are queued for deletion in the RepeatedOmKeyInfo
+ OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
+ omKeyInfo);
+ if (pseudoKeyInfo != null) {
+ if (oldKeyVersionsToDelete != null) {
+ oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
+ } else {
+ oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+ }
+ }
+
// Add to cache of open key table and key table.
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey,
null, fileName, trxnLogIndex);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 904b5c0005..8c79e16dcd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -816,4 +816,28 @@ public abstract class OMKeyRequest extends OMClientRequest {
return ozoneManager.getOzoneLockProvider()
.createLockStrategy(getBucketLayout());
}
+
+ /**
+ * Wrap the uncommitted blocks as pseudoKeyInfo.
+ *
+ * @param uncommitted Uncommitted OmKeyLocationInfo
+ * @param omKeyInfo Args for key block
+ * @return pseudoKeyInfo
+ */
+ protected OmKeyInfo wrapUncommittedBlocksAsPseudoKey(
+ List<OmKeyLocationInfo> uncommitted, OmKeyInfo omKeyInfo) {
+ if (uncommitted.isEmpty()) {
+ return null;
+ }
+ LOG.info("Detect allocated but uncommitted blocks {} in key {}.",
+ uncommitted, omKeyInfo.getKeyName());
+ OmKeyInfo pseudoKeyInfo = omKeyInfo.copyObject();
+ // TODO dataSize of pseudoKey is not real here
+ List<OmKeyLocationInfoGroup> uncommittedGroups = new ArrayList<>();
+ // The version does not matter in the current logic of keyDeletingService:
+ // all versions of blocks will be deleted.
+ uncommittedGroups.add(new OmKeyLocationInfoGroup(0, uncommitted));
+ pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups);
+ return pseudoKeyInfo;
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index 513e10cba4..911a61cce6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.response.key;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -104,6 +105,11 @@ public class OMKeyCommitResponse extends OmKeyResponse {
return ozoneKeyName;
}
+ @VisibleForTesting
+ public RepeatedOmKeyInfo getKeysToDelete() {
+ return keysToDelete;
+ }
+
protected void updateDeletedTable(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
if (this.keysToDelete != null) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index bdf16fef42..10552e380d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
import org.apache.hadoop.util.Time;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
@@ -190,6 +191,93 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
omKeyInfo.getLatestVersionLocations().getLocationList());
}
+ @Test
+ public void testValidateAndUpdateCacheWithUncommittedBlocks()
+ throws Exception {
+
+ // allocated block list
+ List<KeyLocation> allocatedKeyLocationList = getKeyLocation(5);
+
+ List<OmKeyLocationInfo> allocatedBlockList = allocatedKeyLocationList
+ .stream().map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList());
+
+ // committed block list: only three of the five allocated blocks
+ List<KeyLocation> committedKeyLocationList = getKeyLocation(3);
+
+ OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest(
+ committedKeyLocationList));
+
+ OMKeyCommitRequest omKeyCommitRequest =
+ getOmKeyCommitRequest(modifiedOmRequest);
+
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager, omKeyCommitRequest.getBucketLayout());
+
+ String ozoneKey = addKeyToOpenKeyTable(allocatedBlockList);
+
+ // Key should not be there in key table, as validateAndUpdateCache is
+ // still not called.
+ OmKeyInfo omKeyInfo =
+ omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
+ .get(ozoneKey);
+
+ Assert.assertNull(omKeyInfo);
+
+ OMClientResponse omClientResponse =
+ omKeyCommitRequest.validateAndUpdateCache(ozoneManager,
+ 100L, ozoneManagerDoubleBufferHelper);
+
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ omClientResponse.getOMResponse().getStatus());
+
+ List<OmKeyInfo> toDeleteKeyList = ((OMKeyCommitResponse) omClientResponse).
+ getKeysToDelete().cloneOmKeyInfoList();
+
+ // This is the first commit of the key, so only the allocated but
+ // uncommitted blocks should be deleted.
+ Assert.assertEquals(1, toDeleteKeyList.size());
+ Assert.assertEquals(2, toDeleteKeyList.get(0).
+ getKeyLocationVersions().get(0).getLocationList().size());
+
+ // Entry should be deleted from openKey Table.
+ omKeyInfo =
+ omMetadataManager.getOpenKeyTable(omKeyCommitRequest.getBucketLayout())
+ .get(ozoneKey);
+ Assert.assertNull(omKeyInfo);
+
+ // Now entry should be created in key Table.
+ omKeyInfo =
+ omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
+ .get(ozoneKey);
+
+ Assert.assertNotNull(omKeyInfo);
+
+ // DB keyInfo format
+ verifyKeyName(omKeyInfo);
+
+ // Check modification time
+ CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest();
+ Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(),
+ omKeyInfo.getModificationTime());
+
+ // Check block location.
+ List<OmKeyLocationInfo> locationInfoListFromCommitKeyRequest =
+ commitKeyRequest.getKeyArgs()
+ .getKeyLocationsList().stream()
+ .map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList());
+
+ List<OmKeyLocationInfo> intersection = new ArrayList<>(allocatedBlockList);
+ intersection.retainAll(locationInfoListFromCommitKeyRequest);
+
+ // Key table should have three blocks.
+ Assert.assertEquals(intersection,
+ omKeyInfo.getLatestVersionLocations().getLocationList());
+ Assert.assertEquals(3, intersection.size());
+
+ }
+
@Test
public void testValidateAndUpdateCacheWithSubDirs() throws Exception {
parentDir = "dir1/dir2/dir3/";
@@ -466,15 +554,19 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
modifiedKeyArgs.getFactor());
}
+ private OMRequest createCommitKeyRequest() {
+ return createCommitKeyRequest(getKeyLocation(5));
+ }
+
/**
* Create OMRequest which encapsulates CommitKeyRequest.
*/
- private OMRequest createCommitKeyRequest() {
+ private OMRequest createCommitKeyRequest(List<KeyLocation> keyLocations) {
KeyArgs keyArgs =
KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName)
.setKeyName(keyName).setBucketName(bucketName)
.setType(replicationType).setFactor(replicationFactor)
- .addAllKeyLocations(getKeyLocation(5)).build();
+ .addAllKeyLocations(keyLocations).build();
CommitKeyRequest commitKeyRequest =
CommitKeyRequest.newBuilder().setKeyArgs(keyArgs)
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index a24a72ac80..3bc35c8b3b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -22,14 +22,21 @@ package org.apache.hadoop.ozone.om.service;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OmTestManagers;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.junit.BeforeClass;
@@ -133,8 +140,8 @@ public class TestKeyDeletingService {
() -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
1000, 10000);
Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
- Assert.assertEquals(
- keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0);
+ Assert.assertEquals(0,
+ keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
}
@Test(timeout = 40000)
@@ -176,9 +183,9 @@ public class TestKeyDeletingService {
() -> keyDeletingService.getRunCount().get() >= 5,
100, 1000);
// Since SCM calls are failing, deletedKeyCount should be zero.
- Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
- Assert.assertEquals(
- keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), keyCount);
+ Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
+ Assert.assertEquals(keyCount,
+ keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
}
@Test(timeout = 30000)
@@ -200,15 +207,86 @@ public class TestKeyDeletingService {
KeyDeletingService keyDeletingService =
(KeyDeletingService) keyManager.getDeletingService();
- // Since empty keys are directly deleted from db there should be no
- // pending deletion keys. Also deletedKeyCount should be zero.
- Assert.assertEquals(
- keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0);
+ // the pre-allocated blocks are not committed, hence they will be deleted.
+ Assert.assertEquals(100,
+ keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
// Make sure that we have run the background thread 2 times or more
GenericTestUtils.waitFor(
() -> keyDeletingService.getRunCount().get() >= 2,
100, 1000);
- Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
+ // the blockClient is set to fail the deletion of key blocks, hence no keys
+ // will be deleted
+ Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
+ }
+
+ @Test(timeout = 30000)
+ public void checkDeletionForPartiallyCommitKey()
+ throws IOException, TimeoutException, InterruptedException,
+ AuthenticationException {
+ OzoneConfiguration conf = createConfAndInitValues();
+ ScmBlockLocationProtocol blockClient =
+ // failCallsFrequency = 1 means all calls fail.
+ new ScmBlockLocationTestingClient(null, null, 1);
+ OmTestManagers omTestManagers
+ = new OmTestManagers(conf, blockClient, null);
+ KeyManager keyManager = omTestManagers.getKeyManager();
+ writeClient = omTestManagers.getWriteClient();
+ om = omTestManagers.getOzoneManager();
+
+ String volumeName = String.format("volume%s",
+ RandomStringUtils.randomAlphanumeric(5));
+ String bucketName = String.format("bucket%s",
+ RandomStringUtils.randomAlphanumeric(5));
+ String keyName = String.format("key%s",
+ RandomStringUtils.randomAlphanumeric(5));
+
+ // Create Volume and Bucket
+ createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+
+ OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName,
+ keyName, 3, 1);
+
+ // Only the uncommitted block should be pending to be deleted.
+ GenericTestUtils.waitFor(
+ () -> {
+ try {
+ return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+ .stream()
+ .map(BlockGroup::getBlockIDList)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()).size() == 1;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ },
+ 500, 3000);
+
+ // Delete the key
+ writeClient.deleteKey(keyArg);
+
+ KeyDeletingService keyDeletingService =
+ (KeyDeletingService) keyManager.getDeletingService();
+
+ // All blocks should be pending to be deleted.
+ GenericTestUtils.waitFor(
+ () -> {
+ try {
+ return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+ .stream()
+ .map(BlockGroup::getBlockIDList)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()).size() == 3;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ },
+ 500, 3000);
+
+ // the blockClient is set to fail the deletion of key blocks, hence no keys
+ // will be deleted
+ Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
}
@Test(timeout = 30000)
@@ -299,6 +377,15 @@ public class TestKeyDeletingService {
private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName,
String bucketName, String keyName, int numBlocks) throws IOException {
+ return createAndCommitKey(keyManager, volumeName, bucketName, keyName,
+ numBlocks, 0);
+ }
+
+ private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName,
+ String bucketName, String keyName, int numBlocks, int numUncommitted)
+ throws IOException {
+ // Even if no key size is specified, there will be at least one
+ // block pre-allocated when the key is created
OmKeyArgs keyArg =
new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -311,10 +398,35 @@ public class TestKeyDeletingService {
.build();
//Open and Commit the Key in the Key Manager.
OpenKeySession session = writeClient.openKey(keyArg);
- for (int i = 0; i < numBlocks; i++) {
- keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
+
+ // add pre-allocated blocks into args and avoid creating excessive block
+ OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo().
+ getLatestVersionLocations();
+ assert keyLocationVersions != null;
+ List<OmKeyLocationInfo> latestBlocks = keyLocationVersions.
+ getBlocksLatestVersionOnly();
+ int preAllocatedSize = latestBlocks.size();
+ for (OmKeyLocationInfo block : latestBlocks) {
+ keyArg.addLocationInfo(block);
+ }
+
+ // allocate blocks until the number of blocks equals numBlocks
+ LinkedList<OmKeyLocationInfo> allocated = new LinkedList<>();
+ for (int i = 0; i < numBlocks - preAllocatedSize; i++) {
+ allocated.add(writeClient.allocateBlock(keyArg, session.getId(),
new ExcludeList()));
}
+
+ // remove the blocks not to be committed
+ for (int i = 0; i < numUncommitted; i++) {
+ allocated.removeFirst();
+ }
+
+ // add the blocks to be committed
+ for (OmKeyLocationInfo block: allocated) {
+ keyArg.addLocationInfo(block);
+ }
+
writeClient.commitKey(keyArg, session.getId());
return keyArg;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org