You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/03/27 04:48:08 UTC

[hadoop] branch trunk updated: HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eef8cae  HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)
eef8cae is described below

commit eef8cae7cf42c2d1622970e177d699546351587f
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Tue Mar 26 21:48:01 2019 -0700

    HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   2 +
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   1 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   2 +
 .../ozone/om/protocol/OzoneManagerHAProtocol.java  |  30 ++++
 .../src/main/proto/OzoneManagerProtocol.proto      |  16 ++
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 187 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  33 +++-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  90 +++++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  74 +++++++-
 .../ozone/om/ratis/OzoneManagerRatisClient.java    |  25 +++
 .../ozone/om/ratis/OzoneManagerStateMachine.java   | 137 ++++++++++++++-
 .../protocolPB/OzoneManagerRequestHandler.java     |  54 ++++++
 .../om/ratis/TestOzoneManagerStateMachine.java     |   2 +-
 13 files changed, 624 insertions(+), 29 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 280461a..be879d8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -210,6 +210,8 @@ public final class OmUtils {
     case GetDelegationToken:
     case RenewDelegationToken:
     case CancelDelegationToken:
+    case ApplyCreateKey:
+    case ApplyInitiateMultiPartUpload:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 3863a52..0cbab08 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -26,6 +26,7 @@ public enum OMAction implements AuditAction {
   ALLOCATE_BLOCK,
   ADD_ALLOCATE_BLOCK,
   ALLOCATE_KEY,
+  APPLY_ALLOCATE_KEY,
   COMMIT_KEY,
   CREATE_VOLUME,
   CREATE_BUCKET,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 34980f6..b2f805a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -23,6 +23,8 @@ import java.io.IOException;
  * Exception thrown by Ozone Manager.
  */
 public class OMException extends IOException {
+
+  public static final String STATUS_CODE = "STATUS_CODE=";
   private final OMException.ResultCodes result;
 
   /**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 7390fe2..8357df2 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.ozone.om.protocol;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
 
@@ -52,4 +57,29 @@ public interface OzoneManagerHAProtocol {
       KeyLocation keyLocation) throws IOException;
 
 
+  /**
+   * Add the openKey entry with given keyInfo and clientID in to openKeyTable.
+   * This will be called only from applyTransaction, once after calling
+   * applyKey in startTransaction.
+   *
+   * @param omKeyArgs
+   * @param keyInfo
+   * @param clientID
+   * @throws IOException
+   */
+  void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+      throws IOException;
+
+  /**
+   * Initiate multipart upload for the specified key.
+   *
+   * This will be called only from applyTransaction.
+   * @param omKeyArgs
+   * @param multipartUploadID
+   * @return OmMultipartInfo
+   * @throws IOException
+   */
+  OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
+      String multipartUploadID) throws IOException;
+
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 674edbb..eac8d2a 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -60,6 +60,7 @@ enum Type {
   ListKeys = 35;
   CommitKey = 36;
   AllocateBlock = 37;
+  ApplyCreateKey = 38;
 
   CreateS3Bucket = 41;
   DeleteS3Bucket = 42;
@@ -74,6 +75,8 @@ enum Type {
 
   ServiceList = 51;
 
+  ApplyInitiateMultiPartUpload = 52;
+
   GetDelegationToken = 61;
   RenewDelegationToken = 62;
   CancelDelegationToken = 63;
@@ -110,6 +113,8 @@ message OMRequest {
   optional ListKeysRequest                  listKeysRequest                = 35;
   optional CommitKeyRequest                 commitKeyRequest               = 36;
   optional AllocateBlockRequest             allocateBlockRequest           = 37;
+  optional ApplyCreateKeyRequest            applyCreateKeyRequest          = 38;
+
 
   optional S3CreateBucketRequest            createS3BucketRequest          = 41;
   optional S3DeleteBucketRequest            deleteS3BucketRequest          = 42;
@@ -123,6 +128,7 @@ message OMRequest {
   optional MultipartUploadListPartsRequest  listMultipartUploadPartsRequest = 50;
 
   optional ServiceListRequest               serviceListRequest             = 51;
+  optional MultipartInfoApplyInitiateRequest initiateMultiPartUploadApplyRequest = 52;
 
   optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
   optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@@ -555,6 +561,11 @@ message CreateKeyResponse {
     optional uint64 openVersion = 4;
 }
 
+message ApplyCreateKeyRequest {
+    required CreateKeyRequest createKeyRequest = 1;
+    required CreateKeyResponse createKeyResponse = 2;
+}
+
 message LookupKeyRequest {
     required KeyArgs keyArgs = 1;
 }
@@ -722,6 +733,11 @@ message MultipartInfoInitiateRequest {
     required KeyArgs keyArgs = 1;
 }
 
+message MultipartInfoApplyInitiateRequest {
+    required KeyArgs keyArgs = 1;
+    required string multipartUploadID = 2;
+}
+
 message MultipartInfoInitiateResponse {
     required string volumeName = 1;
     required string bucketName = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 86a83b7..f565ad0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,14 +18,21 @@ package org.apache.hadoop.ozone.om;
 
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -42,7 +49,9 @@ import org.junit.rules.Timeout;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
@@ -120,6 +129,7 @@ public class TestOzoneManagerHA {
   @Test
   public void testAllOMNodesRunning() throws Exception {
     createVolumeTest(true);
+    createKeyTest(true);
   }
 
   /**
@@ -131,6 +141,8 @@ public class TestOzoneManagerHA {
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
     createVolumeTest(true);
+
+    createKeyTest(true);
   }
 
   /**
@@ -143,8 +155,181 @@ public class TestOzoneManagerHA {
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
     createVolumeTest(false);
+
+    createKeyTest(false);
+
+  }
+
+  private OzoneBucket setupBucket() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+    Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+    Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+    String bucketName = UUID.randomUUID().toString();
+    retVolumeinfo.createBucket(bucketName);
+
+    OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+    Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+    Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
+
+    return ozoneBucket;
+  }
+
+  @Test
+  public void testMultipartUpload() throws Exception {
+
+    // Happy scenario when all OM's are up.
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+    createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+  }
+
+  @Test
+  public void testMultipartUploadWithOneOmNodeDown() throws Exception {
+
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+    // After initiate multipartupload, shutdown leader OM.
+    // Stop leader OM, to see when the OM leader changes
+    // multipart upload is happening successfully or not.
+
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        objectStore.getClientProxy().getOMProxyProvider();
+
+    // The OMFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Stop one of the ozone manager, to see when the OM leader changes
+    // multipart upload is happening successfully or not.
+    cluster.stopOzoneManager(leaderOMNodeId);
+
+
+    createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+    String newLeaderOMNodeId =
+        omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    Assert.assertTrue(leaderOMNodeId != newLeaderOMNodeId);
+  }
+
+
+  private String initiateMultipartUpload(OzoneBucket ozoneBucket,
+      String keyName) throws Exception {
+
+    OmMultipartInfo omMultipartInfo =
+        ozoneBucket.initiateMultipartUpload(keyName,
+            ReplicationType.RATIS,
+            ReplicationFactor.ONE);
+
+    String uploadID = omMultipartInfo.getUploadID();
+    Assert.assertTrue(uploadID != null);
+    return uploadID;
   }
 
+  private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket,
+      String keyName, String uploadID) throws Exception {
+
+    String value = "random data";
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
+        keyName, value.length(), 1, uploadID);
+    ozoneOutputStream.write(value.getBytes(), 0, value.length());
+    ozoneOutputStream.close();
+
+
+    Map<Integer, String> partsMap = new HashMap<>();
+    partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getPartName());
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+        ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap);
+
+    Assert.assertTrue(omMultipartUploadCompleteInfo != null);
+    Assert.assertTrue(omMultipartUploadCompleteInfo.getHash() != null);
+
+
+    OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+    byte[] fileContent = new byte[value.getBytes().length];
+    ozoneInputStream.read(fileContent);
+    Assert.assertEquals(value, new String(fileContent));
+  }
+
+
+  private void createKeyTest(boolean checkSuccess) throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    try {
+      objectStore.createVolume(volumeName, createVolumeArgs);
+
+      OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+      Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+      Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+      Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+      String bucketName = UUID.randomUUID().toString();
+      String keyName = UUID.randomUUID().toString();
+      retVolumeinfo.createBucket(bucketName);
+
+      OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+      Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+      Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
+
+      String value = "random data";
+      OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
+          value.length(), ReplicationType.STAND_ALONE,
+          ReplicationFactor.ONE, new HashMap<>());
+      ozoneOutputStream.write(value.getBytes(), 0, value.length());
+      ozoneOutputStream.close();
+
+      OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+      byte[] fileContent = new byte[value.getBytes().length];
+      ozoneInputStream.read(fileContent);
+      Assert.assertEquals(value, new String(fileContent));
+
+    } catch (ConnectException | RemoteException e) {
+      if (!checkSuccess) {
+        // If the last OM to be tried by the RetryProxy is down, we would get
+        // ConnectException. Otherwise, we would get a RemoteException from the
+        // last running OM as it would fail to get a quorum.
+        if (e instanceof RemoteException) {
+          GenericTestUtils.assertExceptionContains(
+              "RaftRetryFailureException", e);
+        }
+      } else {
+        throw e;
+      }
+    }
+
+
+  }
   /**
    * Create a volume and test its attribute.
    */
@@ -186,6 +371,8 @@ public class TestOzoneManagerHA {
     }
   }
 
+
+
   /**
    * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
    * cluster.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index e5cf200..0006e93 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -29,7 +29,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyLocation;
 import org.apache.hadoop.utils.BackgroundService;
 
 import java.io.IOException;
@@ -89,7 +94,7 @@ public interface KeyManager extends OzoneManagerFS {
    * @throws IOException
    */
   OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
-      OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException;
+      KeyLocation keyLocation) throws IOException;
 
   /**
    * Given the args of a key to put, write an open key entry to meta data.
@@ -105,6 +110,19 @@ public interface KeyManager extends OzoneManagerFS {
   OpenKeySession openKey(OmKeyArgs args) throws IOException;
 
   /**
+   * Add the openKey entry with given keyInfo and clientID in to openKeyTable.
+   * This will be called only from applyTransaction, once after calling
+   * applyKey in startTransaction.
+   *
+   * @param omKeyArgs
+   * @param keyInfo
+   * @param clientID
+   * @throws IOException
+   */
+  void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+      throws IOException;
+
+  /**
    * Look up an existing key. Return the info of the key to client side, which
    * DistributedStorageHandler will use to access the data on datanode.
    *
@@ -214,6 +232,17 @@ public interface KeyManager extends OzoneManagerFS {
   OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
 
   /**
+   * Initiate multipart upload for the specified key.
+   *
+   * @param keyArgs
+   * @param multipartUploadID
+   * @return MultipartInfo
+   * @throws IOException
+   */
+  OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+      String multipartUploadID) throws IOException;
+
+  /**
    * Commit Multipart upload part file.
    * @param omKeyArgs
    * @param clientID
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 591bb00..83a60c0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -65,7 +65,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyLocation;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -233,7 +238,7 @@ public class KeyManagerImpl implements KeyManager {
 
   @Override
   public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
-      OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException {
+      KeyLocation keyLocation) throws IOException {
     Preconditions.checkNotNull(args);
     Preconditions.checkNotNull(keyLocation);
 
@@ -518,10 +523,49 @@ public class KeyManagerImpl implements KeyManager {
           allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
       keyInfo.appendNewBlocks(locationInfos);
     }
-    metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+
+    // When OM is not managed via ratis we should write in to Om db in
+    // openKey call.
+    if (!isRatisEnabled) {
+      metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+    }
     return new OpenKeySession(currentTime, keyInfo, openVersion);
   }
 
+  public void applyOpenKey(KeyArgs omKeyArgs,
+      KeyInfo keyInfo, long clientID) throws IOException {
+    Preconditions.checkNotNull(omKeyArgs);
+    String volumeName = omKeyArgs.getVolumeName();
+    String bucketName = omKeyArgs.getBucketName();
+
+    // Do we need to call again validateBucket, as this is just called after
+    // start Transaction from applyTransaction. Can we remove this double
+    // check?
+    validateBucket(volumeName, bucketName);
+
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    String keyName = omKeyArgs.getKeyName();
+
+    // TODO: here if on OM machines clocks are skewed and there is a chance
+    //  for override of the openKey entries.
+    try {
+      String openKey = metadataManager.getOpenKey(
+          volumeName, bucketName, keyName, clientID);
+
+      OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+      metadataManager.getOpenKeyTable().put(openKey,
+          omKeyInfo);
+    } catch (IOException ex) {
+      LOG.error("Apply Open Key failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
+      throw new OMException(ex.getMessage(),
+          ResultCodes.KEY_ALLOCATION_ERROR);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+  }
+
   /**
    * Create OmKeyInfo object.
    * @param keyArgs
@@ -826,17 +870,22 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
       IOException {
-    Preconditions.checkNotNull(omKeyArgs);
-    String volumeName = omKeyArgs.getVolumeName();
-    String bucketName = omKeyArgs.getBucketName();
-    String keyName = omKeyArgs.getKeyName();
+    long time = Time.monotonicNowNanos();
+    String uploadID = UUID.randomUUID().toString() + "-" + time;
+    return applyInitiateMultipartUpload(omKeyArgs, uploadID);
+  }
+
+  public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+      String multipartUploadID) throws IOException {
+    Preconditions.checkNotNull(keyArgs);
+    Preconditions.checkNotNull(multipartUploadID);
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
 
     metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     validateS3Bucket(volumeName, bucketName);
     try {
-      long time = Time.monotonicNowNanos();
-      String uploadID = UUID.randomUUID().toString() + "-" + Long.toString(
-          time);
 
       // We are adding uploadId to key, because if multiple users try to
       // perform multipart upload on the same key, each will try to upload, who
@@ -852,24 +901,24 @@ public class KeyManagerImpl implements KeyManager {
       // new uploadId is returned.
 
       String multipartKey = metadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, uploadID);
+          bucketName, keyName, multipartUploadID);
 
       // Not checking if there is an already key for this in the keyTable, as
       // during final complete multipart upload we take care of this.
 
 
       Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>();
-      OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID,
-          partKeyInfoMap);
+      OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
+          multipartUploadID, partKeyInfoMap);
       List<OmKeyLocationInfo> locations = new ArrayList<>();
       OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
-          .setVolumeName(omKeyArgs.getVolumeName())
-          .setBucketName(omKeyArgs.getBucketName())
-          .setKeyName(omKeyArgs.getKeyName())
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
           .setCreationTime(Time.now())
           .setModificationTime(Time.now())
-          .setReplicationType(omKeyArgs.getType())
-          .setReplicationFactor(omKeyArgs.getFactor())
+          .setReplicationType(keyArgs.getType())
+          .setReplicationFactor(keyArgs.getFactor())
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, locations)))
           .build();
@@ -882,11 +931,12 @@ public class KeyManagerImpl implements KeyManager {
         metadataManager.getOpenKeyTable().putWithBatch(batch,
             multipartKey, omKeyInfo);
         store.commitBatchOperation(batch);
-        return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID);
+        return new OmMultipartInfo(volumeName, bucketName, keyName,
+            multipartUploadID);
       }
     } catch (IOException ex) {
       LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " +
-              "key:{}", volumeName, bucketName, keyName, ex);
+          "key:{}", volumeName, bucketName, keyName, ex);
       throw new OMException(ex.getMessage(),
           ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR);
     } finally {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8f0781f..9fa297d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -72,7 +72,12 @@ import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyLocation;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -1986,6 +1991,51 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
+  public void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+      throws IOException {
+    // Do we need to check again Acl's for apply OpenKey call?
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          omKeyArgs.getVolumeName(), omKeyArgs.getBucketName(),
+          omKeyArgs.getKeyName());
+    }
+    boolean auditSuccess = true;
+    try {
+      keyManager.applyOpenKey(omKeyArgs, keyInfo, clientID);
+    } catch (Exception ex) {
+      metrics.incNumKeyAllocateFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          OMAction.APPLY_ALLOCATE_KEY,
+          (omKeyArgs == null) ? null : toAuditMap(omKeyArgs), ex));
+      throw ex;
+    } finally {
+      if(auditSuccess){
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.APPLY_ALLOCATE_KEY, (omKeyArgs == null) ? null :
+                toAuditMap(omKeyArgs)));
+      }
+    }
+  }
+
+  private Map<String, String> toAuditMap(KeyArgs omKeyArgs) {
+    Map<String, String> auditMap = new LinkedHashMap<>();
+    auditMap.put(OzoneConsts.VOLUME, omKeyArgs.getVolumeName());
+    auditMap.put(OzoneConsts.BUCKET, omKeyArgs.getBucketName());
+    auditMap.put(OzoneConsts.KEY, omKeyArgs.getKeyName());
+    auditMap.put(OzoneConsts.DATA_SIZE,
+        String.valueOf(omKeyArgs.getDataSize()));
+    auditMap.put(OzoneConsts.REPLICATION_TYPE,
+        omKeyArgs.hasType() ? omKeyArgs.getType().name() : null);
+    auditMap.put(OzoneConsts.REPLICATION_FACTOR,
+        omKeyArgs.hasFactor() ? omKeyArgs.getFactor().name() : null);
+    auditMap.put(OzoneConsts.KEY_LOCATION_INFO,
+        (omKeyArgs.getKeyLocationsList() != null) ?
+            omKeyArgs.getKeyLocationsList().toString() : null);
+    return auditMap;
+  }
+
+  @Override
   public void commitKey(OmKeyArgs args, long clientID)
       throws IOException {
     if(isAclEnabled) {
@@ -2474,6 +2524,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+
+  @Override
+  public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+      String multipartUploadID) throws IOException {
+    OmMultipartInfo multipartInfo;
+    metrics.incNumInitiateMultipartUploads();
+    try {
+      multipartInfo = keyManager.applyInitiateMultipartUpload(keyArgs,
+          multipartUploadID);
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
+              keyArgs.toAuditMap()));
+    } catch (IOException ex) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          OMAction.INITIATE_MULTIPART_UPLOAD,
+          (keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
+      metrics.incNumInitiateMultipartUploadFails();
+      throw ex;
+    }
+    return multipartInfo;
+  }
+
   @Override
   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
       IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index c9c48a4..cd99cd1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -40,6 +41,7 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
@@ -49,6 +51,8 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
 /**
  * OM Ratis client to interact with OM Ratis server endpoint.
  */
@@ -128,10 +132,31 @@ public final class OzoneManagerRatisClient implements Closeable {
       CompletableFuture<OMResponse> reply = sendCommandAsync(request);
       return reply.get();
     } catch (ExecutionException | InterruptedException e) {
+      if (e.getCause() instanceof StateMachineException) {
+        OMResponse.Builder omResponse = OMResponse.newBuilder();
+        omResponse.setCmdType(request.getCmdType());
+        omResponse.setSuccess(false);
+        omResponse.setMessage(e.getCause().getMessage());
+        omResponse.setStatus(parseErrorStatus(e.getCause().getMessage()));
+        return omResponse.build();
+      }
       throw new ServiceException(e);
     }
   }
 
+  private OzoneManagerProtocolProtos.Status parseErrorStatus(String message) {
+    if (message.contains(STATUS_CODE)) {
+      String errorCode = message.substring(message.indexOf(STATUS_CODE) +
+          STATUS_CODE.length());
+      LOG.debug("Parsing error message for error code " +
+          errorCode);
+      return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim());
+    } else {
+      return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
+    }
+
+  }
+
   /**
    * Sends a given command to server gets a waitable future back.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 590b359..420ffb5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
@@ -30,11 +31,14 @@ import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartInfoApplyInitiateRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
 import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -48,6 +52,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
 /**
  * The OM StateMachine is the state machine for OM Ratis server. It is
  * responsible for applying ratis committed transactions to
@@ -108,11 +114,121 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       ctxt.setException(ioe);
       return ctxt;
     }
+    return handleStartTransactionRequests(raftClientRequest, omRequest);
+
+  }
+
+  /**
+   * Handle the RaftClientRequest and return TransactionContext object.
+   * @param raftClientRequest
+   * @param omRequest
+   * @return TransactionContext
+   */
+  private TransactionContext handleStartTransactionRequests(
+      RaftClientRequest raftClientRequest, OMRequest omRequest) {
 
-    if (omRequest.getCmdType() ==
-        OzoneManagerProtocolProtos.Type.AllocateBlock) {
+    switch (omRequest.getCmdType()) {
+    case AllocateBlock:
       return handleAllocateBlock(raftClientRequest, omRequest);
+    case CreateKey:
+      return handleCreateKeyRequest(raftClientRequest, omRequest);
+    case InitiateMultiPartUpload:
+      return handleInitiateMultipartUpload(raftClientRequest, omRequest);
+    default:
+      return TransactionContext.newBuilder()
+          .setClientRequest(raftClientRequest)
+          .setStateMachine(this)
+          .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+          .setLogData(raftClientRequest.getMessage().getContent())
+          .build();
+    }
+
+  }
+
+
+  private TransactionContext handleInitiateMultipartUpload(
+      RaftClientRequest raftClientRequest, OMRequest omRequest) {
+
+    // Generate a multipart uploadID, and create a new request.
+    // When applyTransaction happen's all OM's use the same multipartUploadID
+    // for the key.
+
+    long time = Time.monotonicNowNanos();
+    String multipartUploadID = UUID.randomUUID().toString() + "-" + time;
+
+    MultipartInfoApplyInitiateRequest multipartInfoApplyInitiateRequest =
+        MultipartInfoApplyInitiateRequest.newBuilder()
+            .setKeyArgs(omRequest.getInitiateMultiPartUploadRequest()
+                .getKeyArgs()).setMultipartUploadID(multipartUploadID).build();
+
+    OMRequest.Builder newOmRequest =
+        OMRequest.newBuilder().setCmdType(
+            OzoneManagerProtocolProtos.Type.ApplyInitiateMultiPartUpload)
+            .setInitiateMultiPartUploadApplyRequest(
+                multipartInfoApplyInitiateRequest)
+            .setClientId(omRequest.getClientId());
+
+    if (omRequest.hasTraceID()) {
+      newOmRequest.setTraceID(omRequest.getTraceID());
+    }
+
+    ByteString messageContent =
+        ByteString.copyFrom(newOmRequest.build().toByteArray());
+
+    return TransactionContext.newBuilder()
+        .setClientRequest(raftClientRequest)
+        .setStateMachine(this)
+        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+        .setLogData(messageContent)
+        .build();
+  }
+
+  /**
+   * Handle createKey Request, which needs a special handling. This request
+   * needs to be executed on the leader, and the response received from this
+   * request we need to create a ApplyKeyRequest and create a
+   * TransactionContext object.
+   */
+  private TransactionContext handleCreateKeyRequest(
+      RaftClientRequest raftClientRequest, OMRequest omRequest) {
+    OMResponse omResponse = handler.handle(omRequest);
+
+    // TODO: if not success should we retry depending on the error if it is
+    //  retriable?
+    if (!omResponse.getSuccess()) {
+      TransactionContext transactionContext = TransactionContext.newBuilder()
+          .setClientRequest(raftClientRequest)
+          .setStateMachine(this)
+          .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+          .build();
+      transactionContext.setException(
+          constructExceptionForFailedRequest(omResponse));
+      return transactionContext;
+    }
+
+    // Get original request
+    OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+        omRequest.getCreateKeyRequest();
+
+    // Create Applykey Request.
+    OzoneManagerProtocolProtos.ApplyCreateKeyRequest applyCreateKeyRequest =
+        OzoneManagerProtocolProtos.ApplyCreateKeyRequest.newBuilder()
+            .setCreateKeyRequest(createKeyRequest)
+            .setCreateKeyResponse(omResponse.getCreateKeyResponse()).build();
+
+    OMRequest.Builder newOmRequest =
+        OMRequest.newBuilder().setCmdType(
+            OzoneManagerProtocolProtos.Type.ApplyCreateKey)
+            .setApplyCreateKeyRequest(applyCreateKeyRequest)
+            .setClientId(omRequest.getClientId());
+
+    if (omRequest.hasTraceID()) {
+      newOmRequest.setTraceID(omRequest.getTraceID());
     }
+
+    ByteString messageContent =
+        ByteString.copyFrom(newOmRequest.build().toByteArray());
+
     return TransactionContext.newBuilder()
         .setClientRequest(raftClientRequest)
         .setStateMachine(this)
@@ -121,6 +237,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
         .build();
   }
 
+
   /**
    * Handle AllocateBlock Request, which needs a special handling. This
    * request needs to be executed on the leader, where it connects to SCM and
@@ -148,9 +265,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
           .setStateMachine(this)
           .setServerRole(RaftProtos.RaftPeerRole.LEADER)
           .build();
-      IOException ioe = new IOException(omResponse.getMessage() +
-          " Status code " + omResponse.getStatus());
-      transactionContext.setException(ioe);
+      transactionContext.setException(
+          constructExceptionForFailedRequest(omResponse));
       return transactionContext;
     }
 
@@ -181,6 +297,17 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
 
   }
 
+  /**
+   * Construct IOException message for failed requests in StartTransaction.
+   * @param omResponse
+   * @return
+   */
+  private IOException constructExceptionForFailedRequest(
+      OMResponse omResponse) {
+    return new IOException(omResponse.getMessage() + " " +
+        STATUS_CODE + omResponse.getStatus());
+  }
+
   /*
    * Apply a committed log entry to the state machine.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 3fccf1b..7660ed1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -47,6 +47,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFile
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .ApplyCreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartInfoApplyInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse;
@@ -205,6 +209,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
             request.getCreateKeyRequest());
         responseBuilder.setCreateKeyResponse(createKeyResponse);
         break;
+      case ApplyCreateKey:
+        CreateKeyResponse applyKeyResponse =
+            applyCreateKey(request.getApplyCreateKeyRequest());
+        responseBuilder.setCreateKeyResponse(applyKeyResponse);
+        break;
       case LookupKey:
         LookupKeyResponse lookupKeyResponse = lookupKey(
             request.getLookupKeyRequest());
@@ -262,6 +271,13 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         responseBuilder.setInitiateMultiPartUploadResponse(
             multipartInfoInitiateResponse);
         break;
+      case ApplyInitiateMultiPartUpload:
+        MultipartInfoInitiateResponse response =
+            applyInitiateMultiPartUpload(
+                request.getInitiateMultiPartUploadApplyRequest());
+        responseBuilder.setInitiateMultiPartUploadResponse(
+            response);
+        break;
       case CommitMultiPartUpload:
         MultipartCommitUploadPartResponse commitUploadPartResponse =
             commitMultipartUploadPart(
@@ -498,6 +514,20 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return resp.build();
   }
 
+  private CreateKeyResponse applyCreateKey(ApplyCreateKeyRequest request)
+      throws IOException {
+
+    CreateKeyRequest createKeyRequest = request.getCreateKeyRequest();
+    CreateKeyResponse createKeyResponse = request.getCreateKeyResponse();
+
+    impl.applyOpenKey(createKeyRequest.getKeyArgs(),
+        createKeyResponse.getKeyInfo(), createKeyResponse.getID());
+
+    // If applying to om DB successful just return createKeyResponse.
+    return createKeyResponse;
+
+  }
+
   private LookupKeyResponse lookupKey(LookupKeyRequest request)
       throws IOException {
     LookupKeyResponse.Builder resp =
@@ -731,6 +761,30 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return resp.build();
   }
 
+  private MultipartInfoInitiateResponse applyInitiateMultiPartUpload(
+      MultipartInfoApplyInitiateRequest request) throws IOException {
+    MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
+        .newBuilder();
+
+    KeyArgs keyArgs = request.getKeyArgs();
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setType(keyArgs.getType())
+        .setFactor(keyArgs.getFactor())
+        .build();
+    OmMultipartInfo multipartInfo =
+        impl.applyInitiateMultipartUpload(omKeyArgs,
+            request.getMultipartUploadID());
+    resp.setVolumeName(multipartInfo.getVolumeName());
+    resp.setBucketName(multipartInfo.getBucketName());
+    resp.setKeyName(multipartInfo.getKeyName());
+    resp.setMultipartUploadID(multipartInfo.getUploadID());
+
+    return resp.build();
+  }
+
   private MultipartCommitUploadPartResponse commitMultipartUploadPart(
       MultipartCommitUploadPartRequest request) throws IOException {
     MultipartCommitUploadPartResponse.Builder resp =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 4406af6..9613582 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -251,7 +251,7 @@ public class TestOzoneManagerStateMachine {
     // As the request failed, check for keyLocation and  the transaction
     // context error message
     Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
-    Assert.assertEquals("Scm in Chill mode Status code "
+    Assert.assertEquals("Scm in Chill mode " + OMException.STATUS_CODE
             + OMException.ResultCodes.SCM_IN_CHILL_MODE,
         transactionContext.getException().getMessage());
     Assert.assertTrue(transactionContext.getException() instanceof IOException);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org