Posted to commits@ozone.apache.org by bh...@apache.org on 2020/02/01 00:49:13 UTC

[hadoop-ozone] branch master updated: HDDS-2893. Handle replay of KeyPurge Request. (#450)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bb57442  HDDS-2893. Handle replay of KeyPurge Request. (#450)
bb57442 is described below

commit bb574424f96a5ef685de11aa7694c9f38d51f14a
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Fri Jan 31 16:49:05 2020 -0800

    HDDS-2893. Handle replay of KeyPurge Request. (#450)
---
 .../src/main/proto/OzoneManagerProtocol.proto      |   8 +-
 .../apache/hadoop/ozone/om/KeyDeletingService.java |  46 +++++-
 .../ozone/om/request/key/OMKeyPurgeRequest.java    | 127 +++++++++++++---
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |  22 ++-
 .../ozone/om/request/TestOMRequestUtils.java       |  33 ++--
 .../om/request/key/TestOMKeyDeleteRequest.java     |   2 +-
 .../key/TestOMKeyPurgeRequestAndResponse.java      | 166 ++++++++++++++++++---
 7 files changed, 342 insertions(+), 62 deletions(-)
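
For context, the core of the change is a replay check inside the purge
path: a deleted key is skipped if any of its entries carries an updateID
at or beyond the Ratis transaction log index being applied. The diff below
calls isReplay(ozoneManager, updateID, trxnLogIndex); that helper is
defined outside this patch, so the following body is only a sketch of its
assumed intent:

    // Sketch only: the real isReplay helper lives outside this patch;
    // this body is an assumption about its intent, not the actual code.
    private boolean isReplay(OzoneManager ozoneManager, long updateID,
        long trxnLogIndex) {
      // A key whose updateID is at or beyond this transaction's index was
      // written by this or a later transaction, so applying the purge
      // again would be a replay.
      return updateID >= trxnLogIndex;
    }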

diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 568a1d9..65bf10a 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -853,8 +853,14 @@ message DeleteKeyResponse {
     optional uint64 openVersion = 4;
 }
 
+message DeletedKeys {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    repeated string keys = 3;
+}
+
 message PurgeKeysRequest {
-    repeated string keys = 1;
+    repeated DeletedKeys deletedKeys = 1;
 }
 
 message PurgeKeysResponse {
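
PurgeKeysRequest now groups keys per (volume, bucket) instead of carrying
a flat key list, which is what lets the OM take one bucket lock per group
while applying the request. A hedged example of building the new shape
with the generated protobuf builders (volume, bucket, and key names are
illustrative):

    // Illustrative only: constructing the bucket-grouped request shape.
    DeletedKeys bucketGroup = DeletedKeys.newBuilder()
        .setVolumeName("vol1")            // hypothetical volume
        .setBucketName("bucket1")         // hypothetical bucket
        .addKeys("/vol1/bucket1/key1")    // a key from the deleted table
        .build();
    PurgeKeysRequest request = PurgeKeysRequest.newBuilder()
        .addDeletedKeys(bucketGroup)
        .build();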
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index 3005617..27ad7d8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -18,15 +18,19 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 
@@ -45,6 +51,7 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.Preconditions;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,7 +225,8 @@ public class KeyDeletingService extends BackgroundService {
      * @throws IOException      on Error
      */
     public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results) {
-      List<String> purgeKeysList = new ArrayList<>();
+      Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+          new HashMap<>();
 
       // Group all keys to be purged per bucket
       int deletedCount = 0;
@@ -226,15 +234,26 @@ public class KeyDeletingService extends BackgroundService {
         if (result.isSuccess()) {
           // Add key to PurgeKeys list.
           String deletedKey = result.getObjectKey();
-          purgeKeysList.add(deletedKey);
+          // Parse Volume and BucketName
+          addToMap(purgeKeysMapPerBucket, deletedKey);
           LOG.debug("Key {} set to be purged from OM DB", deletedKey);
           deletedCount++;
         }
       }
 
-      PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
-          .addAllKeys(purgeKeysList)
-          .build();
+      PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
+
+      // Add keys to PurgeKeysRequest, grouped by bucket.
+      for (Map.Entry<Pair<String, String>, List<String>> entry :
+          purgeKeysMapPerBucket.entrySet()) {
+        Pair<String, String> volumeBucketPair = entry.getKey();
+        DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
+            .setVolumeName(volumeBucketPair.getLeft())
+            .setBucketName(volumeBucketPair.getRight())
+            .addAllKeys(entry.getValue())
+            .build();
+        purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+      }
 
       OMRequest omRequest = OMRequest.newBuilder()
           .setCmdType(Type.PurgeKeys)
@@ -253,4 +272,21 @@ public class KeyDeletingService extends BackgroundService {
       return deletedCount;
     }
   }
+
+  /**
+   * Parse the volume and bucket name from the objectKey and add the key to
+   * the given map of keys to be purged per bucket.
+   */
+  private void addToMap(Map<Pair<String, String>, List<String>> map,
+      String objectKey) {
+    // Parse volume and bucket name
+    String[] split = objectKey.split(OM_KEY_PREFIX);
+    Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " +
+        "missing from Key Name.");
+    Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
+    if (!map.containsKey(volumeBucketPair)) {
+      map.put(volumeBucketPair, new ArrayList<>());
+    }
+    map.get(volumeBucketPair).add(objectKey);
+  }
 }
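
addToMap relies on the OM key layout of OM_KEY_PREFIX + volume +
OM_KEY_PREFIX + bucket + OM_KEY_PREFIX + key. A quick worked example of
the split it performs, assuming OM_KEY_PREFIX is "/":

    // "/vol1/bucket1/key1".split("/") yields ["", "vol1", "bucket1", "key1"],
    // so length 4 satisfies the split.length > 3 precondition;
    // split[1] is the volume name and split[2] is the bucket name.
    String[] split = "/vol1/bucket1/key1".split("/");
    Pair<String, String> volumeBucket = Pair.of(split[1], split[2]);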
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 0699b2a..3b50880 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -18,14 +18,20 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.slf4j.Logger;
@@ -33,6 +39,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
 /**
  * Handles purging of keys from OM DB.
  */
@@ -47,27 +55,112 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
 
   @Override
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
-      long transactionLogIndex,
-      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
-    PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
-    List<String> purgeKeysList = purgeKeysRequest.getKeysList();
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
 
-    LOG.debug("Processing Purge Keys for {} number of keys.",
-        purgeKeysList.size());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
+    List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
+        .getDeletedKeysList();
+    List<String> keysToBePurgedList = new ArrayList<>();
 
-    OMResponse omResponse = OMResponse.newBuilder()
+    OMResponse.Builder omResponse = OMResponse.newBuilder()
         .setCmdType(Type.PurgeKeys)
-        .setPurgeKeysResponse(
-            OzoneManagerProtocolProtos.PurgeKeysResponse.newBuilder().build())
+        .setPurgeKeysResponse(PurgeKeysResponse.newBuilder().build())
         .setStatus(Status.OK)
-        .setSuccess(true)
-        .build();
+        .setSuccess(true);
+    OMClientResponse omClientResponse = null;
+    boolean success = true;
+    IOException exception = null;
+
+    // Filter out the keys that have updateID > trxnLogIndex. This is done
+    // so that, in case this transaction is a replay, we do not purge keys
+    // created after the original purge request.
+    // The PurgeKeys request has keys belonging to the same bucket grouped
+    // together; we acquire each bucket's lock and check the above condition.
+    for (DeletedKeys bucketWithDeleteKeys : bucketDeletedKeysList) {
+      boolean acquiredLock = false;
+      String volumeName = bucketWithDeleteKeys.getVolumeName();
+      String bucketName = bucketWithDeleteKeys.getBucketName();
+      ArrayList<String> keysNotPurged = new ArrayList<>();
+      Result result = null;
+      try {
+        acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
+        for (String deletedKey : bucketWithDeleteKeys.getKeysList()) {
+          RepeatedOmKeyInfo repeatedOmKeyInfo =
+              omMetadataManager.getDeletedTable().get(deletedKey);
+          boolean purgeKey = true;
+          if (repeatedOmKeyInfo != null) {
+            for (OmKeyInfo omKeyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
+              // Discard those keys whose updateID is > transactionLogIndex.
+              // This could happen when the PurgeRequest is replayed.
+              if (isReplay(ozoneManager, omKeyInfo.getUpdateID(),
+                  trxnLogIndex)) {
+                purgeKey = false;
+                result = Result.REPLAY;
+                break;
+              }
+              // TODO: If a deletedKey has any one OmKeyInfo which was
+              //  deleted after the original PurgeRequest (updateID >
+              //  trxnLogIndex), we avoid purging that whole key in the
+              //  replay request. Instead of discarding the whole key, we
+              //  could identify the OmKeyInfos which have updateID <
+              //  trxnLogIndex and purge only those OmKeyInfos from the
+              //  deletedKey in DeletedTable.
+            }
+            if (purgeKey) {
+              keysToBePurgedList.add(deletedKey);
+            } else {
+              keysNotPurged.add(deletedKey);
+            }
+          }
+        }
+      } catch (IOException ex) {
+        success = false;
+        exception = ex;
+        break;
+      } finally {
+        if (acquiredLock) {
+          omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+              bucketName);
+        }
+      }
+
+      if (result == Result.REPLAY) {
+        LOG.debug("Replayed Transaction {}. Request: {}", trxnLogIndex,
+            purgeKeysRequest);
+        if (!keysNotPurged.isEmpty()) {
+          StringBuilder notPurgeList = new StringBuilder();
+          for (String key : keysNotPurged) {
+            notPurgeList.append(", ").append(key);
+          }
+          LOG.debug("Following keys from Volume:{}, Bucket:{} will not be" +
+              " purged: {}", notPurgeList.toString().substring(2));
+        }
+      }
+    }
+
+    if (success) {
+      if (LOG.isDebugEnabled()) {
+        if (keysToBePurgedList.isEmpty()) {
+          LOG.debug("No keys will be purged as part of KeyPurgeRequest: {}",
+              purgeKeysRequest);
+        } else {
+          LOG.debug("Following keys will be purged as part of " +
+              "KeyPurgeRequest: {} - {}", purgeKeysRequest,
+              String.join(",", keysToBePurgedList));
+        }
+      }
+      omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
+          keysToBePurgedList);
+    } else {
+      omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(
+          omResponse, exception));
+    }
 
-    OMClientResponse omClientResponse = new OMKeyPurgeResponse(purgeKeysList,
-        omResponse);
-    omClientResponse.setFlushFuture(
-        ozoneManagerDoubleBufferHelper.add(omClientResponse,
-            transactionLogIndex));
+    omClientResponse.setFlushFuture(omDoubleBufferHelper.add(
+        omClientResponse, trxnLogIndex));
     return omClientResponse;
   }
 }
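
The TODO in validateAndUpdateCache points at a finer-grained alternative:
instead of skipping a whole deletedKey when any one of its OmKeyInfo
versions postdates the transaction, only the versions older than the
transaction would be purged. A hypothetical sketch of that idea, not part
of this patch:

    // Hypothetical refinement from the TODO: retain newer versions and
    // purge only the OmKeyInfo entries written before this transaction.
    List<OmKeyInfo> survivors = new ArrayList<>();
    for (OmKeyInfo omKeyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
      if (omKeyInfo.getUpdateID() >= trxnLogIndex) {
        survivors.add(omKeyInfo);  // written after the original purge
      }
    }
    // An empty survivors list would mean the key can be purged outright.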
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 513b94d..abfc0f6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.response.key;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
@@ -36,21 +35,28 @@ public class OMKeyPurgeResponse extends OMClientResponse {
 
   private List<String> purgeKeyList;
 
-  public OMKeyPurgeResponse(List<String> keyList,
-      @Nonnull OMResponse omResponse) {
+  public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
+      List<String> keyList) {
     super(omResponse);
     this.purgeKeyList = keyList;
   }
 
+  /**
+   * Constructor for a failed request or a replayed transaction.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMKeyPurgeResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
-      for (String key : purgeKeyList) {
-        omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation,
-            key);
-      }
+    for (String key : purgeKeyList) {
+      omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation,
+          key);
     }
   }
 }
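
The two constructors split the response along success and failure: the
list-taking constructor carries the keys that addToDBBatch removes from
the deleted table, while the single-argument one (guarded by
checkStatusNotOK) is reserved for responses whose status is not OK, for
which addToDBBatch is presumably never invoked by the double buffer. This
mirrors how the request code above builds its responses:

    // Success: the keys to purge travel with the response into the batch.
    omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
        keysToBePurgedList);
    // Failure: no purge list; checkStatusNotOK() asserts the status is
    // not OK when the error response is constructed.
    omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(
        omResponse, exception));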
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index de50c70..d8f6b02 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.ozone.om.request;
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -88,14 +87,7 @@ public final class TestOMRequestUtils {
 
     addVolumeToDB(volumeName, omMetadataManager);
 
-    OmBucketInfo omBucketInfo =
-        OmBucketInfo.newBuilder().setVolumeName(volumeName)
-            .setBucketName(bucketName).setCreationTime(Time.now()).build();
-
-    // Add to cache.
-    omMetadataManager.getBucketTable().addCacheEntry(
-        new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
-        new CacheValue<>(Optional.of(omBucketInfo), 1L));
+    addBucketToDB(volumeName, bucketName, omMetadataManager);
   }
 
   @SuppressWarnings("parameterNumber")
@@ -280,6 +272,25 @@ public final class TestOMRequestUtils {
             new CacheValue<>(Optional.of(omVolumeArgs), 1L));
   }
 
+  /**
+   * Add a bucket creation entry to the OM DB.
+   * @param volumeName volume the bucket belongs to
+   * @param bucketName bucket to add
+   * @param omMetadataManager metadata manager backing the OM DB
+   * @throws Exception on failure to add the entry
+   */
+  public static void addBucketToDB(String volumeName, String bucketName,
+      OMMetadataManager omMetadataManager) throws Exception {
+
+    OmBucketInfo omBucketInfo =
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName).setCreationTime(Time.now()).build();
+
+    // Add to cache.
+    omMetadataManager.getBucketTable().addCacheEntry(
+        new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
+        new CacheValue<>(Optional.of(omBucketInfo), 1L));
+  }
 
   public static OzoneManagerProtocolProtos.OMRequest createBucketRequest(
       String bucketName, String volumeName, boolean isVersionEnabled,
@@ -455,9 +466,11 @@ public final class TestOMRequestUtils {
    * @return the deletedKey name
    */
   public static String deleteKey(String ozoneKey,
-      OMMetadataManager omMetadataManager) throws IOException {
+      OMMetadataManager omMetadataManager, long trxnLogIndex)
+      throws IOException {
     // Retrieve the keyInfo
     OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
+    omKeyInfo.setUpdateID(trxnLogIndex);
 
     // Delete key from KeyTable and put in DeletedKeyTable
     omMetadataManager.getKeyTable().delete(ozoneKey);
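
With the new trxnLogIndex parameter, tests stamp each key's updateID at
deletion time so the replay check has something to compare against. For
instance (the index value here is illustrative):

    // Stamp the key with the transaction index that deleted it; a replayed
    // purge whose log index does not exceed this updateID skips the key.
    String deletedKeyName = TestOMRequestUtils.deleteKey(
        ozoneKey, omMetadataManager, 5L);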
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
index 1fc9627..b60d68e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
@@ -156,7 +156,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
     long deleteTrxnLogIndex = 10L;
     String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
         keyName);
-    TestOMRequestUtils.deleteKey(ozoneKey, omMetadataManager);
+    TestOMRequestUtils.deleteKey(ozoneKey, omMetadataManager,
+        deleteTrxnLogIndex);
 
     // Create the same key again with TransactionLogIndex > Delete requests
     // TransactionLogIndex
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index df6b177..10b45ad 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
@@ -48,24 +49,29 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
    * Creates volume, bucket and key entries and adds to OM DB and then
    * deletes these keys to move them to deletedKeys table.
    */
-  private List<String> createAndDeleteKeys() throws Exception {
+  private List<String> createAndDeleteKeys(Integer trxnIndex, String bucket)
+      throws Exception {
+    if (bucket == null) {
+      bucket = bucketName;
+    }
     // Add volume, bucket and key entries to OM DB.
-    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucket,
         omMetadataManager);
 
     List<String> ozoneKeyNames = new ArrayList<>(numKeys);
     for (int i = 1; i <= numKeys; i++) {
       String key = keyName + "-" + i;
-      TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, key,
-          clientID, replicationType, replicationFactor, omMetadataManager);
-      ozoneKeyNames.add(
-          omMetadataManager.getOzoneKey(volumeName, bucketName, key));
+      TestOMRequestUtils.addKeyToTable(false, false, volumeName, bucket,
+          key, clientID, replicationType, replicationFactor, trxnIndex++,
+          omMetadataManager);
+      ozoneKeyNames.add(omMetadataManager.getOzoneKey(
+          volumeName, bucket, key));
     }
 
     List<String> deletedKeyNames = new ArrayList<>(numKeys);
     for (String ozoneKey : ozoneKeyNames) {
       String deletedKeyName = TestOMRequestUtils.deleteKey(
-          ozoneKey, omMetadataManager);
+          ozoneKey, omMetadataManager, trxnIndex++);
       deletedKeyNames.add(deletedKeyName);
     }
 
@@ -77,9 +83,14 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
    * @return OMRequest
    */
   private OMRequest createPurgeKeysRequest(List<String> deletedKeys) {
-    PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+    DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
         .addAllKeys(deletedKeys)
         .build();
+    PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+        .addDeletedKeys(deletedKeysInBucket)
+        .build();
 
     return OMRequest.newBuilder()
         .setPurgeKeysRequest(purgeKeysRequest)
@@ -88,10 +99,22 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
         .build();
   }
 
+  private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
+    OMKeyPurgeRequest omKeyPurgeRequest =
+        new OMKeyPurgeRequest(originalOmRequest);
+
+    OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager);
+
+    // Will not be equal, as UserInfo will be set.
+    Assert.assertNotEquals(originalOmRequest, modifiedOmRequest);
+
+    return modifiedOmRequest;
+  }
+
   @Test
   public void testValidateAndUpdateCache() throws Exception {
     // Create and Delete keys. The keys should be moved to DeletedKeys table
-    List<String> deletedKeyNames = createAndDeleteKeys();
+    List<String> deletedKeyNames = createAndDeleteKeys(1, null);
 
     // The keys should be present in the DeletedKeys table before purging
     for (String deletedKey : deletedKeyNames) {
@@ -106,9 +129,8 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
     OMKeyPurgeRequest omKeyPurgeRequest =
         new OMKeyPurgeRequest(preExecutedRequest);
 
-    OMClientResponse omClientResponse =
-        omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
-            ozoneManagerDoubleBufferHelper);
+    omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
+        ozoneManagerDoubleBufferHelper);
 
     OMResponse omResponse = OMResponse.newBuilder()
         .setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance())
@@ -119,8 +141,8 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
     BatchOperation batchOperation =
         omMetadataManager.getStore().initBatchOperation();
 
-    OMKeyPurgeResponse omKeyPurgeResponse =
-        new OMKeyPurgeResponse(deletedKeyNames, omResponse);
+    OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+        omResponse, deletedKeyNames);
     omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
     // Do manual commit and see whether addToBatch is successful or not.
@@ -133,15 +155,119 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
     }
   }
 
-  private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
+  @Test
+  public void testPurgeKeysAcrossBuckets() throws Exception {
+    String bucket1 = bucketName;
+    String bucket2 = UUID.randomUUID().toString();
+
+    // bucket1 is created during setup. Create bucket2 manually.
+    TestOMRequestUtils.addBucketToDB(volumeName, bucket2, omMetadataManager);
+
+    // Create and Delete keys in Bucket1 and Bucket2.
+    List<String> deletedKeyInBucket1 = createAndDeleteKeys(1, bucket1);
+    List<String> deletedKeyInBucket2 = createAndDeleteKeys(1, bucket2);
+    List<String> deletedKeyNames = new ArrayList<>();
+    deletedKeyNames.addAll(deletedKeyInBucket1);
+    deletedKeyNames.addAll(deletedKeyInBucket2);
+
+    // The keys should be present in the DeletedKeys table before purging
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+          deletedKey));
+    }
+
+    // Create PurgeKeysRequest to purge the deleted keys
+    DeletedKeys deletedKeysInBucket1 = DeletedKeys.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucket1)
+        .addAllKeys(deletedKeyInBucket1)
+        .build();
+    DeletedKeys deletedKeysInBucket2 = DeletedKeys.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucket2)
+        .addAllKeys(deletedKeyInBucket2)
+        .build();
+    PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
+        .addDeletedKeys(deletedKeysInBucket1)
+        .addDeletedKeys(deletedKeysInBucket2)
+        .build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setCmdType(Type.PurgeKeys)
+        .setClientId(UUID.randomUUID().toString())
+        .build();
+
+    OMRequest preExecutedRequest = preExecute(omRequest);
     OMKeyPurgeRequest omKeyPurgeRequest =
-        new OMKeyPurgeRequest(originalOmRequest);
+        new OMKeyPurgeRequest(preExecutedRequest);
 
-    OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager);
+    omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
+        ozoneManagerDoubleBufferHelper);
 
-    // Will not be equal, as UserInfo will be set.
-    Assert.assertNotEquals(originalOmRequest, modifiedOmRequest);
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance())
+        .setCmdType(Type.PurgeKeys)
+        .setStatus(Status.OK)
+        .build();
 
-    return modifiedOmRequest;
+    BatchOperation batchOperation =
+        omMetadataManager.getStore().initBatchOperation();
+
+    OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+        omResponse, deletedKeyNames);
+    omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    // The keys should not exist in the DeletedKeys table
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(
+          deletedKey));
+    }
+  }
+
+  @Test
+  public void testReplayRequest() throws Exception {
+
+    // Create and Delete keys. The keys should be moved to DeletedKeys table
+    Integer trxnLogIndex = 1;
+    List<String> deletedKeyNames = createAndDeleteKeys(trxnLogIndex, null);
+    int purgeRequestTrxnLogIndex = ++trxnLogIndex;
+
+    // The keys should be present in the DeletedKeys table before purging
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+          deletedKey));
+    }
+
+    // Execute PurgeKeys request to purge the keys from Deleted table.
+    // Create PurgeKeysRequest to replay the purge request
+    OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames);
+    OMRequest preExecutedRequest = preExecute(omRequest);
+    OMKeyPurgeRequest omKeyPurgeRequest =
+        new OMKeyPurgeRequest(preExecutedRequest);
+    OMClientResponse omClientResponse = omKeyPurgeRequest
+        .validateAndUpdateCache(ozoneManager, purgeRequestTrxnLogIndex,
+            ozoneManagerDoubleBufferHelper);
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getStatus().equals(
+        Status.OK));
+
+    // Create and delete the same keys again
+    createAndDeleteKeys(++trxnLogIndex, null);
+
+    // Replay the PurgeKeys request. It should not purge the keys deleted
+    // after the original request was first applied.
+    OMClientResponse replayResponse = omKeyPurgeRequest
+        .validateAndUpdateCache(ozoneManager, purgeRequestTrxnLogIndex,
+            ozoneManagerDoubleBufferHelper);
+
+    // Verify that the new deletedKeys exist in the DeletedKeys table
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+          deletedKey));
+    }
   }
 }

