You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/04/05 04:09:55 UTC

[hadoop] branch trunk updated: HDDS-1379. Convert all OM Volume related operations to HA model. (#689)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77fe51e  HDDS-1379. Convert all OM Volume related operations to HA model. (#689)
77fe51e is described below

commit 77fe51e13666f7e10ce5fa7bf53b35cdcd4602b6
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Thu Apr 4 21:09:50 2019 -0700

    HDDS-1379. Convert all OM Volume related operations to HA model. (#689)
---
 .../ozone/om/helpers/OmDeleteVolumeResponse.java   |  49 ++++
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |  19 +-
 .../om/helpers/OmVolumeOwnerChangeResponse.java    |  56 ++++
 .../ozone/om/protocol/OzoneManagerHAProtocol.java  |  78 ++++++
 .../src/main/proto/OzoneManagerProtocol.proto      |   7 +
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   6 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  29 +++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  77 ++++++
 .../hadoop/ozone/om/S3BucketManagerImpl.java       |  11 +-
 .../org/apache/hadoop/ozone/om/VolumeManager.java  |  52 +++-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  | 289 ++++++++++++++-------
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  65 +++--
 ...dler.java => OzoneManagerHARequestHandler.java} |  36 +--
 .../OzoneManagerHARequestHandlerImpl.java          | 247 ++++++++++++++++++
 .../protocolPB/OzoneManagerRequestHandler.java     |   6 +-
 .../hadoop/ozone/protocolPB/RequestHandler.java    |   2 +-
 .../om/ratis/TestOzoneManagerStateMachine.java     |   8 +-
 17 files changed, 897 insertions(+), 140 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java
new file mode 100644
index 0000000..6e96674
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+
+/**
+ * OM response for delete volume request for a ozone volume.
+ */
+public  class OmDeleteVolumeResponse {
+  private String volume;
+  private String owner;
+  private VolumeList updatedVolumeList;
+
+  public OmDeleteVolumeResponse(String volume, String owner,
+      VolumeList updatedVolumeList) {
+    this.volume = volume;
+    this.owner = owner;
+    this.updatedVolumeList = updatedVolumeList;
+  }
+
+  public String getVolume() {
+    return volume;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public VolumeList getUpdatedVolumeList() {
+    return updatedVolumeList;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
index 08c17ec..7b25d78 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
@@ -36,10 +36,10 @@ import com.google.common.base.Preconditions;
  */
 public final class OmVolumeArgs extends WithMetadata implements Auditable {
   private final String adminName;
-  private final String ownerName;
+  private String ownerName;
   private final String volume;
-  private final long creationTime;
-  private final long quotaInBytes;
+  private long creationTime;
+  private long quotaInBytes;
   private final OmOzoneAclMap aclMap;
 
   /**
@@ -64,6 +64,19 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable {
     this.creationTime = creationTime;
   }
 
+
+  public void setOwnerName(String newOwner) {
+    this.ownerName = newOwner;
+  }
+
+  public void setQuotaInBytes(long quotaInBytes) {
+    this.quotaInBytes = quotaInBytes;
+  }
+
+  public void setCreationTime(long time) {
+    this.creationTime = time;
+  }
+
   /**
    * Returns the Admin Name.
    * @return String.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java
new file mode 100644
index 0000000..b691c73
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+
+/**
+ * OM response for owner change request for a ozone volume.
+ */
+public class OmVolumeOwnerChangeResponse {
+  private VolumeList originalOwnerVolumeList;
+  private VolumeList newOwnerVolumeList;
+  private OmVolumeArgs newOwnerVolumeArgs;
+  private String originalOwner;
+
+  public OmVolumeOwnerChangeResponse(VolumeList originalOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
+      String originalOwner) {
+    this.originalOwnerVolumeList = originalOwnerVolumeList;
+    this.newOwnerVolumeList = newOwnerVolumeList;
+    this.newOwnerVolumeArgs = newOwnerVolumeArgs;
+    this.originalOwner = originalOwner;
+  }
+
+  public String getOriginalOwner() {
+    return originalOwner;
+  }
+
+  public VolumeList getOriginalOwnerVolumeList() {
+    return originalOwnerVolumeList;
+  }
+
+  public VolumeList getNewOwnerVolumeList() {
+    return newOwnerVolumeList;
+  }
+
+  public OmVolumeArgs getNewOwnerVolumeArgs() {
+    return newOwnerVolumeArgs;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index f598997..ad2bc31 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -18,15 +18,20 @@
 
 package org.apache.hadoop.ozone.om.protocol;
 
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
 
 import java.io.IOException;
 
@@ -88,4 +93,77 @@ public interface OzoneManagerHAProtocol {
    */
   OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
       String multipartUploadID) throws IOException;
+
+  /**
+   * Start Create Volume Transaction.
+   * @param omVolumeArgs
+   * @return VolumeList
+   * @throws IOException
+   */
+  VolumeList startCreateVolume(OmVolumeArgs omVolumeArgs) throws IOException;
+
+  /**
+   * Apply Create Volume changes to OM DB.
+   * @param omVolumeArgs
+   * @param volumeList
+   * @throws IOException
+   */
+  void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+      VolumeList volumeList) throws IOException;
+
+  /**
+   * Start setOwner Transaction.
+   * @param volume
+   * @param owner
+   * @return OmVolumeOwnerChangeResponse
+   * @throws IOException
+   */
+  OmVolumeOwnerChangeResponse startSetOwner(String volume,
+      String owner) throws IOException;
+
+  /**
+   * Apply Set Quota changes to OM DB.
+   * @param oldOwner
+   * @param oldOwnerVolumeList
+   * @param newOwnerVolumeList
+   * @param newOwnerVolumeArgs
+   * @throws IOException
+   */
+  void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+      throws IOException;
+
+  /**
+   * Start Set Quota Transaction.
+   * @param volume
+   * @param quota
+   * @return OmVolumeArgs
+   * @throws IOException
+   */
+  OmVolumeArgs startSetQuota(String volume, long quota) throws IOException;
+
+  /**
+   * Apply Set Quota Changes to OM DB.
+   * @param omVolumeArgs
+   * @throws IOException
+   */
+  void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
+
+  /**
+   * Start Delete Volume Transaction.
+   * @param volume
+   * @return OmDeleteVolumeResponse
+   * @throws IOException
+   */
+  OmDeleteVolumeResponse startDeleteVolume(String volume) throws IOException;
+
+  /**
+   * Apply Delete Volume changes to OM DB.
+   * @param volume
+   * @param owner
+   * @param newVolumeList
+   * @throws IOException
+   */
+  void applyDeleteVolume(String volume, String owner,
+      VolumeList newVolumeList) throws IOException;
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index ffd1eba..4536e87 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -273,6 +273,7 @@ message VolumeInfo {
 */
 message CreateVolumeRequest {
     required VolumeInfo volumeInfo = 1;
+    optional VolumeList volumeList = 2;
 }
 
 message CreateVolumeResponse {
@@ -290,6 +291,10 @@ message SetVolumePropertyRequest {
     required string volumeName = 1;
     optional string ownerName = 2;
     optional uint64 quotaInBytes = 3;
+    optional string originalOwner = 4;
+    optional VolumeList oldOwnerVolumeList = 5;
+    optional VolumeList newOwnerVolumeList = 6;
+    optional VolumeInfo volumeInfo = 7;
 }
 
 message SetVolumePropertyResponse {
@@ -326,6 +331,8 @@ message InfoVolumeResponse {
 */
 message DeleteVolumeRequest {
     required string volumeName = 1;
+    optional string owner = 2;
+    optional VolumeList volumeList = 3;
 }
 
 message DeleteVolumeResponse {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index b72117e..5d739c2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -87,11 +87,11 @@ public class TestOmMetrics {
             ozoneManager, "volumeManager");
     VolumeManager mockVm = Mockito.spy(volumeManager);
 
-    Mockito.doNothing().when(mockVm).createVolume(null);
-    Mockito.doNothing().when(mockVm).deleteVolume(null);
+    Mockito.doReturn(null).when(mockVm).createVolume(null);
+    Mockito.doReturn(null).when(mockVm).deleteVolume(null);
     Mockito.doReturn(null).when(mockVm).getVolumeInfo(null);
     Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null);
-    Mockito.doNothing().when(mockVm).setOwner(null, null);
+    Mockito.doReturn(null).when(mockVm).setOwner(null, null);
     Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0);
 
     HddsWhiteboxTestUtils.setInternalState(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 06009e2..5f62af1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -123,6 +125,33 @@ public class TestOzoneManagerHA {
     }
   }
 
+  @Test
+  public void testAllVolumeOperations() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+    Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+    Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+    objectStore.deleteVolume(volumeName);
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+        () -> objectStore.getVolume(volumeName));
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+        () -> objectStore.deleteVolume(volumeName));
+  }
+
   /**
    * Test a client request when all OM nodes are running. The request should
    * succeed.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 7a87b53..e7f1e87 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -79,6 +81,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -1630,6 +1634,79 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  @Override
+  public VolumeList startCreateVolume(OmVolumeArgs args) throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.CREATE,
+          args.getVolume(), null, null);
+    }
+    VolumeList volumeList = volumeManager.createVolume(args);
+    return volumeList;
+  }
+
+  public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+      VolumeList volumeList) throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    volumeManager.applyCreateVolume(omVolumeArgs, volumeList);
+  }
+
+  @Override
+  public OmVolumeOwnerChangeResponse startSetOwner(String volume,
+      String owner) throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    if (isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
+          null, null);
+    }
+    return volumeManager.setOwner(volume, owner);
+  }
+
+  @Override
+  public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+      throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    volumeManager.applySetOwner(oldOwner, oldOwnerVolumeList,
+        newOwnerVolumeList, newOwnerVolumeArgs);
+  }
+
+  @Override
+  public OmVolumeArgs startSetQuota(String volume, long quota)
+      throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    if (isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
+          null, null);
+    }
+    return volumeManager.setQuota(volume, quota);
+  }
+
+  @Override
+  public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
+      // TODO: Need to add metrics and Audit log for HA requests
+    volumeManager.applySetQuota(omVolumeArgs);
+  }
+
+  @Override
+  public OmDeleteVolumeResponse startDeleteVolume(String volume)
+      throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.DELETE, volume,
+          null, null);
+    }
+    // TODO: Need to add metrics and Audit log for HA requests
+    return volumeManager.deleteVolume(volume);
+  }
+
+  @Override
+  public void applyDeleteVolume(String volume, String owner,
+      VolumeList newVolumeList) throws IOException {
+    // TODO: Need to add metrics and Audit log for HA requests
+    volumeManager.applyDeleteVolume(volume, owner, newVolumeList);
+  }
+
+
   /**
    * Checks if current caller has acl permissions.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
index d0d84f8..440a45e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
@@ -50,6 +50,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
   private final OMMetadataManager omMetadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
+  private final boolean isRatisEnabled;
 
   /**
    * Construct an S3 Bucket Manager Object.
@@ -66,6 +67,9 @@ public class S3BucketManagerImpl implements S3BucketManager {
     this.omMetadataManager = omMetadataManager;
     this.volumeManager = volumeManager;
     this.bucketManager = bucketManager;
+    isRatisEnabled = configuration.getBoolean(
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
   }
 
   @Override
@@ -166,7 +170,12 @@ public class S3BucketManagerImpl implements S3BucketManager {
               .setVolume(ozoneVolumeName)
               .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES)
               .build();
-      volumeManager.createVolume(args);
+      if (isRatisEnabled) {
+        // When ratis is enabled we need to call apply also.
+        volumeManager.applyCreateVolume(args, volumeManager.createVolume(args));
+      } else {
+        volumeManager.createVolume(args);
+      }
     } catch (OMException exp) {
       newVolumeCreate = false;
       if (exp.getResult().compareTo(VOLUME_ALREADY_EXISTS) == 0) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
index f25fce4..a4e20c7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
@@ -16,9 +16,13 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
 
 import java.io.IOException;
 import java.util.List;
@@ -32,7 +36,17 @@ public interface VolumeManager {
    * Create a new volume.
    * @param args - Volume args to create a volume
    */
-  void createVolume(OmVolumeArgs args) throws IOException;
+  VolumeList createVolume(OmVolumeArgs args)
+      throws IOException;
+
+  /**
+   * Apply Create Volume changes to OM DB.
+   * @param omVolumeArgs
+   * @param volumeList
+   * @throws IOException
+   */
+  void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+      VolumeList volumeList) throws IOException;
 
   /**
    * Changes the owner of a volume.
@@ -41,7 +55,20 @@ public interface VolumeManager {
    * @param owner - Name of the owner.
    * @throws IOException
    */
-  void setOwner(String volume, String owner) throws IOException;
+  OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
+      throws IOException;
+
+  /**
+   * Apply Set Owner changes to OM DB.
+   * @param oldOwner
+   * @param oldOwnerVolumeList
+   * @param newOwnerVolumeList
+   * @param newOwnerVolumeArgs
+   * @throws IOException
+   */
+  void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+      throws IOException;
 
   /**
    * Changes the Quota on a volume.
@@ -50,7 +77,14 @@ public interface VolumeManager {
    * @param quota - Quota in bytes.
    * @throws IOException
    */
-  void setQuota(String volume, long quota) throws IOException;
+  OmVolumeArgs setQuota(String volume, long quota) throws IOException;
+
+  /**
+   * Apply Set Quota changes to OM DB.
+   * @param omVolumeArgs
+   * @throws IOException
+   */
+  void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
 
   /**
    * Gets the volume information.
@@ -66,7 +100,17 @@ public interface VolumeManager {
    * @param volume - Name of the volume.
    * @throws IOException
    */
-  void deleteVolume(String volume) throws IOException;
+  OmDeleteVolumeResponse deleteVolume(String volume) throws IOException;
+
+  /**
+   * Apply Delete Volume changes to OM DB.
+   * @param volume
+   * @param owner
+   * @param newVolumeList
+   * @throws IOException
+   */
+  void applyDeleteVolume(String volume, String owner,
+      VolumeList newVolumeList) throws IOException;
 
   /**
    * Checks if the specified user with a role can access this volume.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index c83f733..872d7b6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -22,17 +22,18 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.utils.RocksDBStore;
 import org.apache.hadoop.utils.db.BatchOperation;
 
 import com.google.common.base.Preconditions;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.rocksdb.RocksDBException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
   private final OMMetadataManager metadataManager;
   private final int maxUserVolumeCount;
+  private final boolean isRatisEnabled;
 
   /**
    * Constructor.
@@ -52,15 +54,18 @@ public class VolumeManagerImpl implements VolumeManager {
    * @throws IOException
    */
   public VolumeManagerImpl(OMMetadataManager metadataManager,
-      OzoneConfiguration conf) throws IOException {
+      OzoneConfiguration conf) {
     this.metadataManager = metadataManager;
     this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
         OZONE_OM_USER_MAX_VOLUME_DEFAULT);
+    isRatisEnabled = conf.getBoolean(
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
   }
 
   // Helpers to add and delete volume from user list
-  private void addVolumeToOwnerList(String volume, String owner,
-      BatchOperation batchOperation) throws IOException {
+  private VolumeList addVolumeToOwnerList(String volume, String owner)
+      throws IOException {
     // Get the volume list
     String dbUserKey = metadataManager.getUserKey(owner);
     VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
@@ -72,22 +77,22 @@ public class VolumeManagerImpl implements VolumeManager {
     // Check the volume count
     if (prevVolList.size() >= maxUserVolumeCount) {
       LOG.debug("Too many volumes for user:{}", owner);
-      throw new OMException(ResultCodes.USER_TOO_MANY_VOLUMES);
+      throw new OMException("Too many volumes for user:" + owner,
+          ResultCodes.USER_TOO_MANY_VOLUMES);
     }
 
     // Add the new volume to the list
     prevVolList.add(volume);
     VolumeList newVolList = VolumeList.newBuilder()
         .addAllVolumeNames(prevVolList).build();
-    metadataManager.getUserTable().putWithBatch(batchOperation,
-        dbUserKey, newVolList);
+
+    return newVolList;
   }
 
-  private void delVolumeFromOwnerList(String volume, String owner,
-      BatchOperation batch) throws RocksDBException, IOException {
+  private VolumeList delVolumeFromOwnerList(String volume, String owner)
+      throws IOException {
     // Get the volume list
-    String dbUserKey = metadataManager.getUserKey(owner);
-    VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
+    VolumeList volumeList = metadataManager.getUserTable().get(owner);
     List<String> prevVolList = new ArrayList<>();
     if (volumeList != null) {
       prevVolList.addAll(volumeList.getVolumeNamesList());
@@ -98,58 +103,90 @@ public class VolumeManagerImpl implements VolumeManager {
 
     // Remove the volume from the list
     prevVolList.remove(volume);
-    if (prevVolList.size() == 0) {
-      metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey);
-    } else {
-      VolumeList newVolList = VolumeList.newBuilder()
-          .addAllVolumeNames(prevVolList).build();
-      metadataManager.getUserTable().putWithBatch(batch,
-          dbUserKey, newVolList);
-    }
+    VolumeList newVolList = VolumeList.newBuilder()
+        .addAllVolumeNames(prevVolList).build();
+    return newVolList;
   }
 
   /**
    * Creates a volume.
-   * @param args - OmVolumeArgs.
+   * @param omVolumeArgs - OmVolumeArgs.
+   * @return VolumeList
    */
   @Override
-  public void createVolume(OmVolumeArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.getLock().acquireUserLock(args.getOwnerName());
-    metadataManager.getLock().acquireVolumeLock(args.getVolume());
+  public VolumeList createVolume(OmVolumeArgs omVolumeArgs) throws IOException {
+    Preconditions.checkNotNull(omVolumeArgs);
+    metadataManager.getLock().acquireUserLock(omVolumeArgs.getOwnerName());
+    metadataManager.getLock().acquireVolumeLock(omVolumeArgs.getVolume());
     try {
-      String dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
+      String dbVolumeKey = metadataManager.getVolumeKey(
+          omVolumeArgs.getVolume());
+      String dbUserKey = metadataManager.getUserKey(
+          omVolumeArgs.getOwnerName());
       OmVolumeArgs volumeInfo =
           metadataManager.getVolumeTable().get(dbVolumeKey);
 
       // Check of the volume already exists
       if (volumeInfo != null) {
-        LOG.debug("volume:{} already exists", args.getVolume());
+        LOG.debug("volume:{} already exists", omVolumeArgs.getVolume());
         throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS);
       }
 
-      try (BatchOperation batch = metadataManager.getStore()
-          .initBatchOperation()) {
-        // Write the vol info
-        metadataManager.getVolumeTable().putWithBatch(batch,
-            dbVolumeKey, args);
+      VolumeList volumeList = addVolumeToOwnerList(omVolumeArgs.getVolume(),
+          omVolumeArgs.getOwnerName());
 
-        // Add volume to user list
-        addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
-        metadataManager.getStore().commitBatchOperation(batch);
+      // Set creation time
+      omVolumeArgs.setCreationTime(System.currentTimeMillis());
+
+      if (!isRatisEnabled) {
+        createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey,
+            dbUserKey);
       }
-      LOG.debug("created volume:{} user:{}", args.getVolume(),
-          args.getOwnerName());
+      LOG.debug("created volume:{} user:{}", omVolumeArgs.getVolume(),
+          omVolumeArgs.getOwnerName());
+      return volumeList;
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Volume creation failed for user:{} volume:{}",
-            args.getOwnerName(), args.getVolume(), ex);
-      } else {
-        throw (IOException) ex;
+            omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
       }
+      throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(args.getVolume());
-      metadataManager.getLock().releaseUserLock(args.getOwnerName());
+      metadataManager.getLock().releaseVolumeLock(omVolumeArgs.getVolume());
+      metadataManager.getLock().releaseUserLock(omVolumeArgs.getOwnerName());
+    }
+  }
+
+
+  @Override
+  public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+      VolumeList volumeList) throws IOException {
+    // Do we need to hold lock in apply Transactions requests?
+    String dbVolumeKey = metadataManager.getVolumeKey(omVolumeArgs.getVolume());
+    String dbUserKey = metadataManager.getUserKey(omVolumeArgs.getOwnerName());
+    try {
+      createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey, dbUserKey);
+    } catch (IOException ex) {
+      LOG.error("Volume creation failed for user:{} volume:{}",
+          omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
+      throw ex;
+    }
+  }
+
+  private void createVolumeCommitToDB(OmVolumeArgs omVolumeArgs,
+      VolumeList volumeList, String dbVolumeKey, String dbUserKey)
+      throws IOException {
+    try (BatchOperation batch = metadataManager.getStore()
+        .initBatchOperation()) {
+      // Write the vol info
+      metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey,
+          omVolumeArgs);
+      metadataManager.getUserTable().putWithBatch(batch, dbUserKey,
+          volumeList);
+      // Add volume to user list
+      metadataManager.getStore().commitBatchOperation(batch);
+    } catch (IOException ex) {
+      throw ex;
     }
   }
 
@@ -161,7 +198,8 @@ public class VolumeManagerImpl implements VolumeManager {
    * @throws IOException
    */
   @Override
-  public void setOwner(String volume, String owner) throws IOException {
+  public OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
+      throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(owner);
     metadataManager.getLock().acquireUserLock(owner);
@@ -179,49 +217,84 @@ public class VolumeManagerImpl implements VolumeManager {
 
       Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
 
-      try (BatchOperation batch = metadataManager.getStore()
-          .initBatchOperation()) {
-        delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
-        addVolumeToOwnerList(volume, owner, batch);
-
-        OmVolumeArgs newVolumeArgs =
-            OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
-                .setAdminName(volumeArgs.getAdminName())
-                .setOwnerName(owner)
-                .setQuotaInBytes(volumeArgs.getQuotaInBytes())
-                .setCreationTime(volumeArgs.getCreationTime())
-                .build();
-
-        metadataManager.getVolumeTable().putWithBatch(batch,
-            dbVolumeKey, newVolumeArgs);
-        metadataManager.getStore().commitBatchOperation(batch);
+      String originalOwner =
+          metadataManager.getUserKey(volumeArgs.getOwnerName());
+      VolumeList oldOwnerVolumeList = delVolumeFromOwnerList(volume,
+          originalOwner);
+
+      String newOwner =  metadataManager.getUserKey(owner);
+      VolumeList newOwnerVolumeList = addVolumeToOwnerList(volume, newOwner);
+
+      volumeArgs.setOwnerName(owner);
+      if (!isRatisEnabled) {
+        setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
+            volumeArgs, owner);
       }
-    } catch (RocksDBException | IOException ex) {
+      return new OmVolumeOwnerChangeResponse(oldOwnerVolumeList,
+          newOwnerVolumeList, volumeArgs, originalOwner);
+    } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Changing volume ownership failed for user:{} volume:{}",
             owner, volume, ex);
       }
-      if(ex instanceof RocksDBException) {
-        throw RocksDBStore.toIOException("Volume creation failed.",
-            (RocksDBException) ex);
-      } else {
-        throw (IOException) ex;
-      }
+      throw ex;
     } finally {
       metadataManager.getLock().releaseVolumeLock(volume);
       metadataManager.getLock().releaseUserLock(owner);
     }
   }
 
+  @Override
+  public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+      throws IOException {
+    try {
+      setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
+          newOwnerVolumeArgs, oldOwner);
+    } catch (IOException ex) {
+      LOG.error("Changing volume ownership failed for user:{} volume:{}",
+          newOwnerVolumeArgs.getOwnerName(), newOwnerVolumeArgs.getVolume(),
+          ex);
+      throw ex;
+    }
+  }
+
+
+  private void setOwnerCommitToDB(VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
+      String oldOwner) throws IOException {
+    try (BatchOperation batch = metadataManager.getStore()
+        .initBatchOperation()) {
+      if (oldOwnerVolumeList.getVolumeNamesList().size() == 0) {
+        metadataManager.getUserTable().deleteWithBatch(batch, oldOwner);
+      } else {
+        metadataManager.getUserTable().putWithBatch(batch, oldOwner,
+            oldOwnerVolumeList);
+      }
+      metadataManager.getUserTable().putWithBatch(batch,
+          newOwnerVolumeArgs.getOwnerName(),
+          newOwnerVolumeList);
+
+      String dbVolumeKey =
+          metadataManager.getVolumeKey(newOwnerVolumeArgs.getVolume());
+      metadataManager.getVolumeTable().putWithBatch(batch,
+          dbVolumeKey, newOwnerVolumeArgs);
+      metadataManager.getStore().commitBatchOperation(batch);
+    }
+  }
+
+
   /**
    * Changes the Quota on a volume.
    *
    * @param volume - Name of the volume.
    * @param quota - Quota in bytes.
+   *
+   * @return OmVolumeArgs
    * @throws IOException
    */
   @Override
-  public void setQuota(String volume, long quota) throws IOException {
+  public OmVolumeArgs setQuota(String volume, long quota) throws IOException {
     Preconditions.checkNotNull(volume);
     metadataManager.getLock().acquireVolumeLock(volume);
     try {
@@ -235,15 +308,13 @@ public class VolumeManagerImpl implements VolumeManager {
 
       Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
 
-      OmVolumeArgs newVolumeArgs =
-          OmVolumeArgs.newBuilder()
-              .setVolume(volumeArgs.getVolume())
-              .setAdminName(volumeArgs.getAdminName())
-              .setOwnerName(volumeArgs.getOwnerName())
-              .setQuotaInBytes(quota)
-              .setCreationTime(volumeArgs.getCreationTime()).build();
 
-      metadataManager.getVolumeTable().put(dbVolumeKey, newVolumeArgs);
+      volumeArgs.setQuotaInBytes(quota);
+
+      if (!isRatisEnabled) {
+        metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
+      }
+      return volumeArgs;
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
@@ -255,6 +326,19 @@ public class VolumeManagerImpl implements VolumeManager {
     }
   }
 
+  @Override
+  public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(
+          omVolumeArgs.getVolume());
+      metadataManager.getVolumeTable().put(dbVolumeKey, omVolumeArgs);
+    } catch (IOException ex) {
+      LOG.error("Changing volume quota failed for volume:{} quota:{}",
+          omVolumeArgs.getVolume(), omVolumeArgs.getQuotaInBytes(), ex);
+      throw ex;
+    }
+  }
+
   /**
    * Gets the volume information.
    * @param volume - Volume name.
@@ -290,10 +374,12 @@ public class VolumeManagerImpl implements VolumeManager {
    * Deletes an existing empty volume.
    *
    * @param volume - Name of the volume.
+   *
+   * @return OmDeleteVolumeResponse
    * @throws IOException
    */
   @Override
-  public void deleteVolume(String volume) throws IOException {
+  public OmDeleteVolumeResponse deleteVolume(String volume) throws IOException {
     Preconditions.checkNotNull(volume);
     String owner;
     metadataManager.getLock().acquireVolumeLock(volume);
@@ -305,7 +391,6 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.getLock().acquireUserLock(owner);
     metadataManager.getLock().acquireVolumeLock(volume);
     try {
-
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
           metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -322,28 +407,54 @@ public class VolumeManagerImpl implements VolumeManager {
       Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
-      try (BatchOperation batch = metadataManager.getStore()
-          .initBatchOperation()) {
-        delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
-        metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey);
-        metadataManager.getStore().commitBatchOperation(batch);
+      VolumeList newVolumeList = delVolumeFromOwnerList(volume,
+          volumeArgs.getOwnerName());
+
+      if (!isRatisEnabled) {
+        deleteVolumeCommitToDB(newVolumeList,
+            volume, owner);
       }
-    } catch (RocksDBException| IOException ex) {
+      return new OmDeleteVolumeResponse(volume, owner, newVolumeList);
+    } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Delete volume failed for volume:{}", volume, ex);
       }
-      if(ex instanceof RocksDBException) {
-        throw RocksDBStore.toIOException("Volume creation failed.",
-            (RocksDBException) ex);
-      } else {
-        throw (IOException) ex;
-      }
+      throw ex;
     } finally {
       metadataManager.getLock().releaseVolumeLock(volume);
       metadataManager.getLock().releaseUserLock(owner);
     }
   }
 
+  @Override
+  public void applyDeleteVolume(String volume, String owner,
+      VolumeList newVolumeList) throws IOException {
+    try {
+      deleteVolumeCommitToDB(newVolumeList, volume, owner);
+    } catch (IOException ex) {
+      LOG.error("Delete volume failed for volume:{}", volume,
+          ex);
+      throw ex;
+    }
+  }
+
+  private void deleteVolumeCommitToDB(VolumeList newVolumeList,
+      String volume, String owner) throws IOException {
+    try (BatchOperation batch = metadataManager.getStore()
+        .initBatchOperation()) {
+      String dbUserKey = metadataManager.getUserKey(owner);
+      if (newVolumeList.getVolumeNamesList().size() == 0) {
+        metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey);
+      } else {
+        metadataManager.getUserTable().putWithBatch(batch, dbUserKey,
+            newVolumeList);
+      }
+      metadataManager.getVolumeTable().deleteWithBatch(batch,
+          metadataManager.getVolumeKey(volume));
+      metadataManager.getStore().commitBatchOperation(batch);
+    }
+  }
+
   /**
    * Checks if the specified user with a role can access this volume.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 2f3445a..919709c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -36,8 +37,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
-import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
@@ -67,14 +68,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       new SimpleStateMachineStorage();
   private final OzoneManagerRatisServer omRatisServer;
   private final OzoneManagerServerProtocol ozoneManager;
-  private RequestHandler handler;
+  private OzoneManagerHARequestHandler handler;
   private RaftGroupId raftGroupId;
   private long lastAppliedIndex = 0;
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
     this.omRatisServer = ratisServer;
     this.ozoneManager = omRatisServer.getOzoneManager();
-    this.handler = new OzoneManagerRequestHandler(ozoneManager);
+    this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager);
   }
 
   /**
@@ -185,21 +186,53 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private TransactionContext handleStartTransactionRequests(
       RaftClientRequest raftClientRequest, OMRequest omRequest) {
 
-    switch (omRequest.getCmdType()) {
-    case AllocateBlock:
-      return handleAllocateBlock(raftClientRequest, omRequest);
-    case CreateKey:
-      return handleCreateKeyRequest(raftClientRequest, omRequest);
-    case InitiateMultiPartUpload:
-      return handleInitiateMultipartUpload(raftClientRequest, omRequest);
-    default:
-      return TransactionContext.newBuilder()
+    OMRequest newOmRequest = null;
+    try {
+      switch (omRequest.getCmdType()) {
+      case CreateVolume:
+      case SetVolumeProperty:
+      case DeleteVolume:
+        newOmRequest = handler.handleStartTransaction(omRequest);
+        break;
+      case AllocateBlock:
+        return handleAllocateBlock(raftClientRequest, omRequest);
+      case CreateKey:
+        return handleCreateKeyRequest(raftClientRequest, omRequest);
+      case InitiateMultiPartUpload:
+        return handleInitiateMultipartUpload(raftClientRequest, omRequest);
+      default:
+        return TransactionContext.newBuilder()
+            .setClientRequest(raftClientRequest)
+            .setStateMachine(this)
+            .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+            .setLogData(raftClientRequest.getMessage().getContent())
+            .build();
+      }
+    } catch (IOException ex) {
+      TransactionContext transactionContext = TransactionContext.newBuilder()
           .setClientRequest(raftClientRequest)
           .setStateMachine(this)
           .setServerRole(RaftProtos.RaftPeerRole.LEADER)
-          .setLogData(raftClientRequest.getMessage().getContent())
           .build();
+      if (ex instanceof OMException) {
+        IOException ioException =
+            new IOException(ex.getMessage() + STATUS_CODE +
+                ((OMException) ex).getResult());
+        transactionContext.setException(ioException);
+      } else {
+        transactionContext.setException(ex);
+      }
+      LOG.error("Exception in startTransaction for cmdType " +
+          omRequest.getCmdType(), ex);
+      return transactionContext;
     }
+    TransactionContext transactionContext = TransactionContext.newBuilder()
+        .setClientRequest(raftClientRequest)
+        .setStateMachine(this)
+        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+        .setLogData(OMRatisHelper.convertRequestToByteString(newOmRequest))
+        .build();
+    return transactionContext;
   }
 
   private TransactionContext handleInitiateMultipartUpload(
@@ -367,7 +400,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
    * @throws ServiceException
    */
   private Message runCommand(OMRequest request, long trxLogIndex) {
-    OMResponse response = handler.handle(request);
+    OMResponse response = handler.handleApplyTransaction(request);
     lastAppliedIndex = trxLogIndex;
     return OMRatisHelper.convertResponseToMessage(response);
   }
@@ -394,7 +427,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   }
 
   @VisibleForTesting
-  public void setHandler(RequestHandler handler) {
+  public void setHandler(OzoneManagerHARequestHandler handler) {
     this.handler = handler;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
similarity index 60%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
index 367efd4..1ccac3b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
@@ -17,32 +17,32 @@
 
 package org.apache.hadoop.ozone.protocolPB;
 
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
-    OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
-    OMResponse;
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
 
 /**
- * Handler to handle the OmRequests.
+ * Handler to handle OM requests in OM HA.
  */
-public interface RequestHandler {
-
+public interface OzoneManagerHARequestHandler extends RequestHandler {
 
   /**
-   * Handle the OmRequest, and returns OmResponse.
-   * @param request
-   * @return OmResponse
+   * Handle start Transaction Requests from OzoneManager StateMachine.
+   * @param omRequest
+   * @return OMRequest - New OM Request which will be applied during apply
+   * Transaction
+   * @throws IOException
    */
-  OMResponse handle(OMRequest request);
+  OMRequest handleStartTransaction(OMRequest omRequest) throws IOException;
 
   /**
-   * Validates that the incoming OM request has required parameters.
-   * TODO: Add more validation checks before writing the request to Ratis log.
-   *
-   * @param omRequest client request to OM
-   * @throws OMException thrown if required parameters are set to null.
+   * Handle Apply Transaction Requests from OzoneManager StateMachine.
+   * @param omRequest
+   * @return OMResponse
    */
-  void validateRequest(OMRequest omRequest) throws OMException;
+  OMResponse handleApplyTransaction(OMRequest omRequest);
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
new file mode 100644
index 0000000..aada6e1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .Status;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+
+/**
+ * Command Handler for OM requests. OM State Machine calls this handler for
+ * deserializing the client request and sending it to OM.
+ */
+public class OzoneManagerHARequestHandlerImpl
+    extends OzoneManagerRequestHandler implements OzoneManagerHARequestHandler {
+
+  public OzoneManagerHARequestHandlerImpl(OzoneManagerServerProtocol om) {
+    super(om);
+  }
+
+  @Override
+  public OMRequest handleStartTransaction(OMRequest omRequest)
+      throws IOException {
+    LOG.debug("Received OMRequest: {}, ", omRequest);
+    Type cmdType = omRequest.getCmdType();
+    OMRequest newOmRequest = null;
+    switch (cmdType) {
+    case CreateVolume:
+      newOmRequest = handleCreateVolumeStart(omRequest);
+      break;
+    case SetVolumeProperty:
+      newOmRequest = handleSetVolumePropertyStart(omRequest);
+      break;
+    case DeleteVolume:
+      newOmRequest = handleDeleteVolumeStart(omRequest);
+      break;
+    default:
+      throw new IOException("Unrecognized Command Type:" + cmdType);
+    }
+    return newOmRequest;
+  }
+
+
+  @Override
+  public OMResponse handleApplyTransaction(OMRequest omRequest) {
+    LOG.debug("Received OMRequest: {}, ", omRequest);
+    Type cmdType = omRequest.getCmdType();
+    OMResponse.Builder responseBuilder =
+        OMResponse.newBuilder().setCmdType(cmdType)
+            .setStatus(Status.OK);
+    try {
+      switch (cmdType) {
+      case CreateVolume:
+        responseBuilder.setCreateVolumeResponse(
+            handleCreateVolumeApply(omRequest));
+        break;
+      case SetVolumeProperty:
+        responseBuilder.setSetVolumePropertyResponse(
+            handleSetVolumePropertyApply(omRequest));
+        break;
+      case DeleteVolume:
+        responseBuilder.setDeleteVolumeResponse(
+            handleDeleteVolumeApply(omRequest));
+        break;
+      default:
+        // As all request types are not changed so we need to call handle
+        // here.
+        return handle(omRequest);
+      }
+      responseBuilder.setSuccess(true);
+    } catch (IOException ex) {
+      responseBuilder.setSuccess(false);
+      responseBuilder.setStatus(exceptionToResponseStatus(ex));
+      if (ex.getMessage() != null) {
+        responseBuilder.setMessage(ex.getMessage());
+      }
+    }
+    return responseBuilder.build();
+  }
+
+
+  private OMRequest handleCreateVolumeStart(OMRequest omRequest)
+      throws IOException {
+    VolumeInfo volumeInfo = omRequest.getCreateVolumeRequest().getVolumeInfo();
+    OzoneManagerProtocolProtos.VolumeList volumeList =
+        getOzoneManagerServerProtocol().startCreateVolume(
+            OmVolumeArgs.getFromProtobuf(volumeInfo));
+
+    CreateVolumeRequest createVolumeRequest =
+        CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo)
+            .setVolumeList(volumeList).build();
+    return omRequest.toBuilder().setCreateVolumeRequest(createVolumeRequest)
+        .build();
+  }
+
+  private CreateVolumeResponse handleCreateVolumeApply(OMRequest omRequest)
+      throws IOException {
+    OzoneManagerProtocolProtos.VolumeInfo volumeInfo =
+        omRequest.getCreateVolumeRequest().getVolumeInfo();
+    VolumeList volumeList =
+        omRequest.getCreateVolumeRequest().getVolumeList();
+    getOzoneManagerServerProtocol().applyCreateVolume(
+        OmVolumeArgs.getFromProtobuf(volumeInfo),
+        volumeList);
+    return CreateVolumeResponse.newBuilder().build();
+  }
+
+  private OMRequest handleSetVolumePropertyStart(OMRequest omRequest)
+      throws IOException {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        omRequest.getSetVolumePropertyRequest();
+    String volume = setVolumePropertyRequest.getVolumeName();
+    OMRequest newOmRequest = null;
+    if (setVolumePropertyRequest.hasQuotaInBytes()) {
+      long quota = setVolumePropertyRequest.getQuotaInBytes();
+      OmVolumeArgs omVolumeArgs =
+          getOzoneManagerServerProtocol().startSetQuota(volume, quota);
+      SetVolumePropertyRequest newSetVolumePropertyRequest =
+          SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
+              .setVolumeInfo(omVolumeArgs.getProtobuf()).build();
+      newOmRequest =
+          omRequest.toBuilder().setSetVolumePropertyRequest(
+              newSetVolumePropertyRequest).build();
+    } else {
+      String owner = setVolumePropertyRequest.getOwnerName();
+      OmVolumeOwnerChangeResponse omVolumeOwnerChangeResponse =
+          getOzoneManagerServerProtocol().startSetOwner(volume, owner);
+      // If volumeLists become large and as ratis writes the request to disk we
+      // might take more space if the lists become very big in size. We might
+      // need to revisit this if it becomes problem
+      SetVolumePropertyRequest newSetVolumePropertyRequest =
+          SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
+              .setOwnerName(owner)
+              .setOriginalOwner(omVolumeOwnerChangeResponse.getOriginalOwner())
+              .setNewOwnerVolumeList(
+                  omVolumeOwnerChangeResponse.getNewOwnerVolumeList())
+              .setOldOwnerVolumeList(
+                  omVolumeOwnerChangeResponse.getOriginalOwnerVolumeList())
+              .setVolumeInfo(
+                  omVolumeOwnerChangeResponse.getNewOwnerVolumeArgs()
+                      .getProtobuf()).build();
+      newOmRequest =
+          omRequest.toBuilder().setSetVolumePropertyRequest(
+              newSetVolumePropertyRequest).build();
+    }
+    return newOmRequest;
+  }
+
+
+  private SetVolumePropertyResponse handleSetVolumePropertyApply(
+      OMRequest omRequest) throws IOException {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        omRequest.getSetVolumePropertyRequest();
+
+    if (setVolumePropertyRequest.hasQuotaInBytes()) {
+      getOzoneManagerServerProtocol().applySetQuota(
+          OmVolumeArgs.getFromProtobuf(
+              setVolumePropertyRequest.getVolumeInfo()));
+    } else {
+      getOzoneManagerServerProtocol().applySetOwner(
+          setVolumePropertyRequest.getOriginalOwner(),
+          setVolumePropertyRequest.getOldOwnerVolumeList(),
+          setVolumePropertyRequest.getNewOwnerVolumeList(),
+          OmVolumeArgs.getFromProtobuf(
+              setVolumePropertyRequest.getVolumeInfo()));
+    }
+    return SetVolumePropertyResponse.newBuilder().build();
+  }
+
+  private OMRequest handleDeleteVolumeStart(OMRequest omRequest)
+      throws IOException {
+    DeleteVolumeRequest deleteVolumeRequest =
+        omRequest.getDeleteVolumeRequest();
+
+    String volume = deleteVolumeRequest.getVolumeName();
+
+    OmDeleteVolumeResponse omDeleteVolumeResponse =
+        getOzoneManagerServerProtocol().startDeleteVolume(volume);
+
+    DeleteVolumeRequest newDeleteVolumeRequest =
+        DeleteVolumeRequest.newBuilder().setVolumeList(
+            omDeleteVolumeResponse.getUpdatedVolumeList())
+            .setVolumeName(omDeleteVolumeResponse.getVolume())
+            .setOwner(omDeleteVolumeResponse.getOwner()).build();
+
+    return omRequest.toBuilder().setDeleteVolumeRequest(
+        newDeleteVolumeRequest).build();
+
+  }
+
+
+  private DeleteVolumeResponse handleDeleteVolumeApply(OMRequest omRequest)
+      throws IOException {
+
+    DeleteVolumeRequest deleteVolumeRequest =
+        omRequest.getDeleteVolumeRequest();
+
+    getOzoneManagerServerProtocol().applyDeleteVolume(
+        deleteVolumeRequest.getVolumeName(), deleteVolumeRequest.getOwner(),
+        deleteVolumeRequest.getVolumeList());
+
+    return DeleteVolumeResponse.newBuilder().build();
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 04b4881..1200d17 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -364,7 +364,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
   }
 
   // Convert and exception to corresponding status code
-  private Status exceptionToResponseStatus(IOException ex) {
+  protected Status exceptionToResponseStatus(IOException ex) {
     if (ex instanceof OMException) {
       return Status.values()[((OMException) ex).getResult().ordinal()];
     } else {
@@ -1027,4 +1027,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())
         .build();
   }
+
+  protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() {
+    return impl;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
index 367efd4..f19dc48 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
  */
 public interface RequestHandler {
 
-
   /**
    * Handle the OmRequest, and returns OmResponse.
    * @param request
@@ -36,6 +35,7 @@ public interface RequestHandler {
    */
   OMResponse handle(OMRequest request);
 
+
   /**
    * Validates that the incoming OM request has required parameters.
    * TODO: Add more validation checks before writing the request to Ratis log.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 9613582..b14eac7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
-import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
@@ -72,7 +72,7 @@ public class TestOzoneManagerStateMachine {
   private OzoneConfiguration conf;
   private OzoneManagerRatisServer omRatisServer;
   private String omID;
-  private RequestHandler requestHandler;
+  private OzoneManagerHARequestHandler requestHandler;
   private RaftGroupId raftGroupId;
   private OzoneManagerStateMachine ozoneManagerStateMachine;
 
@@ -105,7 +105,7 @@ public class TestOzoneManagerStateMachine {
     ozoneManagerStateMachine =
         new OzoneManagerStateMachine(omRatisServer);
 
-    requestHandler = Mockito.mock(OzoneManagerRequestHandler.class);
+    requestHandler = Mockito.mock(OzoneManagerHARequestHandlerImpl.class);
     raftGroupId = omRatisServer.getRaftGroup().getGroupId();
 
     ozoneManagerStateMachine.setHandler(requestHandler);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org