You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/02/01 00:49:13 UTC
[hadoop-ozone] branch master updated: HDDS-2893. Handle replay of
KeyPurge Request. (#450)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bb57442 HDDS-2893. Handle replay of KeyPurge Request. (#450)
bb57442 is described below
commit bb574424f96a5ef685de11aa7694c9f38d51f14a
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Fri Jan 31 16:49:05 2020 -0800
HDDS-2893. Handle replay of KeyPurge Request. (#450)
---
.../src/main/proto/OzoneManagerProtocol.proto | 8 +-
.../apache/hadoop/ozone/om/KeyDeletingService.java | 46 +++++-
.../ozone/om/request/key/OMKeyPurgeRequest.java | 127 +++++++++++++---
.../ozone/om/response/key/OMKeyPurgeResponse.java | 22 ++-
.../ozone/om/request/TestOMRequestUtils.java | 33 ++--
.../om/request/key/TestOMKeyDeleteRequest.java | 2 +-
.../key/TestOMKeyPurgeRequestAndResponse.java | 166 ++++++++++++++++++---
7 files changed, 342 insertions(+), 62 deletions(-)
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 568a1d9..65bf10a 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -853,8 +853,14 @@ message DeleteKeyResponse {
optional uint64 openVersion = 4;
}
+message DeletedKeys {
+ required string volumeName = 1;
+ required string bucketName = 2;
+ repeated string keys = 3;
+}
+
message PurgeKeysRequest {
- repeated string keys = 1;
+ repeated DeletedKeys deletedKeys = 1;
}
message PurgeKeysResponse {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index 3005617..27ad7d8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -18,15 +18,19 @@ package org.apache.hadoop.ozone.om;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
import com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -45,6 +51,7 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.Preconditions;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -218,7 +225,8 @@ public class KeyDeletingService extends BackgroundService {
* @throws IOException on Error
*/
public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results) {
- List<String> purgeKeysList = new ArrayList<>();
+ Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+ new HashMap<>();
// Put all keys to be purged in a list
int deletedCount = 0;
@@ -226,15 +234,26 @@ public class KeyDeletingService extends BackgroundService {
if (result.isSuccess()) {
// Add key to PurgeKeys list.
String deletedKey = result.getObjectKey();
- purgeKeysList.add(deletedKey);
+ // Parse Volume and BucketName
+ addToMap(purgeKeysMapPerBucket, deletedKey);
LOG.debug("Key {} set to be purged from OM DB", deletedKey);
deletedCount++;
}
}
- PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
- .addAllKeys(purgeKeysList)
- .build();
+ PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
+
+ // Add keys to PurgeKeysRequest bucket wise.
+ for (Map.Entry<Pair<String, String>, List<String>> entry :
+ purgeKeysMapPerBucket.entrySet()) {
+ Pair<String, String> volumeBucketPair = entry.getKey();
+ DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
+ .setVolumeName(volumeBucketPair.getLeft())
+ .setBucketName(volumeBucketPair.getRight())
+ .addAllKeys(entry.getValue())
+ .build();
+ purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+ }
OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.PurgeKeys)
@@ -253,4 +272,21 @@ public class KeyDeletingService extends BackgroundService {
return deletedCount;
}
}
+
+ /**
+ * Parse Volume and Bucket Name from ObjectKey and add it to given map of
+ * keys to be purged per bucket.
+ */
+ private void addToMap(Map<Pair<String, String>, List<String>> map,
+ String objectKey) {
+ // Parse volume and bucket name
+ String[] split = objectKey.split(OM_KEY_PREFIX);
+ Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " +
+ "missing from Key Name.");
+ Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
+ if (!map.containsKey(volumeBucketPair)) {
+ map.put(volumeBucketPair, new ArrayList<>());
+ }
+ map.get(volumeBucketPair).add(objectKey);
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 0699b2a..3b50880 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -18,14 +18,20 @@
package org.apache.hadoop.ozone.om.request.key;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.slf4j.Logger;
@@ -33,6 +39,8 @@ import org.slf4j.LoggerFactory;
import java.util.List;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
/**
* Handles purging of keys from OM DB.
*/
@@ -47,27 +55,112 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
@Override
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
- long transactionLogIndex,
- OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
- PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
- List<String> purgeKeysList = purgeKeysRequest.getKeysList();
+ long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
- LOG.debug("Processing Purge Keys for {} number of keys.",
- purgeKeysList.size());
+ OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+ PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
+ List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
+ .getDeletedKeysList();
+ List<String> keysToBePurgedList = new ArrayList<>();
- OMResponse omResponse = OMResponse.newBuilder()
+ OMResponse.Builder omResponse = OMResponse.newBuilder()
.setCmdType(Type.PurgeKeys)
- .setPurgeKeysResponse(
- OzoneManagerProtocolProtos.PurgeKeysResponse.newBuilder().build())
+ .setPurgeKeysResponse(PurgeKeysResponse.newBuilder().build())
.setStatus(Status.OK)
- .setSuccess(true)
- .build();
+ .setSuccess(true);
+ OMClientResponse omClientResponse = null;
+ boolean success = true;
+ IOException exception = null;
+
+ // Filter the keys that have updateID > transactionLogIndex. This is done so
+ // that in case this transaction is a replay, we do not purge keys
+ // created after the original purge request.
+ // PurgeKeys request has keys belonging to same bucket grouped together.
+ // We get each bucket lock and check the above condition.
+ for (DeletedKeys bucketWithDeleteKeys : bucketDeletedKeysList) {
+ boolean acquiredLock = false;
+ String volumeName = bucketWithDeleteKeys.getVolumeName();
+ String bucketName = bucketWithDeleteKeys.getBucketName();
+ ArrayList<String> keysNotPurged = new ArrayList<>();
+ Result result = null;
+ try {
+ acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+ volumeName, bucketName);
+ for (String deletedKey : bucketWithDeleteKeys.getKeysList()) {
+ RepeatedOmKeyInfo repeatedOmKeyInfo =
+ omMetadataManager.getDeletedTable().get(deletedKey);
+ boolean purgeKey = true;
+ if (repeatedOmKeyInfo != null) {
+ for (OmKeyInfo omKeyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
+ // Discard those keys whose updateID is > transactionLogIndex.
+ // This could happen when the PurgeRequest is replayed.
+ if (isReplay(ozoneManager, omKeyInfo.getUpdateID(),
+ trxnLogIndex)) {
+ purgeKey = false;
+ result = Result.REPLAY;
+ break;
+ }
+ // TODO: If a deletedKey has any one OmKeyInfo which was
+ // deleted after the original PurgeRequest (updateID >
+ // trxnLogIndex), we avoid purging that whole key in the
+ // replay request. Instead of discarding the whole key, we can
+ // identify the OmKeyInfo's which have updateID <
+ // trxnLogIndex and purge only those OMKeyInfo's from the
+ // deletedKey in DeletedTable.
+ }
+ if (purgeKey) {
+ keysToBePurgedList.add(deletedKey);
+ } else {
+ keysNotPurged.add(deletedKey);
+ }
+ }
+ }
+ } catch (IOException ex) {
+ success = false;
+ exception = ex;
+ break;
+ } finally {
+ if (acquiredLock) {
+ omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+ bucketName);
+ }
+ }
+
+ if (result == Result.REPLAY) {
+ LOG.debug("Replayed Transaction {}. Request: {}", trxnLogIndex,
+ purgeKeysRequest);
+ if (!keysNotPurged.isEmpty()) {
+ StringBuilder notPurgeList = new StringBuilder();
+ for (String key : keysNotPurged) {
+ notPurgeList.append(", ").append(key);
+ }
+ LOG.debug("Following keys from Volume:{}, Bucket:{} will not be" +
+ " purged: {}", notPurgeList.toString().substring(2));
+ }
+ }
+ }
+
+ if (success) {
+ if (LOG.isDebugEnabled()) {
+ if (keysToBePurgedList.isEmpty()) {
+ LOG.debug("No keys will be purged as part of KeyPurgeRequest: {}",
+ purgeKeysRequest);
+ } else {
+ LOG.debug("Following keys will be purged as part of " +
+ "KeyPurgeRequest: {} - {}", purgeKeysRequest,
+ String.join(",", keysToBePurgedList));
+ }
+ }
+ omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
+ keysToBePurgedList);
+ } else {
+ omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(
+ omResponse, exception));
+ }
- OMClientResponse omClientResponse = new OMKeyPurgeResponse(purgeKeysList,
- omResponse);
- omClientResponse.setFlushFuture(
- ozoneManagerDoubleBufferHelper.add(omClientResponse,
- transactionLogIndex));
+ omClientResponse.setFlushFuture(omDoubleBufferHelper.add(
+ omClientResponse, trxnLogIndex));
return omClientResponse;
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 513b94d..abfc0f6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.response.key;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -36,21 +35,28 @@ public class OMKeyPurgeResponse extends OMClientResponse {
private List<String> purgeKeyList;
- public OMKeyPurgeResponse(List<String> keyList,
- @Nonnull OMResponse omResponse) {
+ public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
+ List<String> keyList) {
super(omResponse);
this.purgeKeyList = keyList;
}
+ /**
+ * For when the request is not successful or it is a replay transaction.
+ * For a successful request, the other constructor should be used.
+ */
+ public OMKeyPurgeResponse(@Nonnull OMResponse omResponse) {
+ super(omResponse);
+ checkStatusNotOK();
+ }
+
@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
- if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
- for (String key : purgeKeyList) {
- omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation,
- key);
- }
+ for (String key : purgeKeyList) {
+ omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation,
+ key);
}
}
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index de50c70..d8f6b02 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.om.request;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -88,14 +87,7 @@ public final class TestOMRequestUtils {
addVolumeToDB(volumeName, omMetadataManager);
- OmBucketInfo omBucketInfo =
- OmBucketInfo.newBuilder().setVolumeName(volumeName)
- .setBucketName(bucketName).setCreationTime(Time.now()).build();
-
- // Add to cache.
- omMetadataManager.getBucketTable().addCacheEntry(
- new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
- new CacheValue<>(Optional.of(omBucketInfo), 1L));
+ addBucketToDB(volumeName, bucketName, omMetadataManager);
}
@SuppressWarnings("parameterNumber")
@@ -280,6 +272,25 @@ public final class TestOMRequestUtils {
new CacheValue<>(Optional.of(omVolumeArgs), 1L));
}
+ /**
+ * Add bucket creation entry to OM DB.
+ * @param volumeName
+ * @param bucketName
+ * @param omMetadataManager
+ * @throws Exception
+ */
+ public static void addBucketToDB(String volumeName, String bucketName,
+ OMMetadataManager omMetadataManager) throws Exception {
+
+ OmBucketInfo omBucketInfo =
+ OmBucketInfo.newBuilder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setCreationTime(Time.now()).build();
+
+ // Add to cache.
+ omMetadataManager.getBucketTable().addCacheEntry(
+ new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
+ new CacheValue<>(Optional.of(omBucketInfo), 1L));
+ }
public static OzoneManagerProtocolProtos.OMRequest createBucketRequest(
String bucketName, String volumeName, boolean isVersionEnabled,
@@ -455,9 +466,11 @@ public final class TestOMRequestUtils {
* @return the deletedKey name
*/
public static String deleteKey(String ozoneKey,
- OMMetadataManager omMetadataManager) throws IOException {
+ OMMetadataManager omMetadataManager, long trxnLogIndex)
+ throws IOException {
// Retrieve the keyInfo
OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
+ omKeyInfo.setUpdateID(trxnLogIndex);
// Delete key from KeyTable and put in DeletedKeyTable
omMetadataManager.getKeyTable().delete(ozoneKey);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
index 1fc9627..b60d68e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
@@ -156,7 +156,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
long deleteTrxnLogIndex = 10L;
String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);
- TestOMRequestUtils.deleteKey(ozoneKey, omMetadataManager);
+ TestOMRequestUtils.deleteKey(ozoneKey, omMetadataManager, 10L);
// Create the same key again with TransactionLogIndex > Delete requests
// TransactionLogIndex
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index df6b177..10b45ad 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
@@ -48,24 +49,29 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
* Creates volume, bucket and key entries and adds to OM DB and then
* deletes these keys to move them to deletedKeys table.
*/
- private List<String> createAndDeleteKeys() throws Exception {
+ private List<String> createAndDeleteKeys(Integer trxnIndex, String bucket)
+ throws Exception {
+ if (bucket == null) {
+ bucket = bucketName;
+ }
// Add volume, bucket and key entries to OM DB.
- TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucket,
omMetadataManager);
List<String> ozoneKeyNames = new ArrayList<>(numKeys);
for (int i = 1; i <= numKeys; i++) {
String key = keyName + "-" + i;
- TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, key,
- clientID, replicationType, replicationFactor, omMetadataManager);
- ozoneKeyNames.add(
- omMetadataManager.getOzoneKey(volumeName, bucketName, key));
+ TestOMRequestUtils.addKeyToTable(false, false, volumeName, bucket,
+ key, clientID, replicationType, replicationFactor, trxnIndex++,
+ omMetadataManager);
+ ozoneKeyNames.add(omMetadataManager.getOzoneKey(
+ volumeName, bucket, key));
}
List<String> deletedKeyNames = new ArrayList<>(numKeys);
for (String ozoneKey : ozoneKeyNames) {
String deletedKeyName = TestOMRequestUtils.deleteKey(
- ozoneKey, omMetadataManager);
+ ozoneKey, omMetadataManager, trxnIndex++);
deletedKeyNames.add(deletedKeyName);
}
@@ -77,9 +83,14 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
* @return OMRequest
*/
private OMRequest createPurgeKeysRequest(List<String> deletedKeys) {
- PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+ DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
.addAllKeys(deletedKeys)
.build();
+ PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+ .addDeletedKeys(deletedKeysInBucket)
+ .build();
return OMRequest.newBuilder()
.setPurgeKeysRequest(purgeKeysRequest)
@@ -88,10 +99,22 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
.build();
}
+ private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
+ OMKeyPurgeRequest omKeyPurgeRequest =
+ new OMKeyPurgeRequest(originalOmRequest);
+
+ OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager);
+
+ // Will not be equal, as UserInfo will be set.
+ Assert.assertNotEquals(originalOmRequest, modifiedOmRequest);
+
+ return modifiedOmRequest;
+ }
+
@Test
public void testValidateAndUpdateCache() throws Exception {
// Create and Delete keys. The keys should be moved to DeletedKeys table
- List<String> deletedKeyNames = createAndDeleteKeys();
+ List<String> deletedKeyNames = createAndDeleteKeys(1, null);
// The keys should be present in the DeletedKeys table before purging
for (String deletedKey : deletedKeyNames) {
@@ -106,9 +129,8 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
OMKeyPurgeRequest omKeyPurgeRequest =
new OMKeyPurgeRequest(preExecutedRequest);
- OMClientResponse omClientResponse =
- omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
- ozoneManagerDoubleBufferHelper);
+ omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
+ ozoneManagerDoubleBufferHelper);
OMResponse omResponse = OMResponse.newBuilder()
.setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance())
@@ -119,8 +141,8 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
BatchOperation batchOperation =
omMetadataManager.getStore().initBatchOperation();
- OMKeyPurgeResponse omKeyPurgeResponse =
- new OMKeyPurgeResponse(deletedKeyNames, omResponse);
+ OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+ omResponse, deletedKeyNames);
omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
// Do manual commit and see whether addToBatch is successful or not.
@@ -133,15 +155,119 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
}
}
- private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
+ @Test
+ public void testPurgeKeysAcrossBuckets() throws Exception {
+ String bucket1 = bucketName;
+ String bucket2 = UUID.randomUUID().toString();
+
+ // bucket1 is created during setup. Create bucket2 manually.
+ TestOMRequestUtils.addBucketToDB(volumeName, bucket2, omMetadataManager);
+
+ // Create and Delete keys in Bucket1 and Bucket2.
+ List<String> deletedKeyInBucket1 = createAndDeleteKeys(1, bucket1);
+ List<String> deletedKeyInBucket2 = createAndDeleteKeys(1, bucket2);
+ List<String> deletedKeyNames = new ArrayList<>();
+ deletedKeyNames.addAll(deletedKeyInBucket1);
+ deletedKeyNames.addAll(deletedKeyInBucket2);
+
+ // The keys should be present in the DeletedKeys table before purging
+ for (String deletedKey : deletedKeyNames) {
+ Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+ deletedKey));
+ }
+
+ // Create PurgeKeysRequest to purge the deleted keys
+ DeletedKeys deletedKeysInBucket1 = DeletedKeys.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucket1)
+ .addAllKeys(deletedKeyInBucket1)
+ .build();
+ DeletedKeys deletedKeysInBucket2 = DeletedKeys.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucket2)
+ .addAllKeys(deletedKeyInBucket1)
+ .build();
+ PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+ .addDeletedKeys(deletedKeysInBucket1)
+ .addDeletedKeys(deletedKeysInBucket2)
+ .build();
+
+ OMRequest omRequest = OMRequest.newBuilder()
+ .setPurgeKeysRequest(purgeKeysRequest)
+ .setCmdType(Type.PurgeKeys)
+ .setClientId(UUID.randomUUID().toString())
+ .build();
+
+ OMRequest preExecutedRequest = preExecute(omRequest);
OMKeyPurgeRequest omKeyPurgeRequest =
- new OMKeyPurgeRequest(originalOmRequest);
+ new OMKeyPurgeRequest(preExecutedRequest);
- OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager);
+ omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
+ ozoneManagerDoubleBufferHelper);
- // Will not be equal, as UserInfo will be set.
- Assert.assertNotEquals(originalOmRequest, modifiedOmRequest);
+ OMResponse omResponse = OMResponse.newBuilder()
+ .setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance())
+ .setCmdType(Type.PurgeKeys)
+ .setStatus(Status.OK)
+ .build();
- return modifiedOmRequest;
+ BatchOperation batchOperation =
+ omMetadataManager.getStore().initBatchOperation();
+
+ OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+ omResponse, deletedKeyNames);
+ omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+ // Do manual commit and see whether addToBatch is successful or not.
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+ // The keys should not exist in the DeletedKeys table
+ for (String deletedKey : deletedKeyNames) {
+ Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(
+ deletedKey));
+ }
+ }
+
+ @Test
+ public void testReplayRequest() throws Exception {
+
+ // Create and Delete keys. The keys should be moved to DeletedKeys table
+ Integer trxnLogIndex = new Integer(1);
+ List<String> deletedKeyNames = createAndDeleteKeys(trxnLogIndex, null);
+ int purgeRequestTrxnLogIndex = ++trxnLogIndex;
+
+ // The keys should be present in the DeletedKeys table before purging
+ for (String deletedKey : deletedKeyNames) {
+ Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+ deletedKey));
+ }
+
+ // Execute PurgeKeys request to purge the keys from Deleted table.
+ // Create PurgeKeysRequest to replay the purge request
+ OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames);
+ OMRequest preExecutedRequest = preExecute(omRequest);
+ OMKeyPurgeRequest omKeyPurgeRequest =
+ new OMKeyPurgeRequest(preExecutedRequest);
+ OMClientResponse omClientResponse = omKeyPurgeRequest
+ .validateAndUpdateCache(ozoneManager, purgeRequestTrxnLogIndex,
+ ozoneManagerDoubleBufferHelper);
+
+ Assert.assertTrue(omClientResponse.getOMResponse().getStatus().equals(
+ Status.OK));
+
+ // Create and delete the same keys again
+ createAndDeleteKeys(++trxnLogIndex, null);
+
+ // Replay the PurgeKeys request. It should not purge the keys deleted
+ // after the original request was played.
+ OMClientResponse replayResponse = omKeyPurgeRequest
+ .validateAndUpdateCache(ozoneManager, purgeRequestTrxnLogIndex,
+ ozoneManagerDoubleBufferHelper);
+
+ // Verify that the new deletedKeys exist in the DeletedKeys table
+ for (String deletedKey : deletedKeyNames) {
+ Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+ deletedKey));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org