You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/03/18 20:52:25 UTC

[hadoop] branch trunk updated: HDDS-1250. In OM HA AllocateBlock call where connecting to SCM from OM should not happen on Ratis.

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93db5da  HDDS-1250. In OM HA AllocateBlock call where connecting to SCM from OM should not happen on Ratis.
93db5da is described below

commit 93db5da4d95b1c5397a6e703db6894791f621ff3
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Mar 18 13:11:35 2019 -0700

    HDDS-1250. In OM HA AllocateBlock call where connecting to SCM from OM should not happen on Ratis.
---
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   1 +
 .../ozone/om/protocol/OzoneManagerHAProtocol.java  |  55 +++++
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   1 +
 .../om/protocol/OzoneManagerServerProtocol.java    |  28 +++
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   1 -
 .../src/main/proto/OzoneManagerProtocol.proto      |   4 +
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |   3 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  15 ++
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  54 ++++-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  10 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  34 ++-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  12 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  84 ++++++-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   7 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |  25 +-
 .../hadoop/ozone/protocolPB/RequestHandler.java    |  31 +++
 .../om/ratis/TestOzoneManagerStateMachine.java     | 264 +++++++++++++++++++++
 17 files changed, 600 insertions(+), 29 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 053bc0f..7e0ed88 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -24,6 +24,7 @@ public enum OMAction implements AuditAction {
 
   // WRITE Actions
   ALLOCATE_BLOCK,
+  ADD_ALLOCATE_BLOCK,
   ALLOCATE_KEY,
   COMMIT_KEY,
   CREATE_VOLUME,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
new file mode 100644
index 0000000..7390fe2
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.protocol;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyLocation;
+
+import java.io.IOException;
+
+/**
+ * Protocol to talk to OM HA. These methods are only meant to be called from
+ * OmRequestHandler.
+ */
+public interface OzoneManagerHAProtocol {
+
+  /**
+   * Add an allocated block. It is assumed that the client has an open key
+   * session going on; this block will be appended to that open key session.
+   * This is called only when OM HA is enabled, since during HA we receive
+   * already-allocated block information and add that information to OM DB.
+   *
+   * In HA the flow for allocateBlock is: in startTransaction, allocateBlock
+   * is called, which returns the block information, and in
+   * applyTransaction, addAllocatedBlock is called to add the block
+   * information to DB.
+   *
+   * @param args the key to append
+   * @param clientID the client identification
+   * @param keyLocation key location given by allocateBlock
+   * @return an allocated block
+   * @throws IOException
+   */
+  OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
+      KeyLocation keyLocation) throws IOException;
+
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 0fee78c..748702e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -190,6 +190,7 @@ public interface OzoneManagerProtocol
   OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
       ExcludeList excludeList) throws IOException;
 
+
   /**
    * Look up for the container of an existing key.
    *
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java
new file mode 100644
index 0000000..6f58e2d
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.protocol;
+
+/**
+ * This will be used in the OzoneManager Server, as some of the methods in
+ * OzoneManagerHAProtocol need not be exposed to OM clients. This interface
+ * extends both OzoneManagerHAProtocol and OzoneManagerProtocol.
+ */
+public interface OzoneManagerServerProtocol extends OzoneManagerProtocol,
+    OzoneManagerHAProtocol {
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index c06efdc..49a3fe6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -719,7 +719,6 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .getAllocateBlockResponse();
     return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
   }
