Posted to commits@ozone.apache.org by bh...@apache.org on 2021/10/22 16:49:43 UTC

[ozone] branch master updated: HDDS-5461. Move old objects to delete table on overwrite (#2433)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4bc83  HDDS-5461. Move old objects to delete table on overwrite (#2433)
2a4bc83 is described below

commit 2a4bc83496ee77753ed709c6c44cad4ecbb0de09
Author: UENISHI Kota <ku...@users.noreply.github.com>
AuthorDate: Sat Oct 23 01:49:28 2021 +0900

    HDDS-5461. Move old objects to delete table on overwrite (#2433)
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  2 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  | 14 +++-
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 80 +++++++++++++++++++++-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |  5 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  2 +-
 .../ozone/om/request/file/OMFileRequest.java       | 17 +++++
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 65 +++++++++++++++++-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  | 26 ++++++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 17 +++--
 .../ozone/om/response/key/OMKeyCommitResponse.java | 18 ++++-
 .../response/key/OMKeyCommitResponseWithFSO.java   | 11 ++-
 .../ozone/om/request/TestOMRequestUtils.java       | 37 ++++++++--
 .../om/request/key/TestOMKeyCommitRequest.java     | 71 ++++++++++++++++++-
 .../request/key/TestOMKeyCommitRequestWithFSO.java |  2 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |  4 +-
 .../ozone/om/request/key/TestOMKeyRequest.java     |  2 +
 .../om/response/key/TestOMKeyCommitResponse.java   | 28 ++++++--
 .../key/TestOMKeyCommitResponseWithFSO.java        |  5 +-
 .../ozone/om/response/key/TestOMKeyResponse.java   |  3 +
 19 files changed, 369 insertions(+), 40 deletions(-)
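
At a high level, this change makes a key commit on a bucket without versioning move the previous key's metadata into the deleted table instead of silently dropping it, so the deletion service can reclaim the old blocks. A rough sketch of that flow, using simplified, hypothetical names (MetadataStore, commitOverwrite) rather than the real OM request classes:

    // Hypothetical sketch only; MetadataStore and commitOverwrite are
    // illustrative stand-ins for the OM metadata manager and the
    // OMKeyCommitRequest logic in the diff below.
    RepeatedOmKeyInfo commitOverwrite(MetadataStore store, String dbOzoneKey,
        OmKeyInfo newKeyInfo, boolean versioningEnabled, long trxnLogIndex) {
      if (versioningEnabled) {
        // Versioned buckets keep old versions; nothing moves to deleted table.
        return null;
      }
      OmKeyInfo oldKey = store.keyTable().get(dbOzoneKey);
      RepeatedOmKeyInfo pending = store.deletedTable().get(dbOzoneKey);
      if (oldKey != null) {
        // Append the overwritten key to entries already awaiting deletion.
        pending = OmUtils.prepareKeyForDelete(oldKey, pending, trxnLogIndex,
            false);
        store.deletedTable().put(dbOzoneKey, pending);
      }
      store.keyTable().put(dbOzoneKey, newKeyInfo);
      return pending;
    }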

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index dfb9cc1..eb36b76 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -442,7 +442,7 @@ public final class OmUtils {
    *                     of the MultipartUploadAbort request which needs to
    *                     be set as the updateID of the partKeyInfos.
    *                     For regular Key deletes, this value should be set to
-   *                     the same updaeID as is in keyInfo.
+   *                     the same updateID as is in keyInfo.
    * @return {@link RepeatedOmKeyInfo}
    */
   public static RepeatedOmKeyInfo prepareKeyForDelete(OmKeyInfo keyInfo,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index b8b0afe..da48fe4 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -272,11 +272,12 @@ public final class OmKeyInfo extends WithParentObjectId {
    *
    * @param newLocationList the list of new blocks to be added.
    * @param updateTime - if true, updates modification time.
+   * @param keepOldVersions - if false, old blocks won't be kept.
    * @throws IOException
    */
   public synchronized long addNewVersion(
-      List<OmKeyLocationInfo> newLocationList, boolean updateTime)
-      throws IOException {
+      List<OmKeyLocationInfo> newLocationList, boolean updateTime,
+      boolean keepOldVersions) {
     long latestVersionNum;
     if (keyLocationVersions.size() == 0) {
       // no version exist, these blocks are the very first version.
@@ -286,8 +287,17 @@ public final class OmKeyInfo extends WithParentObjectId {
       // it is important that the new version are always at the tail of the list
       OmKeyLocationInfoGroup currentLatestVersion =
           keyLocationVersions.get(keyLocationVersions.size() - 1);
+
+      // Create a new version here. When bucket versioning is enabled,
+      // it includes the previous block versions. Otherwise, only the
+      // blocks of the new version are included.
       OmKeyLocationInfoGroup newVersion =
           currentLatestVersion.generateNextVersion(newLocationList);
+      if (!keepOldVersions) {
+        // Even though the old versions are cleared here, they will be
+        // moved to the delete table at the time of key commit.
+        keyLocationVersions.clear();
+      }
       keyLocationVersions.add(newVersion);
       latestVersionNum = newVersion.getVersion();
     }
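
Under the new signature, the caller decides whether older location groups survive an overwrite. A minimal sketch of the two behaviours, assuming an OmKeyInfo (keyInfo) that already holds one committed version and a list of freshly allocated blocks (the setup objects are assumptions; the call itself is the method above):

    List<OmKeyLocationInfo> newBlocks = new ArrayList<>();  // newly allocated

    // keepOldVersions = true: previous location groups are retained and the
    // new blocks become one more version at the tail of the list.
    long v1 = keyInfo.addNewVersion(newBlocks, true, true);

    // keepOldVersions = false: keyLocationVersions is cleared first, so only
    // the freshly generated version remains; the cleared versions are
    // expected to reach the deleted table at key-commit time.
    long v2 = keyInfo.addNewVersion(newBlocks, true, false);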
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 87ad651..dd8c880 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -958,7 +958,7 @@ public abstract class TestOzoneRpcClientAbstract {
         store.getVolume(volumeName).getBucket(bucketName).getUsedBytes());
 
     writeKey(bucket, keyName, ONE, value, valueLength);
-    Assert.assertEquals(valueLength * 2,
+    Assert.assertEquals(valueLength,
         store.getVolume(volumeName).getBucket(bucketName).getUsedBytes());
 
     bucket.deleteKey(keyName);
@@ -3666,4 +3666,82 @@ public abstract class TestOzoneRpcClientAbstract {
     }
 
   }
+
+  private void createRequiredForVersioningTest(String volumeName,
+      String bucketName, String keyName, boolean versioning) throws Exception {
+
+    ReplicationConfig replicationConfig = fromTypeAndFactor(RATIS,
+        HddsProtos.ReplicationFactor.THREE);
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+
+    // Create the bucket with the requested versioning setting.
+    volume.createBucket(bucketName,
+        BucketArgs.newBuilder().setVersioning(versioning).build());
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes(UTF_8).length, replicationConfig, new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+
+    // Overwrite the key
+    out = bucket.createKey(keyName,
+        value.getBytes(UTF_8).length, replicationConfig, new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+  }
+
+  private void checkExceptedResultForVersioningTest(String volumeName,
+      String bucketName, String keyName, int expectedCount) throws Exception {
+    OmKeyInfo omKeyInfo = cluster.getOzoneManager().getMetadataManager()
+        .getKeyTable().get(cluster.getOzoneManager().getMetadataManager()
+            .getOzoneKey(volumeName, bucketName, keyName));
+
+    Assert.assertNotNull(omKeyInfo);
+    Assert.assertEquals(expectedCount,
+        omKeyInfo.getKeyLocationVersions().size());
+
+    if (expectedCount == 1) {
+      RepeatedOmKeyInfo repeatedOmKeyInfo = cluster
+          .getOzoneManager().getMetadataManager()
+          .getDeletedTable().get(cluster.getOzoneManager().getMetadataManager()
+              .getOzoneKey(volumeName, bucketName, keyName));
+
+      Assert.assertNotNull(repeatedOmKeyInfo);
+      Assert.assertEquals(expectedCount,
+          repeatedOmKeyInfo.getOmKeyInfoList().size());
+    } else {
+      // An expectedCount greater than 1 means versioning is enabled,
+      // so the delete table should be empty.
+      RepeatedOmKeyInfo repeatedOmKeyInfo = cluster
+          .getOzoneManager().getMetadataManager()
+          .getDeletedTable().get(cluster.getOzoneManager().getMetadataManager()
+              .getOzoneKey(volumeName, bucketName, keyName));
+
+      Assert.assertNull(repeatedOmKeyInfo);
+    }
+  }
+
+  @Test
+  public void testOverWriteKeyWithAndWithOutVersioning() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    createRequiredForVersioningTest(volumeName, bucketName, keyName, false);
+
+    checkExceptedResultForVersioningTest(volumeName, bucketName, keyName, 1);
+
+
+    // Versioning turned on
+    volumeName = UUID.randomUUID().toString();
+    bucketName = UUID.randomUUID().toString();
+    keyName = UUID.randomUUID().toString();
+
+    createRequiredForVersioningTest(volumeName, bucketName, keyName, true);
+    checkExceptedResultForVersioningTest(volumeName, bucketName, keyName, 2);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index 1144757..02adb4d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -92,7 +92,10 @@ public class TestOmBlockVersioning {
     String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
     String keyName = "key" + RandomStringUtils.randomNumeric(5);
 
-    TestDataUtil.createVolumeAndBucket(cluster, volumeName, bucketName);
+    OzoneBucket bucket =
+        TestDataUtil.createVolumeAndBucket(cluster, volumeName, bucketName);
+    // Versioning isn't supported currently; enable it here just to preserve the old behaviour
+    bucket.setVersioning(true);
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 72b49d9..090a43c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -521,7 +521,7 @@ public class KeyManagerImpl implements KeyManager {
       // the key already exist, the new blocks will be added as new version
       // when locations.size = 0, the new version will have identical blocks
       // as its previous version
-      keyInfo.addNewVersion(locations, true);
+      keyInfo.addNewVersion(locations, true, true);
       keyInfo.setDataSize(size + keyInfo.getDataSize());
     }
     if(keyInfo != null) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index 498a979..c66ed12 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -519,6 +520,22 @@ public final class OMFileRequest {
   }
 
   /**
+   * Add the OmKeyInfo entries eligible for block deletion to the deleted table cache.
+   *
+   * @param omMetadataManager OM Metadata Manager
+   * @param dbDeletedKey      Ozone key in deletion table
+   * @param keysToDelete      Repeated OMKeyInfos
+   * @param trxnLogIndex      transaction log index
+   */
+  public static void addDeletedTableCacheEntry(
+          OMMetadataManager omMetadataManager, String dbDeletedKey,
+          RepeatedOmKeyInfo keysToDelete, long trxnLogIndex) {
+    omMetadataManager.getDeletedTable().addCacheEntry(
+            new CacheKey<>(dbDeletedKey),
+            new CacheValue<>(Optional.of(keysToDelete), trxnLogIndex));
+  }
+
+  /**
    * Adding omKeyInfo to open file table.
    *
    * @param omMetadataMgr    OM Metadata Manager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 639addc..a36085f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -31,7 +31,11 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.slf4j.Logger;
@@ -43,8 +47,6 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
@@ -165,6 +167,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
               volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
 
       // Check for directory exists with same name, if it exists throw error.
       if (ozoneManager.getEnableFileSystemPaths()) {
@@ -201,6 +204,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
+      // If bucket versioning is turned on during the update, between key
+      // creation and key commit, the old versions are simply overwritten
+      // and not kept. Bucket versioning takes effect from the first key
+      // created after the knob is turned on.
+      RepeatedOmKeyInfo keysToDelete = getOldVersionsToCleanUp(dbOzoneKey,
+          omMetadataManager, omBucketInfo.getIsVersionEnabled(), trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+      OmKeyInfo keyToDelete =
+              omMetadataManager.getKeyTable().get(dbOzoneKey);
+
       // Add to cache of open key table and key table.
       omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
           new CacheKey<>(dbOpenKey),
@@ -210,6 +223,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
           new CacheKey<>(dbOzoneKey),
           new CacheValue<>(Optional.of(omKeyInfo), trxnLogIndex));
 
+      if (keysToDelete != null) {
+        OMFileRequest.addDeletedTableCacheEntry(omMetadataManager, dbOzoneKey,
+                keysToDelete, trxnLogIndex);
+      }
+
       long scmBlockSize = ozoneManager.getScmBlockSize();
       int factor = omKeyInfo.getReplicationConfig().getRequiredNodes();
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -219,10 +237,17 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       // be subtracted.
       long correctedSpace = omKeyInfo.getDataSize() * factor -
           allocatedLocationInfoList.size() * scmBlockSize * factor;
+      // Subtract the size of blocks to be overwritten.
+      if (keyToDelete != null) {
+        correctedSpace -= keyToDelete.getDataSize() *
+            keyToDelete.getReplicationConfig().getRequiredNodes();
+      }
+
       omBucketInfo.incrUsedBytes(correctedSpace);
 
       omClientResponse = new OMKeyCommitResponse(omResponse.build(),
-          omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject());
+          omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject(),
+          keysToDelete);
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
@@ -250,6 +275,40 @@ public class OMKeyCommitRequest extends OMKeyRequest {
   }
 
   /**
+   * Prepare key for deletion service on overwrite.
+   *
+   * @param dbOzoneKey key to point to an object in RocksDB
+   * @param omMetadataManager
+   * @param isVersionEnabled
+   * @param trxnLogIndex
+   * @param isRatisEnabled
+   * @return Old keys eligible for deletion.
+   * @throws IOException
+   */
+  protected RepeatedOmKeyInfo getOldVersionsToCleanUp(
+          String dbOzoneKey, OMMetadataManager omMetadataManager,
+          boolean isVersionEnabled, long trxnLogIndex,
+          boolean isRatisEnabled) throws IOException {
+    if (isVersionEnabled) {
+      // Nothing to clean up in case versioning is on.
+      return null;
+    }
+    // Past keys that were deleted but are still in the deleted table,
+    // waiting for the deletion service.
+    RepeatedOmKeyInfo keysToDelete =
+            omMetadataManager.getDeletedTable().get(dbOzoneKey);
+    // Current key to be overwritten
+    OmKeyInfo keyToDelete =
+            omMetadataManager.getKeyTable().get(dbOzoneKey);
+
+    if (keyToDelete != null) {
+      keysToDelete = OmUtils.prepareKeyForDelete(keyToDelete, keysToDelete,
+                trxnLogIndex, isRatisEnabled);
+    }
+    return keysToDelete;
+  }
+
+  /**
    * Process result of om request execution.
    *
    * @param commitKeyRequest commit key request
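
The quota arithmetic in the commit path above is easier to follow with concrete numbers. A worked example with invented figures (not taken from the patch):

    // Suppose a 100 MB key committed with factor 3, one 256 MB block
    // pre-allocated at create time, overwriting an existing 80 MB key that
    // also used factor 3.
    long mb = 1024L * 1024;
    long scmBlockSize = 256 * mb;
    long correctedSpace = 100 * mb * 3        // bytes actually committed
        - 1 * scmBlockSize * 3;               // space charged at allocation
    correctedSpace -= 80 * mb * 3;            // old key's bytes now freed
    // omBucketInfo.incrUsedBytes(correctedSpace) nets out the optimistic
    // charge made at create/allocate time and releases the overwritten
    // key's usage.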
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 0b90eb0..99b5f6f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -144,6 +145,16 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
+      // If bucket versioning is turned on during the update, between key
+      // creation and key commit, the old versions are simply overwritten
+      // and not kept. Bucket versioning takes effect from the first key
+      // created after the knob is turned on.
+      RepeatedOmKeyInfo keysToDelete = getOldVersionsToCleanUp(dbFileKey,
+              omMetadataManager, omBucketInfo.getIsVersionEnabled(),
+              trxnLogIndex, ozoneManager.isRatisEnabled());
+      OmKeyInfo keyToDelete =
+              omMetadataManager.getKeyTable().get(dbFileKey);
+
       // Add to cache of open key table and key table.
       OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey,
               null, fileName, trxnLogIndex);
@@ -151,6 +162,11 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbFileKey,
               omKeyInfo, fileName, trxnLogIndex);
 
+      if (keysToDelete != null) {
+        OMFileRequest.addDeletedTableCacheEntry(omMetadataManager, dbFileKey,
+                keysToDelete, trxnLogIndex);
+      }
+
       long scmBlockSize = ozoneManager.getScmBlockSize();
       int factor = omKeyInfo.getReplicationConfig().getRequiredNodes();
       // Block was pre-requested and UsedBytes updated when createKey and
@@ -159,10 +175,16 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       // be subtracted.
       long correctedSpace = omKeyInfo.getDataSize() * factor -
               locationInfoList.size() * scmBlockSize * factor;
+      // Subtract the size of blocks to be overwritten.
+      if (keyToDelete != null) {
+        correctedSpace -= keyToDelete.getDataSize() *
+            keyToDelete.getReplicationConfig().getRequiredNodes();
+      }
       omBucketInfo.incrUsedBytes(correctedSpace);
 
       omClientResponse = new OMKeyCommitResponseWithFSO(omResponse.build(),
-              omKeyInfo, dbFileKey, dbOpenFileKey, omBucketInfo.copyObject());
+              omKeyInfo, dbFileKey, dbOpenFileKey, omBucketInfo.copyObject(),
+              keysToDelete);
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
@@ -174,7 +196,7 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
               omDoubleBufferHelper);
 
-      if(bucketLockAcquired) {
+      if (bucketLockAcquired) {
         omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
                 bucketName);
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 84d19b8..16089bb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -625,13 +625,16 @@ public abstract class OMKeyRequest extends OMClientRequest {
       //TODO args.getMetadata
     }
     if (dbKeyInfo != null) {
-      // TODO: Need to be fixed, as when key already exists, we are
-      //  appending new blocks to existing key.
-      // The key already exist, the new blocks will be added as new version
-      // when locations.size = 0, the new version will have identical blocks
-      // as its previous version
-      dbKeyInfo.addNewVersion(locations, false);
-      dbKeyInfo.setDataSize(size + dbKeyInfo.getDataSize());
+      // The key already exists. The new blocks are added as a new
+      // version, and unless the bucket has versioning turned on, the
+      // old versions are replaced rather than kept.
+      dbKeyInfo.addNewVersion(locations, false,
+              omBucketInfo.getIsVersionEnabled());
+      long newSize = size;
+      if (omBucketInfo.getIsVersionEnabled()) {
+        newSize += dbKeyInfo.getDataSize();
+      }
+      dbKeyInfo.setDataSize(newSize);
       // The modification time is set in preExecute. Use the same
       // modification time.
       dbKeyInfo.setModificationTime(keyArgs.getModificationTime());
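
A small sketch of how the flag changes the size bookkeeping, assuming an existing 1000-byte key being overwritten by another 1000-byte write (the surrounding objects come from the method above; the numbers are illustrative):

    long size = 1000L;                        // bytes in the new write
    long existing = dbKeyInfo.getDataSize();  // e.g. 1000L for the old version

    long newSize = size;
    if (omBucketInfo.getIsVersionEnabled()) {
      // Versioned bucket: the bytes of both versions stay referenced.
      newSize += existing;                    // 2000L
    }
    // Non-versioned bucket: newSize stays 1000L, which matches the adjusted
    // usedBytes assertion in TestOzoneRpcClientAbstract earlier in this diff.
    dbKeyInfo.setDataSize(newSize);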
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index e738559..068e695 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.response.key;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -29,28 +30,31 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import java.io.IOException;
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
 
 /**
  * Response for CommitKey request.
  */
-@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE})
+@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE, DELETED_TABLE})
 public class OMKeyCommitResponse extends OMClientResponse {
 
   private OmKeyInfo omKeyInfo;
   private String ozoneKeyName;
   private String openKeyName;
   private OmBucketInfo omBucketInfo;
+  private RepeatedOmKeyInfo keysToDelete;
 
   public OMKeyCommitResponse(@Nonnull OMResponse omResponse,
       @Nonnull OmKeyInfo omKeyInfo, String ozoneKeyName, String openKeyName,
-      @Nonnull OmBucketInfo omBucketInfo) {
+      @Nonnull OmBucketInfo omBucketInfo, RepeatedOmKeyInfo keysToDelete) {
     super(omResponse);
     this.omKeyInfo = omKeyInfo;
     this.ozoneKeyName = ozoneKeyName;
     this.openKeyName = openKeyName;
     this.omBucketInfo = omBucketInfo;
+    this.keysToDelete = keysToDelete;
   }
 
   /**
@@ -73,6 +77,8 @@ public class OMKeyCommitResponse extends OMClientResponse {
     omMetadataManager.getKeyTable(getBucketLayout())
         .putWithBatch(batchOperation, ozoneKeyName, omKeyInfo);
 
+    updateDeletedTable(omMetadataManager, batchOperation);
+
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
         omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
@@ -94,4 +100,12 @@ public class OMKeyCommitResponse extends OMClientResponse {
   protected String getOzoneKeyName() {
     return ozoneKeyName;
   }
+
+  protected void updateDeletedTable(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    if (this.keysToDelete != null) {
+      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+              ozoneKeyName, keysToDelete);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
index ba68028..ecc9fd1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -33,19 +34,21 @@ import java.io.IOException;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 
 /**
  * Response for CommitKey request - prefix layout1.
  */
-@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE})
+@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE, DELETED_TABLE})
 public class OMKeyCommitResponseWithFSO extends OMKeyCommitResponse {
 
   public OMKeyCommitResponseWithFSO(@Nonnull OMResponse omResponse,
                                @Nonnull OmKeyInfo omKeyInfo,
                                String ozoneKeyName, String openKeyName,
-                               @Nonnull OmBucketInfo omBucketInfo) {
+                               @Nonnull OmBucketInfo omBucketInfo,
+                               RepeatedOmKeyInfo deleteKeys) {
     super(omResponse, omKeyInfo, ozoneKeyName, openKeyName,
-            omBucketInfo);
+            omBucketInfo, deleteKeys);
   }
 
   /**
@@ -68,6 +71,8 @@ public class OMKeyCommitResponseWithFSO extends OMKeyCommitResponse {
     OMFileRequest.addToFileTable(omMetadataManager, batchOperation,
             getOmKeyInfo());
 
+    updateDeletedTable(omMetadataManager, batchOperation);
+
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
             omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index 7938ce8..e9a9ad8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -139,10 +139,10 @@ public final class TestOMRequestUtils {
       HddsProtos.ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor,
       OMMetadataManager omMetadataManager,
-      List<OmKeyLocationInfo> locationList) throws Exception {
+      List<OmKeyLocationInfo> locationList, long version) throws Exception {
     addKeyToTable(openKeyTable, false, volumeName, bucketName, keyName,
         clientID, replicationType, replicationFactor, 0L, omMetadataManager,
-        locationList);
+        locationList, version);
   }
 
 
@@ -180,10 +180,10 @@ public final class TestOMRequestUtils {
       HddsProtos.ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor, long trxnLogIndex,
       OMMetadataManager omMetadataManager,
-      List<OmKeyLocationInfo> locationList) throws Exception {
+      List<OmKeyLocationInfo> locationList, long version) throws Exception {
 
     OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
-        replicationType, replicationFactor, trxnLogIndex);
+        replicationType, replicationFactor, trxnLogIndex, Time.now(), version);
     omKeyInfo.appendNewBlocks(locationList, false);
 
     addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex,
@@ -356,12 +356,24 @@ public final class TestOMRequestUtils {
       String keyName, HddsProtos.ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor, long objectID,
       long creationTime) {
+    return createOmKeyInfo(volumeName, bucketName, keyName, replicationType,
+            replicationFactor, objectID, creationTime, 0L);
+  }
+
+  /**
+   * Create OmKeyInfo.
+   */
+  @SuppressWarnings("parameterNumber")
+  public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
+      String keyName, HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor replicationFactor, long objectID,
+      long creationTime, long version) {
     return new OmKeyInfo.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+            new OmKeyLocationInfoGroup(version, new ArrayList<>())))
         .setCreationTime(creationTime)
         .setModificationTime(Time.now())
         .setDataSize(1000L)
@@ -931,13 +943,26 @@ public final class TestOMRequestUtils {
       String keyName, HddsProtos.ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor, long objectID,
       long parentID, long trxnLogIndex, long creationTime) {
+    return createOmKeyInfo(volumeName, bucketName, keyName,
+      replicationType, replicationFactor, objectID,
+      parentID, trxnLogIndex, creationTime, 0L);
+  }
+
+  /**
+   * Create OmKeyInfo.
+   */
+  @SuppressWarnings("parameterNumber")
+  public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
+      String keyName, HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor replicationFactor, long objectID,
+      long parentID, long trxnLogIndex, long creationTime, long version) {
     String fileName = OzoneFSUtils.getFileName(keyName);
     return new OmKeyInfo.Builder()
             .setVolumeName(volumeName)
             .setBucketName(bucketName)
             .setKeyName(keyName)
             .setOmKeyLocationInfos(Collections.singletonList(
-                    new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+                    new OmKeyLocationInfoGroup(version, new ArrayList<>())))
             .setCreationTime(creationTime)
             .setModificationTime(Time.now())
             .setDataSize(1000L)
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index df2a74d..a52ab33 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
@@ -282,6 +283,72 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     Assert.assertNull(omKeyInfo);
   }
 
+  @Test
+  public void testValidateAndUpdateCacheOnOverwrite() throws Exception {
+    testValidateAndUpdateCache();
+
+    // Become a new client and set next version number
+    clientID = Time.now();
+    version += 1;
+
+    OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
+
+    OMKeyCommitRequest omKeyCommitRequest =
+            getOmKeyCommitRequest(modifiedOmRequest);
+
+    KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();
+
+    String ozoneKey = getOzonePathKey();
+    // Key should be there in key table, as validateAndUpdateCache is called.
+    OmKeyInfo omKeyInfo = omMetadataManager
+        .getKeyTable(getBucketLayout()).get(ozoneKey);
+
+    Assert.assertNotNull(omKeyInfo);
+    // Previously committed version
+    Assert.assertEquals(0L,
+            omKeyInfo.getLatestVersionLocations().getVersion());
+
+    // Append new blocks
+    List<OmKeyLocationInfo> allocatedLocationList =
+            keyArgs.getKeyLocationsList().stream()
+                    .map(OmKeyLocationInfo::getFromProtobuf)
+                    .collect(Collectors.toList());
+    addKeyToOpenKeyTable(allocatedLocationList);
+
+    OMClientResponse omClientResponse =
+            omKeyCommitRequest.validateAndUpdateCache(ozoneManager,
+                    102L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+            omClientResponse.getOMResponse().getStatus());
+
+    // New entry should be created in key Table.
+    omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+
+    Assert.assertNotNull(omKeyInfo);
+    Assert.assertEquals(version,
+            omKeyInfo.getLatestVersionLocations().getVersion());
+    // DB keyInfo format
+    verifyKeyName(omKeyInfo);
+
+    // Check modification time
+    CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest();
+    Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(),
+            omKeyInfo.getModificationTime());
+
+    // Check block location.
+    List<OmKeyLocationInfo> locationInfoListFromCommitKeyRequest =
+        commitKeyRequest.getKeyArgs()
+        .getKeyLocationsList().stream().map(OmKeyLocationInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(locationInfoListFromCommitKeyRequest,
+            omKeyInfo.getLatestVersionLocations().getLocationList());
+    Assert.assertEquals(allocatedLocationList,
+            omKeyInfo.getLatestVersionLocations().getLocationList());
+    Assert.assertEquals(1, omKeyInfo.getKeyLocationVersions().size());
+  }
+
   /**
    * This method calls preExecute and verify the modified request.
    * @param originalOMRequest
@@ -363,7 +430,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
               .setBlockID(HddsProtos.BlockID.newBuilder()
                   .setContainerBlockID(HddsProtos.ContainerBlockID.newBuilder()
                       .setContainerID(i+1000).setLocalID(i+100).build()))
-              .setOffset(0).setLength(200).build();
+              .setOffset(0).setLength(200).setCreateVersion(version).build();
       keyLocations.add(keyLocation);
     }
     return keyLocations;
@@ -384,7 +451,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
       throws Exception {
     TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
         clientID, replicationType, replicationFactor, omMetadataManager,
-        locationList);
+        locationList, version);
 
     return getOzonePathKey();
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
index f257cc9..4371999 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
@@ -76,7 +76,7 @@ public class TestOMKeyCommitRequestWithFSO extends TestOMKeyCommitRequest {
             TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
                     HddsProtos.ReplicationType.RATIS,
                     HddsProtos.ReplicationFactor.ONE, objectId, parentID, 100,
-                    Time.now());
+                    Time.now(), version);
     omKeyInfoFSO.appendNewBlocks(locationList, false);
 
     String fileName = OzoneFSUtils.getFileName(keyName);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index cbb5389..37a41cb 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -134,8 +134,8 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     Assert.assertEquals(1, omKeyCreateResponse.getOMResponse()
         .getCreateKeyResponse().getKeyInfo().getKeyLocationListCount());
 
-    // Disk should have 2 versions.
-    Assert.assertEquals(2,
+    // Disk should have 1 version when bucket versioning is off.
+    Assert.assertEquals(1,
         omMetadataManager.getOpenKeyTable(getBucketLayout()).get(openKey)
             .getKeyLocationVersions().size());
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index e2f315c..f3f1469 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -103,6 +103,7 @@ public class TestOMKeyRequest {
   protected long dataSize;
   protected Random random;
   protected long txnLogId = 100000L;
+  protected long version = 0L;
 
   // Just setting ozoneManagerDoubleBuffer which does nothing.
   protected OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper =
@@ -183,6 +184,7 @@ public class TestOMKeyRequest {
     clientID = Time.now();
     dataSize = 1000L;
     random = new Random();
+    version = 0L;
 
     Pair<String, String> volumeAndBucket = Pair.of(volumeName, bucketName);
     when(ozoneManager.resolveBucketLink(any(KeyArgs.class),
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index 3b6a723..8709a07 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
@@ -60,7 +62,7 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
 
     String ozoneKey = getOzoneKey();
     OMKeyCommitResponse omKeyCommitResponse = getOmKeyCommitResponse(
-            omKeyInfo, omResponse, openKey, ozoneKey);
+            omKeyInfo, omResponse, openKey, ozoneKey, keysToDelete);
 
     omKeyCommitResponse.addToDBBatch(omMetadataManager, batchOperation);
 
@@ -94,7 +96,7 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
     String ozoneKey = getOzoneKey();
 
     OMKeyCommitResponse omKeyCommitResponse = getOmKeyCommitResponse(
-            omKeyInfo, omResponse, openKey, ozoneKey);
+            omKeyInfo, omResponse, openKey, ozoneKey, null);
 
     // As during commit Key, entry will be already there in openKeyTable.
     // Adding it here.
@@ -117,6 +119,24 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
         omMetadataManager.getKeyTable(getBucketLayout()).isExist(ozoneKey));
   }
 
+  @Test
+  public void testAddToDBBatchOnOverwrite() throws Exception {
+    omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setCreationTime(Time.now()).build();
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
+    keysToDelete =
+            OmUtils.prepareKeyForDelete(omKeyInfo, null, 100, false);
+    Assert.assertNotNull(keysToDelete);
+    testAddToDBBatch();
+
+    RepeatedOmKeyInfo keysInDeleteTable =
+            omMetadataManager.getDeletedTable().get(getOzoneKey());
+    Assert.assertNotNull(keysInDeleteTable);
+    Assert.assertEquals(1, keysInDeleteTable.getOmKeyInfoList().size());
+
+  }
+
   @NotNull
   protected void addKeyToOpenKeyTable() throws Exception {
     TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
@@ -133,9 +153,9 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
   @NotNull
   protected OMKeyCommitResponse getOmKeyCommitResponse(OmKeyInfo omKeyInfo,
           OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
-          String ozoneKey) {
+          String ozoneKey, RepeatedOmKeyInfo deleteKeys) {
     Assert.assertNotNull(omBucketInfo);
     return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey,
-            omBucketInfo);
+            omBucketInfo, deleteKeys);
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
index f7eeee6..3cc493a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -39,10 +40,10 @@ public class TestOMKeyCommitResponseWithFSO extends TestOMKeyCommitResponse {
   @Override
   protected OMKeyCommitResponse getOmKeyCommitResponse(OmKeyInfo omKeyInfo,
       OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
-      String ozoneKey) {
+      String ozoneKey, RepeatedOmKeyInfo deleteKeys) {
     Assert.assertNotNull(omBucketInfo);
     return new OMKeyCommitResponseWithFSO(omResponse, omKeyInfo, ozoneKey,
-        openKey, omBucketInfo);
+        openKey, omBucketInfo, deleteKeys);
   }
 
   @NotNull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
index 462b407..bdafb68 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
@@ -59,6 +60,7 @@ public class TestOMKeyResponse {
   protected long clientID;
   protected Random random;
   protected long txnLogId = 100000L;
+  protected RepeatedOmKeyInfo keysToDelete;
 
   @Before
   public void setup() throws Exception {
@@ -75,6 +77,7 @@ public class TestOMKeyResponse {
     replicationType = HddsProtos.ReplicationType.RATIS;
     clientID = 1000L;
     random = new Random();
+    keysToDelete = null;
   }
 
   @NotNull

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org