You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/07/14 16:24:05 UTC

[hadoop-ozone] branch master updated: HDDS-3930. Fix OMKeyDeletesRequest. (#1195)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 34404f3  HDDS-3930. Fix OMKeyDeletesRequest. (#1195)
34404f3 is described below

commit 34404f3a53f177863c90d5e583f51d851a2a0936
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Tue Jul 14 09:23:57 2020 -0700

    HDDS-3930. Fix OMKeyDeletesRequest. (#1195)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  14 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   1 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   2 +
 .../hadoop/ozone/om/helpers/OmDeleteKeys.java      |  51 +++++
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   5 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  24 ++-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  16 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |   7 +-
 .../src/main/proto/OmClientProtocol.proto          |  20 +-
 .../interface-client/src/main/proto/proto.lock     |  60 ++++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   4 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  18 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  50 +----
 .../ozone/om/request/key/OMKeysDeleteRequest.java  | 206 +++++++++++++--------
 .../om/response/key/OMKeysDeleteResponse.java      | 100 ++++------
 .../om/request/key/TestOMKeysDeleteRequest.java    | 155 ++++++++++++++++
 .../om/response/key/TestOMKeysDeleteResponse.java  | 125 +++++++++++++
 18 files changed, 606 insertions(+), 254 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index ea0466f..a473948 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -296,6 +296,8 @@ public final class OzoneConsts {
   public static final String MULTIPART_UPLOAD_PART_NUMBER = "partNumber";
   public static final String MULTIPART_UPLOAD_PART_NAME = "partName";
   public static final String BUCKET_ENCRYPTION_KEY = "bucketEncryptionKey";
+  public static final String DELETED_KEYS_LIST = "deletedKeysList";
+  public static final String UNDELETED_KEYS_LIST = "unDeletedKeysList";
 
 
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 56c867d..bfc2f0d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -730,16 +731,9 @@ public class RpcClient implements ClientProtocol {
           throws IOException {
     HddsClientUtils.verifyResourceName(volumeName, bucketName);
     Preconditions.checkNotNull(keyNameList);
-    List<OmKeyArgs> keyArgsList = new ArrayList<>();
-    for (String keyName: keyNameList) {
-      OmKeyArgs keyArgs = new OmKeyArgs.Builder()
-          .setVolumeName(volumeName)
-          .setBucketName(bucketName)
-          .setKeyName(keyName)
-          .build();
-      keyArgsList.add(keyArgs);
-    }
-    ozoneManagerClient.deleteKeys(keyArgsList);
+    OmDeleteKeys omDeleteKeys = new OmDeleteKeys(volumeName, bucketName,
+        keyNameList);
+    ozoneManagerClient.deleteKeys(omDeleteKeys);
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index cd8b126..31cccac 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -38,6 +38,7 @@ public enum OMAction implements AuditAction {
   UPDATE_BUCKET,
   UPDATE_KEY,
   PURGE_KEYS,
+  DELETE_KEYS,
 
   // S3 Bucket
   CREATE_S3_BUCKET,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 1eed619..e2b3418 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -222,5 +222,7 @@ public class OMException extends IOException {
     DIRECTORY_ALREADY_EXISTS,
 
     INVALID_VOLUME_NAME,
+
+    PARTIAL_DELETE
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteKeys.java
new file mode 100644
index 0000000..4274078
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteKeys.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.List;
+
+/**
+ * Represents the volume, bucket and key names that a client asks to delete.
+ */
+public class OmDeleteKeys {
+
+  private String volume;
+  private String bucket;
+
+  private List<String> keyNames;
+
+
+  public OmDeleteKeys(String volume, String bucket, List<String> keyNames) {
+    this.volume = volume;
+    this.bucket = bucket;
+    this.keyNames = keyNames;
+  }
+
+  public String getVolume() {
+    return volume;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public List< String > getKeyNames() {
+    return keyNames;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index b342ef2..9ae107b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -229,10 +230,10 @@ public interface OzoneManagerProtocol
    * multiple keys and a single key. Used by deleting files
    * through OzoneFileSystem.
    *
-   * @param args the list args of the key.
+   * @param deleteKeys the volume, bucket and names of the keys to delete.
    * @throws IOException
    */
-  void deleteKeys(List<OmKeyArgs> args) throws IOException;
+  void deleteKeys(OmDeleteKeys deleteKeys) throws IOException;
 
   /**
    * Deletes an existing empty bucket from volume.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index ae2c622..1377c53 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -71,8 +72,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateV
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclResponse;
@@ -141,6 +143,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.protobuf.ByteString;
+
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@@ -717,22 +720,17 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
    * Deletes existing key/keys. This interface supports delete
    * multiple keys and a single key.
    *
-   * @param args the list args of the key.
+   * @param deleteKeys the volume, bucket and names of the keys to delete.
    * @throws IOException
    */
   @Override
-  public void deleteKeys(List<OmKeyArgs> args) throws IOException {
+  public void deleteKeys(OmDeleteKeys deleteKeys) throws IOException {
     DeleteKeysRequest.Builder req = DeleteKeysRequest.newBuilder();
-    List <KeyArgs> keyArgsList = new ArrayList<KeyArgs>();
-    for (OmKeyArgs omKeyArgs : args) {
-      KeyArgs keyArgs = KeyArgs.newBuilder()
-          .setVolumeName(omKeyArgs.getVolumeName())
-          .setBucketName(omKeyArgs.getBucketName())
-          .setKeyName(omKeyArgs.getKeyName()).build();
-      keyArgsList.add(keyArgs);
-    }
-    req.addAllKeyArgs(keyArgsList);
-
+    DeleteKeyArgs deletedKeys = DeleteKeyArgs.newBuilder()
+        .setBucketName(deleteKeys.getBucket())
+        .setVolumeName(deleteKeys.getVolume())
+        .addAllKeys(deleteKeys.getKeyNames()).build();
+    req.setDeleteKeys(deletedKeys);
     OMRequest omRequest = createOMRequest(Type.DeleteKeys)
         .setDeleteKeysRequest(req)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 75107d0..700506a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
 public class TestOzoneFileSystem {
 
   /**
-    * Set a timeout for each test.
-    */
+   * Set a timeout for each test.
+   */
   @Rule
   public Timeout timeout = new Timeout(300000);
 
@@ -89,7 +89,7 @@ public class TestOzoneFileSystem {
 
   @Test(timeout = 300_000)
   public void testCreateFileShouldCheckExistenceOfDirWithSameName()
-          throws Exception {
+      throws Exception {
     /*
      * Op 1. create file -> /d1/d2/d3/d4/key2
      * Op 2. create dir -> /d1/d2/d3/d4/key2
@@ -195,11 +195,11 @@ public class TestOzoneFileSystem {
   }
 
   private void setupOzoneFileSystem()
-          throws IOException, TimeoutException, InterruptedException {
+      throws IOException, TimeoutException, InterruptedException {
     OzoneConfiguration conf = new OzoneConfiguration();
     cluster = MiniOzoneCluster.newBuilder(conf)
-            .setNumDatanodes(3)
-            .build();
+        .setNumDatanodes(3)
+        .build();
     cluster.waitForClusterToBeReady();
     // create a volume and a bucket to be used by OzoneFileSystem
     OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
@@ -207,8 +207,8 @@ public class TestOzoneFileSystem {
     bucketName = bucket.getName();
 
     String rootPath = String.format("%s://%s.%s/",
-            OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
-            bucket.getVolumeName());
+        OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
+        bucket.getVolumeName());
 
     // Set the fs.defaultFS and start the filesystem
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index 646b915..aed84f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -50,7 +50,7 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOU
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_DELETE;
 import static org.junit.Assert.fail;
 
 /**
@@ -187,8 +187,9 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
       ozoneBucket.deleteKeys(keyList2);
       fail("testFilesDelete");
     } catch (OMException ex) {
-      // The expected exception KEY_NOT_FOUND.
-      Assert.assertEquals(KEY_NOT_FOUND, ex.getResult());
+      // The expected exception is PARTIAL_DELETE: when some keys cannot be
+      // deleted, the error code PARTIAL_DELETE is returned.
+      Assert.assertEquals(PARTIAL_DELETE, ex.getResult());
     }
   }
 
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index f4cf79a..f6eaf38 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -302,6 +302,8 @@ enum Status {
     DIRECTORY_ALREADY_EXISTS = 60;
 
     INVALID_VOLUME_NAME = 61;
+
+    PARTIAL_DELETE = 62;
 }
 
 /**
@@ -845,7 +847,18 @@ message DeleteKeyRequest {
 }
 
 message DeleteKeysRequest {
-    repeated KeyArgs keyArgs = 1;
+    optional DeleteKeyArgs deleteKeys = 1;
+}
+
+message DeleteKeyArgs {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    repeated string keys = 3;
+}
+
+message DeleteKeysResponse {
+    optional DeleteKeyArgs unDeletedKeys = 1;
+    optional bool status = 2;
 }
 
 message DeleteKeyResponse {
@@ -863,10 +876,7 @@ message DeletedKeys {
     repeated string keys = 3;
 }
 
-message DeleteKeysResponse {
-    repeated KeyInfo deletedKeys = 1;
-    repeated KeyInfo unDeletedKeys = 2;
-}
+
 
 message PurgeKeysRequest {
     repeated DeletedKeys deletedKeys = 1;
diff --git a/hadoop-ozone/interface-client/src/main/proto/proto.lock b/hadoop-ozone/interface-client/src/main/proto/proto.lock
index f591ad1..bef3a0d 100644
--- a/hadoop-ozone/interface-client/src/main/proto/proto.lock
+++ b/hadoop-ozone/interface-client/src/main/proto/proto.lock
@@ -415,6 +415,10 @@
               {
                 "name": "INVALID_VOLUME_NAME",
                 "integer": 61
+              },
+              {
+                "name": "PARTIAL_DELETE",
+                "integer": 62
               }
             ]
           },
@@ -2430,13 +2434,48 @@
             "fields": [
               {
                 "id": 1,
-                "name": "keyArgs",
-                "type": "KeyArgs",
+                "name": "deleteKeys",
+                "type": "DeleteKeyArgs"
+              }
+            ]
+          },
+          {
+            "name": "DeleteKeyArgs",
+            "fields": [
+              {
+                "id": 1,
+                "name": "volumeName",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "bucketName",
+                "type": "string"
+              },
+              {
+                "id": 3,
+                "name": "keys",
+                "type": "string",
                 "is_repeated": true
               }
             ]
           },
           {
+            "name": "DeleteKeysResponse",
+            "fields": [
+              {
+                "id": 1,
+                "name": "unDeletedKeys",
+                "type": "DeleteKeyArgs"
+              },
+              {
+                "id": 2,
+                "name": "status",
+                "type": "bool"
+              }
+            ]
+          },
+          {
             "name": "DeleteKeyResponse",
             "fields": [
               {
@@ -2478,23 +2517,6 @@
             ]
           },
           {
-            "name": "DeleteKeysResponse",
-            "fields": [
-              {
-                "id": 1,
-                "name": "deletedKeys",
-                "type": "KeyInfo",
-                "is_repeated": true
-              },
-              {
-                "id": 2,
-                "name": "unDeletedKeys",
-                "type": "KeyInfo",
-                "is_repeated": true
-              }
-            ]
-          },
-          {
             "name": "PurgeKeysRequest",
             "fields": [
               {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index f229277..cd65662 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -214,6 +214,10 @@ public class OMMetrics {
     this.numKeys.incr(val- oldVal);
   }
 
+  public void decNumKeys(long val) {
+    this.numKeys.incr(-val);
+  }
+
   public long getNumVolumes() {
     return numVolumes.value();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 1b75b95..8c0fcbd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -42,8 +42,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -107,6 +107,7 @@ import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -161,7 +162,6 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ -170,6 +170,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.ProtocolMessageEnum;
 import org.apache.commons.lang3.StringUtils;
+
+
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
@@ -207,6 +209,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVA
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
+
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.FileUtils;
@@ -2220,16 +2223,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   /**
    * Deletes an existing key.
    *
-   * @param args - List attributes of the key.
+   * @param deleteKeys - List of keys to be deleted from volume and a bucket.
    * @throws IOException
    */
   @Override
-  public void deleteKeys(List<OmKeyArgs> args) throws IOException {
-    if (args != null) {
-      for (OmKeyArgs keyArgs : args) {
-        deleteKey(keyArgs);
-      }
-    }
+  public void deleteKeys(OmDeleteKeys deleteKeys) throws IOException {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 3ce059f..4ced9fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -18,15 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -36,22 +29,21 @@ import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .DeleteKeysResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 
 /**
@@ -221,36 +213,6 @@ public abstract class OMClientRequest implements RequestAuditor {
   }
 
   /**
-   * Set parameters needed for return error response to client.
-   *
-   * @param omResponse
-   * @param ex         - IOException
-   * @param unDeletedKeys    - Set<OmKeyInfo>
-   * @return error response need to be returned to client - OMResponse.
-   */
-  protected OMResponse createOperationKeysErrorOMResponse(
-      @Nonnull OMResponse.Builder omResponse,
-      @Nonnull IOException ex, @Nonnull Set<OmKeyInfo> unDeletedKeys) {
-    omResponse.setSuccess(false);
-    StringBuffer errorMsg = new StringBuffer();
-    DeleteKeysResponse.Builder resp = DeleteKeysResponse.newBuilder();
-    for (OmKeyInfo key : unDeletedKeys) {
-      if(key != null) {
-        resp.addUnDeletedKeys(key.getProtobuf());
-      }
-    }
-    if (errorMsg != null) {
-      omResponse.setMessage(errorMsg.toString());
-    }
-    // TODO: Currently all delete operations in OzoneBucket.java are void. Here
-    //  we put the List of unDeletedKeys into Response. These KeyInfo can be
-    //  used to continue deletion if client support delete retry.
-    omResponse.setDeleteKeysResponse(resp.build());
-    omResponse.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(ex));
-    return omResponse.build();
-  }
-
-  /**
    * Add the client response to double buffer and set the flush future.
    * @param trxIndex
    * @param omClientResponse
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index 9a7d993..adc42d8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -18,44 +18,42 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.audit.AuditLogger;
-import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.om.response.key.OMKeyDeleteResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeysDeleteResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .DeleteKeysRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .DeleteKeysResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConsts.BUCKET;
+import static org.apache.hadoop.ozone.OzoneConsts.DELETED_KEYS_LIST;
+import static org.apache.hadoop.ozone.OzoneConsts.UNDELETED_KEYS_LIST;
+import static org.apache.hadoop.ozone.OzoneConsts.VOLUME;
+import static org.apache.hadoop.ozone.audit.OMAction.DELETE_KEYS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.PARTIAL_DELETE;
 
 /**
  * Handles DeleteKey request.
@@ -70,32 +68,17 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
   }
 
   @Override
-  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    DeleteKeysRequest deleteKeyRequest =
-        getOmRequest().getDeleteKeysRequest();
-    Preconditions.checkNotNull(deleteKeyRequest);
-    List<KeyArgs> newKeyArgsList = new ArrayList<>();
-    for (KeyArgs keyArgs : deleteKeyRequest.getKeyArgsList()) {
-      newKeyArgsList.add(
-          keyArgs.toBuilder().setModificationTime(Time.now()).build());
-    }
-    DeleteKeysRequest newDeleteKeyRequest = DeleteKeysRequest
-        .newBuilder().addAllKeyArgs(newKeyArgsList).build();
-
-    return getOmRequest().toBuilder()
-        .setDeleteKeysRequest(newDeleteKeyRequest)
-        .setUserInfo(getUserInfo()).build();
-  }
-
-  @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
     DeleteKeysRequest deleteKeyRequest =
         getOmRequest().getDeleteKeysRequest();
 
-    List<KeyArgs> deleteKeyArgsList = deleteKeyRequest.getKeyArgsList();
-    Set<OmKeyInfo> unDeletedKeys = new HashSet<>();
+    OzoneManagerProtocolProtos.DeleteKeyArgs deleteKeyArgs =
+        deleteKeyRequest.getDeleteKeys();
+
+    List<String> deleteKeys = new ArrayList<>(deleteKeyArgs.getKeysList());
+
     IOException exception = null;
     OMClientResponse omClientResponse = null;
     Result result = null;
@@ -103,8 +86,8 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
     OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumKeyDeletes();
     Map<String, String> auditMap = null;
-    String volumeName = "";
-    String bucketName = "";
+    String volumeName = deleteKeyArgs.getVolumeName();
+    String bucketName = deleteKeyArgs.getBucketName();
     String keyName = "";
     List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
 
@@ -115,79 +98,144 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-            keyName);
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-        // check Acl
-        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+
+    int indexFailed = 0;
+    int length = deleteKeys.size();
+    OzoneManagerProtocolProtos.DeleteKeyArgs.Builder unDeletedKeys =
+        OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName);
+
+    boolean deleteStatus = true;
+    try {
 
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
+      for (indexFailed = 0; indexFailed < length; indexFailed++) {
+        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
+            keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
 
         if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+          deleteStatus = false;
+          LOG.error("Received a request to delete a Key does not exist {}",
+              objectKey);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
+          continue;
+        }
+
+        try {
+          // check Acl
+          checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+          omKeyInfoList.add(omKeyInfo);
+        } catch (Exception ex) {
+          deleteStatus = false;
+          LOG.error("Acl check failed for Key: {}", objectKey, ex);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
         }
+      }
 
+      // Mark all keys which can be deleted, in cache as deleted.
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, bucketName,
+                omKeyInfo.getKeyName())),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
       }
 
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(deleteStatus).setUnDeletedKeys(unDeletedKeys))
+          .setStatus(deleteStatus ? OK : PARTIAL_DELETE)
+          .setSuccess(deleteStatus).build(),
+          omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
       result = Result.FAILURE;
       exception = ex;
+      createErrorOMResponse(omResponse, ex);
+
+      // reset deleteKeys as request failed.
+      deleteKeys = new ArrayList<>();
+      // Add all keys which failed due to any other exception.
+      for (int i = indexFailed; i < length; i++) {
+        unDeletedKeys.addKeys(deleteKeyArgs.getKeys(i));
+      }
 
-      omClientResponse = new OMKeyDeleteResponse(
-          createOperationKeysErrorOMResponse(omResponse, exception,
-              unDeletedKeys));
+      omResponse.setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+          .setStatus(false).setUnDeletedKeys(unDeletedKeys).build()).build();
+      omClientResponse = new OMKeysDeleteResponse(omResponse.build());
 
     } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
     }
 
-    auditLog(auditLogger, buildAuditMessage(
-        OMAction.DELETE_KEY, auditMap, exception, userInfo));
+    auditMap = buildDeleteKeysAuditMap(volumeName, bucketName, deleteKeys,
+        unDeletedKeys.getKeysList());
+
+    auditLog(auditLogger, buildAuditMessage(DELETE_KEYS, auditMap, exception,
+        userInfo));
+
 
     switch (result) {
     case SUCCESS:
-      omMetrics.decNumKeys();
-      LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
+      omMetrics.decNumKeys(deleteKeys.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Keys delete success. Volume:{}, Bucket:{}, Keys:{}",
+            volumeName, bucketName, auditMap.get(DELETED_KEYS_LIST));
+      }
       break;
     case FAILURE:
+      omMetrics.decNumKeys(deleteKeys.size());
       omMetrics.incNumKeyDeleteFails();
-      LOG.error("Key delete failed. Volume:{}, Bucket:{}, Key{}." +
-          " Exception:{}", volumeName, bucketName, keyName, exception);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Keys delete failed. Volume:{}, Bucket:{}, DeletedKeys:{}, " +
+                "UnDeletedKeys:{}", volumeName, bucketName,
+            auditMap.get(DELETED_KEYS_LIST), auditMap.get(UNDELETED_KEYS_LIST),
+            exception);
+      }
       break;
     default:
-      LOG.error("Unrecognized Result for OMKeyDeleteRequest: {}",
+      LOG.error("Unrecognized Result for OMKeysDeleteRequest: {}",
           deleteKeyRequest);
     }
 
     return omClientResponse;
   }
+
+  /**
+   * Build audit map for DeleteKeys request.
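+   * @param volumeName volume name recorded under the VOLUME audit key
+   * @param bucketName bucket name recorded under the BUCKET audit key
+   * @param deletedKeys key names, comma-joined under DELETED_KEYS_LIST
+   * @param unDeletedKeys key names, comma-joined under UNDELETED_KEYS_LIST
+   * @return audit map holding the four entries above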
+   */
+  private Map<String, String> buildDeleteKeysAuditMap(String volumeName,
+      String bucketName, List<String> deletedKeys, List<String> unDeletedKeys) {
+    Map< String, String > auditMap = new HashMap<>();
+    auditMap.put(VOLUME, volumeName);
+    auditMap.put(BUCKET, bucketName);
+    auditMap.put(DELETED_KEYS_LIST, String.join(",", deletedKeys));
+    auditMap.put(UNDELETED_KEYS_LIST, String.join(",",
+        unDeletedKeys));
+    return auditMap;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
index 597841c..9d2cd53 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
@@ -18,17 +18,13 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
-import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 import javax.annotation.Nonnull;
@@ -36,7 +32,8 @@ import java.io.IOException;
 import java.util.List;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.PARTIAL_DELETE;
 
 /**
  * Response for DeleteKey request.
@@ -48,10 +45,10 @@ public class OMKeysDeleteResponse extends OMClientResponse {
   private long trxnLogIndex;
 
   public OMKeysDeleteResponse(@Nonnull OMResponse omResponse,
-                              @Nonnull List<OmKeyInfo> omKeyInfoList,
+                              @Nonnull List<OmKeyInfo> keyDeleteList,
                               long trxnLogIndex, boolean isRatisEnabled) {
     super(omResponse);
-    this.omKeyInfoList = omKeyInfoList;
+    this.omKeyInfoList = keyDeleteList;
     this.isRatisEnabled = isRatisEnabled;
     this.trxnLogIndex = trxnLogIndex;
   }
@@ -65,69 +62,48 @@ public class OMKeysDeleteResponse extends OMClientResponse {
     checkStatusNotOK();
   }
 
+  public void checkAndUpdateDB(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    if (getOMResponse().getStatus() == OK ||
+        getOMResponse().getStatus() == PARTIAL_DELETE) {
+      addToDBBatch(omMetadataManager, batchOperation);
+    }
+  }
+
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
                            BatchOperation batchOperation) throws IOException {
 
+    String volumeName = "";
+    String bucketName = "";
+    String keyName = "";
     for (OmKeyInfo omKeyInfo : omKeyInfoList) {
-      // Set the UpdateID to current transactionLogIndex
-      omKeyInfo.setUpdateID(trxnLogIndex, isRatisEnabled);
+      volumeName = omKeyInfo.getVolumeName();
+      bucketName = omKeyInfo.getBucketName();
+      keyName = omKeyInfo.getKeyName();
 
-      // For OmResponse with failure, this should do nothing. This method is
-      // not called in failure scenario in OM code.
-      if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
-        boolean acquiredLock = false;
-        String volumeName = "";
-        String bucketName = "";
+      String deleteKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
+          keyName);
 
-        try {
-          volumeName = omKeyInfo.getVolumeName();
-          bucketName = omKeyInfo.getBucketName();
-          String keyName = omKeyInfo.getKeyName();
-          acquiredLock =
-              omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-                  volumeName, bucketName);
-          // Update table cache.
-          omMetadataManager.getKeyTable().addCacheEntry(
-              new CacheKey<>(omMetadataManager.getOzoneKey(
-                  volumeName, bucketName, keyName)),
-              new CacheValue<>(Optional.absent(), trxnLogIndex));
+      omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
+          deleteKey);
 
-          String ozoneKey = omMetadataManager.getOzoneKey(
-              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
-              omKeyInfo.getKeyName());
-          omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
-              ozoneKey);
-          // If a deleted key is put in the table where a key with the same
-          // name already exists, then the old deleted key information would
-          // be lost. To avoid this, first check if a key with same name
-          // exists. deletedTable in OM Metadata stores <KeyName,
-          // RepeatedOMKeyInfo>. The RepeatedOmKeyInfo is the structure that
-          // allows us to store a list of OmKeyInfo that can be tied to same
-          // key name. For a keyName if RepeatedOMKeyInfo structure is null,
-          // we create a new instance, if it is not null, then we simply add
-          // to the list and store this instance in deletedTable.
-          RepeatedOmKeyInfo repeatedOmKeyInfo =
-              omMetadataManager.getDeletedTable().get(ozoneKey);
-          repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-              omKeyInfo, repeatedOmKeyInfo, omKeyInfo.getUpdateID(),
-              isRatisEnabled);
-          omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-              ozoneKey, repeatedOmKeyInfo);
-          if (acquiredLock) {
-            omMetadataManager.getLock().releaseWriteLock(
-                BUCKET_LOCK, volumeName, bucketName);
-            acquiredLock = false;
-          }
-        } finally {
-          if (acquiredLock) {
-            omMetadataManager.getLock()
-                .releaseWriteLock(BUCKET_LOCK, volumeName,
-                    bucketName);
-          }
-        }
-      }
+      // If a deleted key is put in the table where a key with the same
+      // name already exists, then the old deleted key information would
+      // be lost. To avoid this, first check if a key with same name
+      // exists. deletedTable in OM Metadata stores <KeyName,
+      // RepeatedOMKeyInfo>. The RepeatedOmKeyInfo is the structure that
+      // allows us to store a list of OmKeyInfo that can be tied to same
+      // key name. For a keyName if RepeatedOMKeyInfo structure is null,
+      // we create a new instance, if it is not null, then we simply add
+      // to the list and store this instance in deletedTable.
+      RepeatedOmKeyInfo repeatedOmKeyInfo =
+          omMetadataManager.getDeletedTable().get(deleteKey);
+      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+          omKeyInfo, repeatedOmKeyInfo, trxnLogIndex,
+          isRatisEnabled);
+      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+          deleteKey, repeatedOmKeyInfo);
     }
   }
-
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeysDeleteRequest.java
new file mode 100644
index 0000000..ac50af8
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeysDeleteRequest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.PARTIAL_DELETE;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteKeys;
+
+/**
+ * Class tests OMKeysDeleteRequest.
+ */
+public class TestOMKeysDeleteRequest extends TestOMKeyRequest {
+
+
+  private List<String> deleteKeyList;
+  private OMRequest omRequest;
+
+  @Test
+  public void testKeysDeleteRequest() throws Exception {
+
+    createPreRequisites();
+
+    OMKeysDeleteRequest omKeysDeleteRequest =
+        new OMKeysDeleteRequest(omRequest);
+
+    OMClientResponse omClientResponse =
+        omKeysDeleteRequest.validateAndUpdateCache(ozoneManager, 0L,
+            ozoneManagerDoubleBufferHelper);
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getSuccess());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getDeleteKeysResponse()
+        .getStatus());
+    DeleteKeyArgs unDeletedKeys =
+        omClientResponse.getOMResponse().getDeleteKeysResponse()
+            .getUnDeletedKeys();
+    Assert.assertEquals(0,
+        unDeletedKeys.getKeysCount());
+
+    // Check all keys are deleted.
+    for (String deleteKey : deleteKeyList) {
+      Assert.assertNull(omMetadataManager.getKeyTable()
+          .get(omMetadataManager.getOzoneKey(volumeName, bucketName,
+              deleteKey)));
+    }
+
+  }
+
+  @Test
+  public void testKeysDeleteRequestFail() throws Exception {
+
+    createPreRequisites();
+
+    // Add a key which does not exist, which causes batch delete to fail.
+
+    omRequest = omRequest.toBuilder()
+            .setDeleteKeysRequest(DeleteKeysRequest.newBuilder()
+                .setDeleteKeys(DeleteKeyArgs.newBuilder()
+                .setBucketName(bucketName).setVolumeName(volumeName)
+                    .addAllKeys(deleteKeyList).addKeys("dummy"))).build();
+
+    OMKeysDeleteRequest omKeysDeleteRequest =
+        new OMKeysDeleteRequest(omRequest);
+
+    OMClientResponse omClientResponse =
+        omKeysDeleteRequest.validateAndUpdateCache(ozoneManager, 0L,
+        ozoneManagerDoubleBufferHelper);
+
+    Assert.assertFalse(omClientResponse.getOMResponse().getSuccess());
+    Assert.assertEquals(PARTIAL_DELETE,
+        omClientResponse.getOMResponse().getStatus());
+
+    Assert.assertFalse(omClientResponse.getOMResponse().getDeleteKeysResponse()
+        .getStatus());
+
+    // Check keys are deleted and in response check unDeletedKey.
+    for (String deleteKey : deleteKeyList) {
+      Assert.assertNull(omMetadataManager.getKeyTable()
+          .get(omMetadataManager.getOzoneKey(volumeName, bucketName,
+              deleteKey)));
+    }
+
+    DeleteKeyArgs unDeletedKeys = omClientResponse.getOMResponse()
+        .getDeleteKeysResponse().getUnDeletedKeys();
+    Assert.assertEquals(1,
+        unDeletedKeys.getKeysCount());
+    Assert.assertEquals("dummy", unDeletedKeys.getKeys(0));
+
+  }
+
+  private void createPreRequisites() throws Exception {
+
+    deleteKeyList = new ArrayList<>();
+    // Add volume, bucket and key entries to OM DB.
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    int count = 10;
+
+    DeleteKeyArgs.Builder deleteKeyArgs = DeleteKeyArgs.newBuilder()
+        .setBucketName(bucketName).setVolumeName(volumeName);
+
+    // Create 10 keys
+    String parentDir = "/user";
+    String key = "";
+
+
+    for (int i = 0; i < count; i++) {
+      key = parentDir.concat("/key" + i);
+      TestOMRequestUtils.addKeyToTableCache(volumeName, bucketName,
+          parentDir.concat("/key" + i), HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE, omMetadataManager);
+      deleteKeyArgs.addKeys(key);
+      deleteKeyList.add(key);
+    }
+
+    omRequest =
+        OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+            .setCmdType(DeleteKeys)
+            .setDeleteKeysRequest(DeleteKeysRequest.newBuilder()
+                .setDeleteKeys(deleteKeyArgs).build()).build();
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeysDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeysDeleteResponse.java
new file mode 100644
index 0000000..c5dd96b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeysDeleteResponse.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteKeys;
+
+/**
+ * Class to test OMKeysDeleteResponse.
+ */
+public class TestOMKeysDeleteResponse extends TestOMKeyResponse {
+
+
+  private List<OmKeyInfo> omKeyInfoList;
+  private List<String> ozoneKeys;
+
+
+  private void createPreRequisities() throws Exception {
+    String parent = "/user";
+    String key = "key";
+
+    omKeyInfoList = new ArrayList<>();
+    ozoneKeys = new ArrayList<>();
+    String ozoneKey = "";
+    for (int i = 0; i < 10; i++) {
+      keyName = parent.concat(key + i);
+      TestOMRequestUtils.addKeyToTable(false, volumeName,
+          bucketName, keyName, 0L, RATIS, THREE, omMetadataManager);
+      ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+      omKeyInfoList.add(omMetadataManager.getKeyTable().get(ozoneKey));
+      ozoneKeys.add(ozoneKey);
+    }
+  }
+
+  @Test
+  public void testKeysDeleteResponse() throws Exception {
+
+    createPreRequisities();
+
+    OMResponse omResponse =
+        OMResponse.newBuilder().setCmdType(DeleteKeys).setStatus(OK)
+            .setSuccess(true)
+            .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+                .setStatus(true)).build();
+    OMClientResponse omKeysDeleteResponse =
+        new OMKeysDeleteResponse(omResponse, omKeyInfoList, 10L, true);
+
+    omKeysDeleteResponse.checkAndUpdateDB(omMetadataManager, batchOperation);
+
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    for (String ozKey : ozoneKeys) {
+      Assert.assertNull(omMetadataManager.getKeyTable().get(ozKey));
+
+      RepeatedOmKeyInfo repeatedOmKeyInfo =
+          omMetadataManager.getDeletedTable().get(ozKey);
+      Assert.assertNotNull(repeatedOmKeyInfo);
+
+      Assert.assertEquals(1, repeatedOmKeyInfo.getOmKeyInfoList().size());
+      Assert.assertEquals(10L,
+          repeatedOmKeyInfo.getOmKeyInfoList().get(0).getUpdateID());
+
+    }
+
+  }
+
+  @Test
+  public void testKeysDeleteResponseFail() throws Exception {
+    createPreRequisities();
+
+    OMResponse omResponse =
+        OMResponse.newBuilder().setCmdType(DeleteKeys).setStatus(KEY_NOT_FOUND)
+            .setSuccess(false)
+            .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+                .setStatus(false)).build();
+
+
+    OMClientResponse omKeysDeleteResponse =
+        new OMKeysDeleteResponse(omResponse, omKeyInfoList, 10L, true);
+
+    omKeysDeleteResponse.checkAndUpdateDB(omMetadataManager, batchOperation);
+
+
+    for (String ozKey : ozoneKeys) {
+      Assert.assertNotNull(omMetadataManager.getKeyTable().get(ozKey));
+
+      RepeatedOmKeyInfo repeatedOmKeyInfo =
+          omMetadataManager.getDeletedTable().get(ozKey);
+      Assert.assertNull(repeatedOmKeyInfo);
+
+    }
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org