-
   @Override
   public void commitKey(OmKeyArgs args, long clientId)
       throws IOException {
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 2f9410b..e720c3e 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -628,6 +628,10 @@ message AllocateBlockRequest {
     required KeyArgs keyArgs = 1;
     required uint64 clientID = 2;
     optional hadoop.hdds.ExcludeListProto excludeList = 3;
+    // During HA, one of the OM nodes allocates the block and sends the
+    // AllocateBlockRequest with keyLocation set. If this is set, no need to
+    // call SCM again in OM Ratis applyTransaction; just append it to DB.
+    optional KeyLocation keyLocation = 4;
 }
 
 message AllocateBlockResponse {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index 5361745..bcb3532 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -149,7 +149,8 @@ public class TestOmBlockVersioning {
 
     // this block will be appended to the latest version of version 2.
     OmKeyLocationInfo locationInfo =
-        ozoneManager.allocateBlock(keyArgs, openKey.getId(), new ExcludeList());
+        ozoneManager.allocateBlock(keyArgs, openKey.getId(),
+            new ExcludeList());
     List<OmKeyLocationInfo> locationInfoList =
         openKey.getKeyInfo().getLatestVersionLocations()
             .getBlocksLatestVersionOnly();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 122312d..1f502d6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.utils.BackgroundService;
 
 import java.io.IOException;
@@ -75,6 +76,20 @@ public interface KeyManager {
    */
   OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
       ExcludeList excludeList) throws IOException;
+
+  /**
+   * Ozone manager state machine calls this on an open key, to add an
+   * allocated block to the tail of the current block list of the open key.
+   *
+   * @param args the key to append
+   * @param clientID the client requesting block.
+   * @param keyLocation key location.
+   * @return the reference to the new block.
+   * @throws IOException
+   */
+  OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
+      OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException;
+
   /**
    * Given the args of a key to put, write an open key entry to meta data.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 187509f..bac8863 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -120,6 +121,8 @@ public class KeyManagerImpl implements KeyManager {
 
   private final KeyProviderCryptoExtension kmsProvider;
 
+  private final boolean isRatisEnabled;
+
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager) {
@@ -148,6 +151,9 @@ public class KeyManagerImpl implements KeyManager {
         HDDS_BLOCK_TOKEN_ENABLED,
         HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
     this.kmsProvider = kmsProvider;
+    this.isRatisEnabled = conf.getBoolean(
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
   }
 
   @Override
@@ -223,10 +229,41 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
+  public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
+      OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException {
+    Preconditions.checkNotNull(args);
+    Preconditions.checkNotNull(keyLocation);
+
+
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    validateBucket(volumeName, bucketName);
+    String openKey = metadataManager.getOpenKey(
+        volumeName, bucketName, keyName, clientID);
+
+    OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
+    if (keyInfo == null) {
+      LOG.error("Allocate block for a key not in open status in meta store" +
+          " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
+      throw new OMException("Open Key not found",
+          OMException.ResultCodes.KEY_NOT_FOUND);
+    }
+
+    OmKeyLocationInfo omKeyLocationInfo =
+        OmKeyLocationInfo.getFromProtobuf(keyLocation);
+    keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo));
+    keyInfo.updateModifcationTime();
+    metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+    return omKeyLocationInfo;
+  }
+
+  @Override
   public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
-      ExcludeList excludeList)
-      throws IOException {
+      ExcludeList excludeList) throws IOException {
     Preconditions.checkNotNull(args);
+
+
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
@@ -246,11 +283,16 @@ public class KeyManagerImpl implements KeyManager {
     // the same version
     List<OmKeyLocationInfo> locationInfos =
         allocateBlock(keyInfo, excludeList, scmBlockSize);
-    keyInfo.appendNewBlocks(locationInfos);
-    keyInfo.updateModifcationTime();
-    metadataManager.getOpenKeyTable().put(openKey,
-        keyInfo);
+
+    // If OM is not managed via Ratis, write to DB here; otherwise the write
+    // to DB will happen via Ratis applyTransaction.
+    if (!isRatisEnabled) {
+      keyInfo.appendNewBlocks(locationInfos);
+      keyInfo.updateModifcationTime();
+      metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+    }
     return locationInfos.get(0);
+
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 946b898..fda46e3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -60,6 +60,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numVolumeLists;
   private @Metric MutableCounterLong numKeyCommits;
   private @Metric MutableCounterLong numAllocateBlockCalls;
+  private @Metric MutableCounterLong numAddAllocateBlockCalls;
   private @Metric MutableCounterLong numGetServiceLists;
   private @Metric MutableCounterLong numListS3Buckets;
   private @Metric MutableCounterLong numInitiateMultipartUploads;
@@ -85,6 +86,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numVolumeListFails;
   private @Metric MutableCounterLong numKeyCommitFails;
   private @Metric MutableCounterLong numBlockAllocateCallFails;
+  private @Metric MutableCounterLong numAddAllocateBlockCallFails;
   private @Metric MutableCounterLong numGetServiceListFails;
   private @Metric MutableCounterLong numListS3BucketsFails;
   private @Metric MutableCounterLong numInitiateMultipartUploadFails;
@@ -379,6 +381,14 @@ public class OMMetrics {
     numBlockAllocateCallFails.incr();
   }
 
+  public void incNumAddAllocateBlockCalls() {
+    numAddAllocateBlockCalls.incr();
+  }
+
+  public void incNumAddAllocateBlockFails() {
+    numAddAllocateBlockCallFails.incr();
+  }
+
   public void incNumBucketListFails() {
     numBucketListFails.incr();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 43ad16a..0559762 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -102,7 +104,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -205,7 +206,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
  */
 @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
 public final class OzoneManager extends ServiceRuntimeInfoImpl
-    implements OzoneManagerProtocol, OMMXBean, Auditor {
+    implements OzoneManagerServerProtocol, OMMXBean, Auditor {
   public static final Logger LOG =
       LoggerFactory.getLogger(OzoneManager.class);
 
@@ -2036,6 +2037,35 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+
+  @Override
+  public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
+      KeyLocation keyLocation) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
+        args.toAuditMap();
+    auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
+    try {
+      metrics.incNumAddAllocateBlockCalls();
+      return keyManager.addAllocatedBlock(args, clientID, keyLocation);
+    } catch (Exception ex) {
+      metrics.incNumAddAllocateBlockFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          OMAction.ADD_ALLOCATE_BLOCK, auditMap, ex));
+      throw ex;
+    } finally {
+      if(auditSuccess){
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.ADD_ALLOCATE_BLOCK, auditMap));
+      }
+    }
+  }
+
   /**
    * Lookup a key.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index a3cde3e..01979e4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -40,7 +40,8 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMNodeDetails;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
@@ -81,7 +82,7 @@ public final class OzoneManagerRatisServer {
   private final RaftGroup raftGroup;
   private final RaftPeerId raftPeerId;
 
-  private final OzoneManagerProtocol ozoneManager;
+  private final OzoneManagerServerProtocol ozoneManager;
   private final ClientId clientId = ClientId.randomId();
 
   private final ScheduledExecutorService scheduledRoleChecker;
@@ -107,7 +108,8 @@ public final class OzoneManagerRatisServer {
    * @param raftPeers peer nodes in the raft ring
    * @throws IOException
    */
-  private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om,
+  private OzoneManagerRatisServer(Configuration conf,
+      OzoneManagerServerProtocol om,
       String raftGroupIdStr, RaftPeerId localRaftPeerId,
       InetSocketAddress addr, List<RaftPeer> raftPeers)
       throws IOException {
@@ -154,7 +156,7 @@ public final class OzoneManagerRatisServer {
    * Creates an instance of OzoneManagerRatisServer.
    */
   public static OzoneManagerRatisServer newOMRatisServer(
-      Configuration ozoneConf, OzoneManagerProtocol om,
+      Configuration ozoneConf, OzoneManager om,
       OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
       throws IOException {
 
@@ -199,7 +201,7 @@ public final class OzoneManagerRatisServer {
     return  new OzoneManagerStateMachine(this);
   }
 
-  public OzoneManagerProtocol getOzoneManager() {
+  public OzoneManagerServerProtocol getOzoneManager() {
     return ozoneManager;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index acbbd34..590b359 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
@@ -26,12 +27,14 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.hadoop.ozone.protocolPB.RequestHandler;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -57,8 +60,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final OzoneManagerRatisServer omRatisServer;
-  private final OzoneManagerProtocol ozoneManager;
-  private final OzoneManagerRequestHandler handler;
+  private final OzoneManagerServerProtocol ozoneManager;
+  private RequestHandler handler;
   private RaftGroupId raftGroupId;
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
@@ -105,6 +108,11 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       ctxt.setException(ioe);
       return ctxt;
     }
+
+    if (omRequest.getCmdType() ==
+        OzoneManagerProtocolProtos.Type.AllocateBlock) {
+      return handleAllocateBlock(raftClientRequest, omRequest);
+    }
     return TransactionContext.newBuilder()
         .setClientRequest(raftClientRequest)
         .setStateMachine(this)
@@ -113,6 +121,66 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
         .build();
   }
 
+  /**
+   * Handle AllocateBlock request, which needs special handling. This
+   * request needs to be executed on the leader, where it connects to SCM and
+   * gets block information.
+   * @param raftClientRequest
+   * @param omRequest
+   * @return TransactionContext
+   */
+  private TransactionContext handleAllocateBlock(
+      RaftClientRequest raftClientRequest, OMRequest omRequest) {
+    OMResponse omResponse = handler.handle(omRequest);
+
+
+    // If the request failed, there is no need to proceed further.
+    // Set the exception with the omResponse message and status code.
+
+    // TODO: the allocate block fails when SCM is in chill mode or when SCM is
+    //  down, but that error is not correctly received on the OM end. Once that
+    //  is fixed, we need to see how to handle this failure case, i.e. whether
+    //  and how to retry. For other errors like KEY_NOT_FOUND, we don't need a
+    //  retry.
+    if (!omResponse.getSuccess()) {
+      TransactionContext transactionContext = TransactionContext.newBuilder()
+          .setClientRequest(raftClientRequest)
+          .setStateMachine(this)
+          .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+          .build();
+      IOException ioe = new IOException(omResponse.getMessage() +
+          " Status code " + omResponse.getStatus());
+      transactionContext.setException(ioe);
+      return transactionContext;
+    }
+
+
+    // Get original request
+    OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest =
+        omRequest.getAllocateBlockRequest();
+
+    // Create new AllocateBlockRequest with keyLocation set.
+    OzoneManagerProtocolProtos.AllocateBlockRequest newAllocateBlockRequest =
+        OzoneManagerProtocolProtos.AllocateBlockRequest.newBuilder().
+            mergeFrom(allocateBlockRequest)
+            .setKeyLocation(
+                omResponse.getAllocateBlockResponse().getKeyLocation()).build();
+
+    OMRequest newOmRequest = omRequest.toBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.AllocateBlock)
+        .setAllocateBlockRequest(newAllocateBlockRequest).build();
+
+    ByteString messageContent = ByteString.copyFrom(newOmRequest.toByteArray());
+
+    return TransactionContext.newBuilder()
+        .setClientRequest(raftClientRequest)
+        .setStateMachine(this)
+        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+        .setLogData(messageContent)
+        .build();
+
+  }
+
   /*
    * Apply a committed log entry to the state machine.
    */
@@ -169,4 +237,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
     return future;
   }
 
+  @VisibleForTesting
+  public void setHandler(RequestHandler handler) {
+    this.handler = handler;
+  }
+
+  @VisibleForTesting
+  public void setRaftGroupId(RaftGroupId raftGroupId) {
+    this.raftGroupId = raftGroupId;
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 395cc42..72b4d12 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocolPB;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -46,7 +46,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
       .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
   private final OzoneManagerRatisServer omRatisServer;
   private final OzoneManagerRatisClient omRatisClient;
-  private final OzoneManagerRequestHandler handler;
+  private final RequestHandler handler;
   private final boolean isRatisEnabled;
 
   /**
@@ -55,7 +55,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
    * @param impl OzoneManagerProtocolPB
    */
   public OzoneManagerProtocolServerSideTranslatorPB(
-      OzoneManagerProtocol impl, OzoneManagerRatisServer ratisServer,
+      OzoneManagerServerProtocol impl, OzoneManagerRatisServer ratisServer,
       OzoneManagerRatisClient ratisClient, boolean enableRatis) {
     handler = new OzoneManagerRequestHandler(impl);
     this.omRatisServer = ratisServer;
@@ -89,7 +89,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
       scope.close();
     }
   }
-
   /**
    * Submits request to OM's Ratis server.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 8d76842..81bc512 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
@@ -123,17 +123,18 @@ import org.slf4j.LoggerFactory;
  * Command Handler for OM requests. OM State Machine calls this handler for
  * deserializing the client request and sending it to OM.
  */
-public class OzoneManagerRequestHandler {
+public class OzoneManagerRequestHandler implements RequestHandler {
   static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
-  private final OzoneManagerProtocol impl;
+  private final OzoneManagerServerProtocol impl;
 
-  public OzoneManagerRequestHandler(OzoneManagerProtocol om) {
+  public OzoneManagerRequestHandler(OzoneManagerServerProtocol om) {
     this.impl = om;
   }
 
   //TODO simplify it to make it shorter
   @SuppressWarnings("methodlength")
+  @Override
   public OMResponse handle(OMRequest request) {
     LOG.debug("Received OMRequest: {}, ", request);
     Type cmdType = request.getCmdType();
@@ -344,6 +345,7 @@ public class OzoneManagerRequestHandler {
    * @param omRequest client request to OM
    * @throws OMException thrown if required parameters are set to null.
    */
+  @Override
   public void validateRequest(OMRequest omRequest) throws OMException {
     Type cmdType = omRequest.getCmdType();
     if (cmdType == null) {
@@ -627,9 +629,18 @@ public class OzoneManagerRequestHandler {
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .build();
-    OmKeyLocationInfo newLocation =
-        impl.allocateBlock(omKeyArgs, request.getClientID(),
-            ExcludeList.getFromProtoBuf(request.getExcludeList()));
+
+    OmKeyLocationInfo newLocation;
+    if (request.hasKeyLocation()) {
+      newLocation =
+          impl.addAllocatedBlock(omKeyArgs, request.getClientID(),
+              request.getKeyLocation());
+    } else {
+      newLocation =
+          impl.allocateBlock(omKeyArgs, request.getClientID(),
+              ExcludeList.getFromProtoBuf(request.getExcludeList()));
+    }
+
     resp.setKeyLocation(newLocation.getProtobuf());
 
     return resp.build();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
new file mode 100644
index 0000000..2cf8dea
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
@@ -0,0 +1,31 @@
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
+    OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
+    OMResponse;
+
+/**
+ * Handles OMRequests: validates them and produces OMResponses.
+ */
+public interface RequestHandler {
+
+
+  /**
+   * Handles the OMRequest and returns the corresponding OMResponse.
+   * @param request the OM request to handle
+   * @return the OMResponse produced for the request
+   */
+  OMResponse handle(OMRequest request);
+
+  /**
+   * Validates that the incoming OM request has required parameters.
+   * TODO: Add more validation checks before writing the request to Ratis log.
+   *
+   * @param omRequest client request to OM
+   * @throws OMException thrown if required parameters are set to null.
+   */
+  void validateRequest(OMRequest omRequest) throws OMException;
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
new file mode 100644
index 0000000..f98c8ca
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -0,0 +1,264 @@
+package org.apache.hadoop.ozone.om.ratis;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .AllocateBlockRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests OzoneManagerStateMachine.
+ */
+public class TestOzoneManagerStateMachine {
+
+  private OzoneConfiguration conf;
+  private OzoneManagerRatisServer omRatisServer;
+  private String omID;
+  private RequestHandler requestHandler;
+  private RaftGroupId raftGroupId;
+  private OzoneManagerStateMachine ozoneManagerStateMachine;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    omID = UUID.randomUUID().toString();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        temporaryFolder.newFolder().toString());
+    int ratisPort = conf.getInt(
+        OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
+        .setRpcAddress(rpcAddress)
+        .setRatisPort(ratisPort)
+        .setOMNodeId(omID)
+        .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT)
+        .build();
+    // Starts a single node Ratis server
+    omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null,
+        omNodeDetails, Collections.emptyList());
+
+
+    ozoneManagerStateMachine =
+        new OzoneManagerStateMachine(omRatisServer);
+
+    requestHandler = Mockito.mock(OzoneManagerRequestHandler.class);
+    raftGroupId = omRatisServer.getRaftGroup().getGroupId();
+
+    ozoneManagerStateMachine.setHandler(requestHandler);
+    ozoneManagerStateMachine.setRaftGroupId(raftGroupId);
+
+  }
+
+  @Test
+  public void testAllocateBlockRequestWithSuccess() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    long allocateBlockClientId = RandomUtils.nextLong();
+    String clientId = UUID.randomUUID().toString();
+
+
+    OMRequest omRequest = createOmRequestForAllocateBlock(volumeName,
+        bucketName, keyName, allocateBlockClientId, clientId);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        createOmResponseForAllocateBlock(true);
+
+    when(requestHandler.handle(omRequest)).thenReturn(omResponse);
+
+
+    RaftClientRequest raftClientRequest =
+        new RaftClientRequest(ClientId.randomId(),
+            RaftPeerId.valueOf("random"), raftGroupId, 1, 1,
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)),
+            RaftClientRequest.Type.valueOf(
+                RaftProtos.WriteRequestTypeProto.getDefaultInstance()));
+
+    TransactionContext transactionContext =
+        ozoneManagerStateMachine.startTransaction(raftClientRequest);
+
+
+    OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest(
+        transactionContext.getStateMachineLogEntry().getLogData());
+
+    Assert.assertTrue(newOmRequest.hasAllocateBlockRequest());
+    checkModifiedOmRequest(omRequest, newOmRequest);
+
+    // Check that keyLocation is set and matches the one in the OmResponse.
+    Assert.assertTrue(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
+
+    Assert.assertEquals(omResponse.getAllocateBlockResponse().getKeyLocation(),
+        newOmRequest.getAllocateBlockRequest().getKeyLocation());
+
+  }
+
+
+  private OMRequest createOmRequestForAllocateBlock(String volumeName,
+      String bucketName, String keyName, long allocateClientId,
+      String clientId) {
+    // Create AllocateBlockRequest
+    AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(100).build();
+    req.setKeyArgs(keyArgs);
+    req.setClientID(allocateClientId);
+    req.setExcludeList(HddsProtos.ExcludeListProto.getDefaultInstance());
+    return OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
+        .setAllocateBlockRequest(req)
+        .setClientId(clientId)
+        .build();
+  }
+
+  private OMResponse createOmResponseForAllocateBlock(boolean status) {
+    OmKeyLocationInfo newLocation = new OmKeyLocationInfo.Builder().setBlockID(
+        new BlockID(RandomUtils.nextLong(),
+            RandomUtils.nextLong()))
+        .setLength(RandomUtils.nextLong())
+        .setOffset(0).setPipeline(
+            Pipeline.newBuilder().setId(PipelineID.randomId())
+                .setType(HddsProtos.ReplicationType.RATIS)
+                .setFactor(HddsProtos.ReplicationFactor.THREE)
+                .setState(Pipeline.PipelineState.OPEN)
+                .setNodes(new ArrayList<>()).build()).build();
+
+    OzoneManagerProtocolProtos.AllocateBlockResponse.Builder resp =
+        OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder();
+    resp.setKeyLocation(newLocation.getProtobuf());
+
+
+    if (status) {
+      return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true)
+          .setAllocateBlockResponse(resp)
+          .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
+          .setStatus(OzoneManagerProtocolProtos.Status.OK)
+          .setSuccess(status).build();
+    } else {
+      return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true)
+          .setAllocateBlockResponse(resp)
+          .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
+          .setStatus(OzoneManagerProtocolProtos.Status.SCM_IN_CHILL_MODE)
+          .setMessage("Scm in Chill mode")
+          .setSuccess(status).build();
+    }
+
+  }
+  @Test
+  public void testAllocateBlockWithFailure() throws Exception{
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    long allocateBlockClientId = RandomUtils.nextLong();
+    String clientId = UUID.randomUUID().toString();
+
+
+    OMRequest omRequest = createOmRequestForAllocateBlock(volumeName,
+        bucketName, keyName, allocateBlockClientId, clientId);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        createOmResponseForAllocateBlock(false);
+
+    when(requestHandler.handle(omRequest)).thenReturn(omResponse);
+
+
+    RaftClientRequest raftClientRequest =
+        new RaftClientRequest(ClientId.randomId(),
+            RaftPeerId.valueOf("random"), raftGroupId, 1, 1,
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)),
+            RaftClientRequest.Type.valueOf(
+                RaftProtos.WriteRequestTypeProto.getDefaultInstance()));
+
+    TransactionContext transactionContext =
+        ozoneManagerStateMachine.startTransaction(raftClientRequest);
+
+
+    OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest(
+        transactionContext.getStateMachineLogEntry().getLogData());
+
+    Assert.assertTrue(newOmRequest.hasAllocateBlockRequest());
+    checkModifiedOmRequest(omRequest, newOmRequest);
+
+    // As the request failed, check that keyLocation is not set and verify
+    // the transaction context's exception message and type.
+    Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
+    Assert.assertEquals("Scm in Chill mode Status code "
+            + OMException.ResultCodes.SCM_IN_CHILL_MODE,
+        transactionContext.getException().getMessage());
+    Assert.assertTrue(transactionContext.getException() instanceof IOException);
+
+  }
+
+  private void checkModifiedOmRequest(OMRequest omRequest,
+      OMRequest newOmRequest) {
+    Assert.assertTrue(newOmRequest.getAllocateBlockRequest()
+        .getKeyArgs().getBucketName().equals(
+            omRequest.getAllocateBlockRequest().getKeyArgs().getBucketName()));
+    Assert.assertTrue(omRequest.getAllocateBlockRequest()
+        .getKeyArgs().getVolumeName().equals(
+            newOmRequest.getAllocateBlockRequest().getKeyArgs()
+                .getVolumeName()));
+    Assert.assertTrue(omRequest.getAllocateBlockRequest()
+        .getKeyArgs().getKeyName().equals(
+            newOmRequest.getAllocateBlockRequest().getKeyArgs().getKeyName()));
+    Assert.assertEquals(omRequest.getAllocateBlockRequest()
+            .getKeyArgs().getDataSize(),
+        newOmRequest.getAllocateBlockRequest().getKeyArgs().getDataSize());
+    Assert.assertEquals(omRequest.getAllocateBlockRequest()
+            .getClientID(),
+        newOmRequest.getAllocateBlockRequest().getClientID());
+    Assert.assertEquals(omRequest.getClientId(), newOmRequest.getClientId());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org