You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/03/27 04:48:08 UTC
[hadoop] branch trunk updated: HDDS-1262. In OM HA OpenKey call
Should happen only leader OM. (#626)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new eef8cae HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)
eef8cae is described below
commit eef8cae7cf42c2d1622970e177d699546351587f
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Tue Mar 26 21:48:01 2019 -0700
HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)
---
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 2 +
.../org/apache/hadoop/ozone/audit/OMAction.java | 1 +
.../hadoop/ozone/om/exceptions/OMException.java | 2 +
.../ozone/om/protocol/OzoneManagerHAProtocol.java | 30 ++++
.../src/main/proto/OzoneManagerProtocol.proto | 16 ++
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 187 +++++++++++++++++++++
.../org/apache/hadoop/ozone/om/KeyManager.java | 33 +++-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 90 +++++++---
.../org/apache/hadoop/ozone/om/OzoneManager.java | 74 +++++++-
.../ozone/om/ratis/OzoneManagerRatisClient.java | 25 +++
.../ozone/om/ratis/OzoneManagerStateMachine.java | 137 ++++++++++++++-
.../protocolPB/OzoneManagerRequestHandler.java | 54 ++++++
.../om/ratis/TestOzoneManagerStateMachine.java | 2 +-
13 files changed, 624 insertions(+), 29 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 280461a..be879d8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -210,6 +210,8 @@ public final class OmUtils {
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
+ case ApplyCreateKey:
+ case ApplyInitiateMultiPartUpload:
return false;
default:
LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 3863a52..0cbab08 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -26,6 +26,7 @@ public enum OMAction implements AuditAction {
ALLOCATE_BLOCK,
ADD_ALLOCATE_BLOCK,
ALLOCATE_KEY,
+ APPLY_ALLOCATE_KEY,
COMMIT_KEY,
CREATE_VOLUME,
CREATE_BUCKET,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 34980f6..b2f805a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -23,6 +23,8 @@ import java.io.IOException;
* Exception thrown by Ozone Manager.
*/
public class OMException extends IOException {
+
+ public static final String STATUS_CODE = "STATUS_CODE=";
private final OMException.ResultCodes result;
/**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 7390fe2..8357df2 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
@@ -52,4 +57,29 @@ public interface OzoneManagerHAProtocol {
KeyLocation keyLocation) throws IOException;
+ /**
+ * Add the openKey entry with the given keyInfo and clientID into the
+ * openKeyTable. This will be called only from applyTransaction, once
+ * after calling applyKey in startTransaction.
+ *
+ * @param omKeyArgs
+ * @param keyInfo
+ * @param clientID
+ * @throws IOException
+ */
+ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+ throws IOException;
+
+ /**
+ * Initiate multipart upload for the specified key.
+ *
+ * This will be called only from applyTransaction.
+ * @param omKeyArgs
+ * @param multipartUploadID
+ * @return OmMultipartInfo
+ * @throws IOException
+ */
+ OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
+ String multipartUploadID) throws IOException;
+
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 674edbb..eac8d2a 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -60,6 +60,7 @@ enum Type {
ListKeys = 35;
CommitKey = 36;
AllocateBlock = 37;
+ ApplyCreateKey = 38;
CreateS3Bucket = 41;
DeleteS3Bucket = 42;
@@ -74,6 +75,8 @@ enum Type {
ServiceList = 51;
+ ApplyInitiateMultiPartUpload = 52;
+
GetDelegationToken = 61;
RenewDelegationToken = 62;
CancelDelegationToken = 63;
@@ -110,6 +113,8 @@ message OMRequest {
optional ListKeysRequest listKeysRequest = 35;
optional CommitKeyRequest commitKeyRequest = 36;
optional AllocateBlockRequest allocateBlockRequest = 37;
+ optional ApplyCreateKeyRequest applyCreateKeyRequest = 38;
+
optional S3CreateBucketRequest createS3BucketRequest = 41;
optional S3DeleteBucketRequest deleteS3BucketRequest = 42;
@@ -123,6 +128,7 @@ message OMRequest {
optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50;
optional ServiceListRequest serviceListRequest = 51;
+ optional MultipartInfoApplyInitiateRequest initiateMultiPartUploadApplyRequest = 52;
optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@@ -555,6 +561,11 @@ message CreateKeyResponse {
optional uint64 openVersion = 4;
}
+message ApplyCreateKeyRequest {
+ required CreateKeyRequest createKeyRequest = 1;
+ required CreateKeyResponse createKeyResponse = 2;
+}
+
message LookupKeyRequest {
required KeyArgs keyArgs = 1;
}
@@ -722,6 +733,11 @@ message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
}
+message MultipartInfoApplyInitiateRequest {
+ required KeyArgs keyArgs = 1;
+ required string multipartUploadID = 2;
+}
+
message MultipartInfoInitiateResponse {
required string volumeName = 1;
required string bucketName = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 86a83b7..f565ad0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,14 +18,21 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -42,7 +49,9 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
@@ -120,6 +129,7 @@ public class TestOzoneManagerHA {
@Test
public void testAllOMNodesRunning() throws Exception {
createVolumeTest(true);
+ createKeyTest(true);
}
/**
@@ -131,6 +141,8 @@ public class TestOzoneManagerHA {
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
createVolumeTest(true);
+
+ createKeyTest(true);
}
/**
@@ -143,8 +155,181 @@ public class TestOzoneManagerHA {
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
createVolumeTest(false);
+
+ createKeyTest(false);
+
+ }
+
+ private OzoneBucket setupBucket() throws Exception {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+ Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+ Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+ String bucketName = UUID.randomUUID().toString();
+ retVolumeinfo.createBucket(bucketName);
+
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+ Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
+
+ return ozoneBucket;
+ }
+
+ @Test
+ public void testMultipartUpload() throws Exception {
+
+ // Happy scenario when all OM's are up.
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+ createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+ }
+
+ @Test
+ public void testMultipartUploadWithOneOmNodeDown() throws Exception {
+
+ OzoneBucket ozoneBucket = setupBucket();
+
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+ // After initiating the multipart upload, shut down the leader OM.
+ // Stop the leader OM to verify that the multipart upload still
+ // completes successfully after the OM leader changes.
+
+ OMFailoverProxyProvider omFailoverProxyProvider =
+ objectStore.getClientProxy().getOMProxyProvider();
+
+ // The OMFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Stop one of the Ozone Managers to verify that the multipart upload
+ // still completes successfully after the OM leader changes.
+ cluster.stopOzoneManager(leaderOMNodeId);
+
+
+ createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+ String newLeaderOMNodeId =
+ omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ Assert.assertTrue(leaderOMNodeId != newLeaderOMNodeId);
+ }
+
+
+ private String initiateMultipartUpload(OzoneBucket ozoneBucket,
+ String keyName) throws Exception {
+
+ OmMultipartInfo omMultipartInfo =
+ ozoneBucket.initiateMultipartUpload(keyName,
+ ReplicationType.RATIS,
+ ReplicationFactor.ONE);
+
+ String uploadID = omMultipartInfo.getUploadID();
+ Assert.assertTrue(uploadID != null);
+ return uploadID;
}
+ private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket,
+ String keyName, String uploadID) throws Exception {
+
+ String value = "random data";
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
+ keyName, value.length(), 1, uploadID);
+ ozoneOutputStream.write(value.getBytes(), 0, value.length());
+ ozoneOutputStream.close();
+
+
+ Map<Integer, String> partsMap = new HashMap<>();
+ partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getPartName());
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+ ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap);
+
+ Assert.assertTrue(omMultipartUploadCompleteInfo != null);
+ Assert.assertTrue(omMultipartUploadCompleteInfo.getHash() != null);
+
+
+ OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+ byte[] fileContent = new byte[value.getBytes().length];
+ ozoneInputStream.read(fileContent);
+ Assert.assertEquals(value, new String(fileContent));
+ }
+
+
+ private void createKeyTest(boolean checkSuccess) throws Exception {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ try {
+ objectStore.createVolume(volumeName, createVolumeArgs);
+
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+ Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+ Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+ retVolumeinfo.createBucket(bucketName);
+
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+ Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
+
+ String value = "random data";
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
+ value.length(), ReplicationType.STAND_ALONE,
+ ReplicationFactor.ONE, new HashMap<>());
+ ozoneOutputStream.write(value.getBytes(), 0, value.length());
+ ozoneOutputStream.close();
+
+ OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+ byte[] fileContent = new byte[value.getBytes().length];
+ ozoneInputStream.read(fileContent);
+ Assert.assertEquals(value, new String(fileContent));
+
+ } catch (ConnectException | RemoteException e) {
+ if (!checkSuccess) {
+ // If the last OM to be tried by the RetryProxy is down, we would get
+ // ConnectException. Otherwise, we would get a RemoteException from the
+ // last running OM as it would fail to get a quorum.
+ if (e instanceof RemoteException) {
+ GenericTestUtils.assertExceptionContains(
+ "RaftRetryFailureException", e);
+ }
+ } else {
+ throw e;
+ }
+ }
+
+
+ }
/**
* Create a volume and test its attribute.
*/
@@ -186,6 +371,8 @@ public class TestOzoneManagerHA {
}
}
+
+
/**
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index e5cf200..0006e93 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -29,7 +29,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyLocation;
import org.apache.hadoop.utils.BackgroundService;
import java.io.IOException;
@@ -89,7 +94,7 @@ public interface KeyManager extends OzoneManagerFS {
* @throws IOException
*/
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
- OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException;
+ KeyLocation keyLocation) throws IOException;
/**
* Given the args of a key to put, write an open key entry to meta data.
@@ -105,6 +110,19 @@ public interface KeyManager extends OzoneManagerFS {
OpenKeySession openKey(OmKeyArgs args) throws IOException;
/**
+ * Add the openKey entry with the given keyInfo and clientID into the
+ * openKeyTable. This will be called only from applyTransaction, once
+ * after calling applyKey in startTransaction.
+ *
+ * @param omKeyArgs
+ * @param keyInfo
+ * @param clientID
+ * @throws IOException
+ */
+ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+ throws IOException;
+
+ /**
* Look up an existing key. Return the info of the key to client side, which
* DistributedStorageHandler will use to access the data on datanode.
*
@@ -214,6 +232,17 @@ public interface KeyManager extends OzoneManagerFS {
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
/**
+ * Initiate multipart upload for the specified key.
+ *
+ * @param keyArgs
+ * @param multipartUploadID
+ * @return MultipartInfo
+ * @throws IOException
+ */
+ OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+ String multipartUploadID) throws IOException;
+
+ /**
* Commit Multipart upload part file.
* @param omKeyArgs
* @param clientID
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 591bb00..83a60c0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -65,7 +65,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyLocation;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -233,7 +238,7 @@ public class KeyManagerImpl implements KeyManager {
@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
- OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException {
+ KeyLocation keyLocation) throws IOException {
Preconditions.checkNotNull(args);
Preconditions.checkNotNull(keyLocation);
@@ -518,10 +523,49 @@ public class KeyManagerImpl implements KeyManager {
allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
keyInfo.appendNewBlocks(locationInfos);
}
- metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+
+ // When OM is not managed via Ratis, we should write to the OM DB
+ // directly in the openKey call.
+ if (!isRatisEnabled) {
+ metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+ }
return new OpenKeySession(currentTime, keyInfo, openVersion);
}
+ public void applyOpenKey(KeyArgs omKeyArgs,
+ KeyInfo keyInfo, long clientID) throws IOException {
+ Preconditions.checkNotNull(omKeyArgs);
+ String volumeName = omKeyArgs.getVolumeName();
+ String bucketName = omKeyArgs.getBucketName();
+
+ // Do we need to call again validateBucket, as this is just called after
+ // start Transaction from applyTransaction. Can we remove this double
+ // check?
+ validateBucket(volumeName, bucketName);
+
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+ String keyName = omKeyArgs.getKeyName();
+
+ // TODO: if clocks on the OM machines are skewed, there is a chance
+ // that existing openKey entries get overwritten.
+ try {
+ String openKey = metadataManager.getOpenKey(
+ volumeName, bucketName, keyName, clientID);
+
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+ metadataManager.getOpenKeyTable().put(openKey,
+ omKeyInfo);
+ } catch (IOException ex) {
+ LOG.error("Apply Open Key failed for volume:{} bucket:{} key:{}",
+ volumeName, bucketName, keyName, ex);
+ throw new OMException(ex.getMessage(),
+ ResultCodes.KEY_ALLOCATION_ERROR);
+ } finally {
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+ }
+ }
+
/**
* Create OmKeyInfo object.
* @param keyArgs
@@ -826,17 +870,22 @@ public class KeyManagerImpl implements KeyManager {
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
- Preconditions.checkNotNull(omKeyArgs);
- String volumeName = omKeyArgs.getVolumeName();
- String bucketName = omKeyArgs.getBucketName();
- String keyName = omKeyArgs.getKeyName();
+ long time = Time.monotonicNowNanos();
+ String uploadID = UUID.randomUUID().toString() + "-" + time;
+ return applyInitiateMultipartUpload(omKeyArgs, uploadID);
+ }
+
+ public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+ String multipartUploadID) throws IOException {
+ Preconditions.checkNotNull(keyArgs);
+ Preconditions.checkNotNull(multipartUploadID);
+ String volumeName = keyArgs.getVolumeName();
+ String bucketName = keyArgs.getBucketName();
+ String keyName = keyArgs.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
validateS3Bucket(volumeName, bucketName);
try {
- long time = Time.monotonicNowNanos();
- String uploadID = UUID.randomUUID().toString() + "-" + Long.toString(
- time);
// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
@@ -852,24 +901,24 @@ public class KeyManagerImpl implements KeyManager {
// new uploadId is returned.
String multipartKey = metadataManager.getMultipartKey(volumeName,
- bucketName, keyName, uploadID);
+ bucketName, keyName, multipartUploadID);
// Not checking if there is an already key for this in the keyTable, as
// during final complete multipart upload we take care of this.
Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>();
- OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID,
- partKeyInfoMap);
+ OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
+ multipartUploadID, partKeyInfoMap);
List<OmKeyLocationInfo> locations = new ArrayList<>();
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
- .setVolumeName(omKeyArgs.getVolumeName())
- .setBucketName(omKeyArgs.getBucketName())
- .setKeyName(omKeyArgs.getKeyName())
+ .setVolumeName(keyArgs.getVolumeName())
+ .setBucketName(keyArgs.getBucketName())
+ .setKeyName(keyArgs.getKeyName())
.setCreationTime(Time.now())
.setModificationTime(Time.now())
- .setReplicationType(omKeyArgs.getType())
- .setReplicationFactor(omKeyArgs.getFactor())
+ .setReplicationType(keyArgs.getType())
+ .setReplicationFactor(keyArgs.getFactor())
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, locations)))
.build();
@@ -882,11 +931,12 @@ public class KeyManagerImpl implements KeyManager {
metadataManager.getOpenKeyTable().putWithBatch(batch,
multipartKey, omKeyInfo);
store.commitBatchOperation(batch);
- return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID);
+ return new OmMultipartInfo(volumeName, bucketName, keyName,
+ multipartUploadID);
}
} catch (IOException ex) {
LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " +
- "key:{}", volumeName, bucketName, keyName, ex);
+ "key:{}", volumeName, bucketName, keyName, ex);
throw new OMException(ex.getMessage(),
ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR);
} finally {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8f0781f..9fa297d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -72,7 +72,12 @@ import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .KeyLocation;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -1986,6 +1991,51 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
@Override
+ public void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
+ throws IOException {
+ // Do we need to check again Acl's for apply OpenKey call?
+ if(isAclEnabled) {
+ checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+ omKeyArgs.getVolumeName(), omKeyArgs.getBucketName(),
+ omKeyArgs.getKeyName());
+ }
+ boolean auditSuccess = true;
+ try {
+ keyManager.applyOpenKey(omKeyArgs, keyInfo, clientID);
+ } catch (Exception ex) {
+ metrics.incNumKeyAllocateFails();
+ auditSuccess = false;
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(
+ OMAction.APPLY_ALLOCATE_KEY,
+ (omKeyArgs == null) ? null : toAuditMap(omKeyArgs), ex));
+ throw ex;
+ } finally {
+ if(auditSuccess){
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ OMAction.APPLY_ALLOCATE_KEY, (omKeyArgs == null) ? null :
+ toAuditMap(omKeyArgs)));
+ }
+ }
+ }
+
+ private Map<String, String> toAuditMap(KeyArgs omKeyArgs) {
+ Map<String, String> auditMap = new LinkedHashMap<>();
+ auditMap.put(OzoneConsts.VOLUME, omKeyArgs.getVolumeName());
+ auditMap.put(OzoneConsts.BUCKET, omKeyArgs.getBucketName());
+ auditMap.put(OzoneConsts.KEY, omKeyArgs.getKeyName());
+ auditMap.put(OzoneConsts.DATA_SIZE,
+ String.valueOf(omKeyArgs.getDataSize()));
+ auditMap.put(OzoneConsts.REPLICATION_TYPE,
+ omKeyArgs.hasType() ? omKeyArgs.getType().name() : null);
+ auditMap.put(OzoneConsts.REPLICATION_FACTOR,
+ omKeyArgs.hasFactor() ? omKeyArgs.getFactor().name() : null);
+ auditMap.put(OzoneConsts.KEY_LOCATION_INFO,
+ (omKeyArgs.getKeyLocationsList() != null) ?
+ omKeyArgs.getKeyLocationsList().toString() : null);
+ return auditMap;
+ }
+
+ @Override
public void commitKey(OmKeyArgs args, long clientID)
throws IOException {
if(isAclEnabled) {
@@ -2474,6 +2524,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
+
+ @Override
+ public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
+ String multipartUploadID) throws IOException {
+ OmMultipartInfo multipartInfo;
+ metrics.incNumInitiateMultipartUploads();
+ try {
+ multipartInfo = keyManager.applyInitiateMultipartUpload(keyArgs,
+ multipartUploadID);
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
+ keyArgs.toAuditMap()));
+ } catch (IOException ex) {
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(
+ OMAction.INITIATE_MULTIPART_UPLOAD,
+ (keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
+ metrics.incNumInitiateMultipartUploadFails();
+ throw ex;
+ }
+ return multipartInfo;
+ }
+
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index c9c48a4..cd99cd1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -40,6 +41,7 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
@@ -49,6 +51,8 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
/**
* OM Ratis client to interact with OM Ratis server endpoint.
*/
@@ -128,10 +132,31 @@ public final class OzoneManagerRatisClient implements Closeable {
CompletableFuture<OMResponse> reply = sendCommandAsync(request);
return reply.get();
} catch (ExecutionException | InterruptedException e) {
+ if (e.getCause() instanceof StateMachineException) {
+ OMResponse.Builder omResponse = OMResponse.newBuilder();
+ omResponse.setCmdType(request.getCmdType());
+ omResponse.setSuccess(false);
+ omResponse.setMessage(e.getCause().getMessage());
+ omResponse.setStatus(parseErrorStatus(e.getCause().getMessage()));
+ return omResponse.build();
+ }
throw new ServiceException(e);
}
}
+ private OzoneManagerProtocolProtos.Status parseErrorStatus(String message) {
+ if (message.contains(STATUS_CODE)) {
+ String errorCode = message.substring(message.indexOf(STATUS_CODE) +
+ STATUS_CODE.length());
+ LOG.debug("Parsing error message for error code " +
+ errorCode);
+ return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim());
+ } else {
+ return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
+ }
+
+ }
+
/**
* Sends a given command to server gets a waitable future back.
*
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 590b359..420ffb5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
@@ -30,11 +31,14 @@ import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartInfoApplyInitiateRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -48,6 +52,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
/**
* The OM StateMachine is the state machine for OM Ratis server. It is
* responsible for applying ratis committed transactions to
@@ -108,11 +114,121 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
ctxt.setException(ioe);
return ctxt;
}
+ return handleStartTransactionRequests(raftClientRequest, omRequest);
+
+ }
+
+ /**
+ * Handle the RaftClientRequest and return TransactionContext object.
+ * @param raftClientRequest
+ * @param omRequest
+ * @return TransactionContext
+ */
+ private TransactionContext handleStartTransactionRequests(
+ RaftClientRequest raftClientRequest, OMRequest omRequest) {
- if (omRequest.getCmdType() ==
- OzoneManagerProtocolProtos.Type.AllocateBlock) {
+ switch (omRequest.getCmdType()) {
+ case AllocateBlock:
return handleAllocateBlock(raftClientRequest, omRequest);
+ case CreateKey:
+ return handleCreateKeyRequest(raftClientRequest, omRequest);
+ case InitiateMultiPartUpload:
+ return handleInitiateMultipartUpload(raftClientRequest, omRequest);
+ default:
+ return TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(raftClientRequest.getMessage().getContent())
+ .build();
+ }
+
+ }
+
+
+ private TransactionContext handleInitiateMultipartUpload(
+ RaftClientRequest raftClientRequest, OMRequest omRequest) {
+
+ // Generate a multipart uploadID, and create a new request.
+ // When applyTransaction happens, all OMs use the same multipartUploadID
+ // for the key.
+
+ long time = Time.monotonicNowNanos();
+ String multipartUploadID = UUID.randomUUID().toString() + "-" + time;
+
+ MultipartInfoApplyInitiateRequest multipartInfoApplyInitiateRequest =
+ MultipartInfoApplyInitiateRequest.newBuilder()
+ .setKeyArgs(omRequest.getInitiateMultiPartUploadRequest()
+ .getKeyArgs()).setMultipartUploadID(multipartUploadID).build();
+
+ OMRequest.Builder newOmRequest =
+ OMRequest.newBuilder().setCmdType(
+ OzoneManagerProtocolProtos.Type.ApplyInitiateMultiPartUpload)
+ .setInitiateMultiPartUploadApplyRequest(
+ multipartInfoApplyInitiateRequest)
+ .setClientId(omRequest.getClientId());
+
+ if (omRequest.hasTraceID()) {
+ newOmRequest.setTraceID(omRequest.getTraceID());
+ }
+
+ ByteString messageContent =
+ ByteString.copyFrom(newOmRequest.build().toByteArray());
+
+ return TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(messageContent)
+ .build();
+ }
+
+ /**
+ * Handle createKey request, which needs special handling. This request
+ * needs to be executed on the leader; from the response received for this
+ * request we create an ApplyCreateKeyRequest and build a
+ * TransactionContext object.
+ */
+ private TransactionContext handleCreateKeyRequest(
+ RaftClientRequest raftClientRequest, OMRequest omRequest) {
+ OMResponse omResponse = handler.handle(omRequest);
+
+ // TODO: if not successful, should we retry depending on whether the
+ // error is retriable?
+ if (!omResponse.getSuccess()) {
+ TransactionContext transactionContext = TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .build();
+ transactionContext.setException(
+ constructExceptionForFailedRequest(omResponse));
+ return transactionContext;
+ }
+
+ // Get original request
+ OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+ omRequest.getCreateKeyRequest();
+
+ // Create ApplyCreateKey request.
+ OzoneManagerProtocolProtos.ApplyCreateKeyRequest applyCreateKeyRequest =
+ OzoneManagerProtocolProtos.ApplyCreateKeyRequest.newBuilder()
+ .setCreateKeyRequest(createKeyRequest)
+ .setCreateKeyResponse(omResponse.getCreateKeyResponse()).build();
+
+ OMRequest.Builder newOmRequest =
+ OMRequest.newBuilder().setCmdType(
+ OzoneManagerProtocolProtos.Type.ApplyCreateKey)
+ .setApplyCreateKeyRequest(applyCreateKeyRequest)
+ .setClientId(omRequest.getClientId());
+
+ if (omRequest.hasTraceID()) {
+ newOmRequest.setTraceID(omRequest.getTraceID());
}
+
+ ByteString messageContent =
+ ByteString.copyFrom(newOmRequest.build().toByteArray());
+
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
@@ -121,6 +237,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.build();
}
+
/**
* Handle AllocateBlock Request, which needs a special handling. This
* request needs to be executed on the leader, where it connects to SCM and
@@ -148,9 +265,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.build();
- IOException ioe = new IOException(omResponse.getMessage() +
- " Status code " + omResponse.getStatus());
- transactionContext.setException(ioe);
+ transactionContext.setException(
+ constructExceptionForFailedRequest(omResponse));
return transactionContext;
}
@@ -181,6 +297,17 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
}
+ /**
+ * Construct IOException message for failed requests in StartTransaction.
+ * @param omResponse
+ * @return
+ */
+ private IOException constructExceptionForFailedRequest(
+ OMResponse omResponse) {
+ return new IOException(omResponse.getMessage() + " " +
+ STATUS_CODE + omResponse.getStatus());
+ }
+
/*
* Apply a committed log entry to the state machine.
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 3fccf1b..7660ed1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -47,6 +47,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFile
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .ApplyCreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse;
@@ -205,6 +209,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
+ case ApplyCreateKey:
+ CreateKeyResponse applyKeyResponse =
+ applyCreateKey(request.getApplyCreateKeyRequest());
+ responseBuilder.setCreateKeyResponse(applyKeyResponse);
+ break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
@@ -262,6 +271,13 @@ public class OzoneManagerRequestHandler implements RequestHandler {
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
+ case ApplyInitiateMultiPartUpload:
+ MultipartInfoInitiateResponse response =
+ applyInitiateMultiPartUpload(
+ request.getInitiateMultiPartUploadApplyRequest());
+ responseBuilder.setInitiateMultiPartUploadResponse(
+ response);
+ break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(
@@ -498,6 +514,20 @@ public class OzoneManagerRequestHandler implements RequestHandler {
return resp.build();
}
+ private CreateKeyResponse applyCreateKey(ApplyCreateKeyRequest request)
+ throws IOException {
+
+ CreateKeyRequest createKeyRequest = request.getCreateKeyRequest();
+ CreateKeyResponse createKeyResponse = request.getCreateKeyResponse();
+
+ impl.applyOpenKey(createKeyRequest.getKeyArgs(),
+ createKeyResponse.getKeyInfo(), createKeyResponse.getID());
+
+ // If applying to om DB successful just return createKeyResponse.
+ return createKeyResponse;
+
+ }
+
private LookupKeyResponse lookupKey(LookupKeyRequest request)
throws IOException {
LookupKeyResponse.Builder resp =
@@ -731,6 +761,30 @@ public class OzoneManagerRequestHandler implements RequestHandler {
return resp.build();
}
+ private MultipartInfoInitiateResponse applyInitiateMultiPartUpload(
+ MultipartInfoApplyInitiateRequest request) throws IOException {
+ MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
+ .newBuilder();
+
+ KeyArgs keyArgs = request.getKeyArgs();
+ OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(keyArgs.getVolumeName())
+ .setBucketName(keyArgs.getBucketName())
+ .setKeyName(keyArgs.getKeyName())
+ .setType(keyArgs.getType())
+ .setFactor(keyArgs.getFactor())
+ .build();
+ OmMultipartInfo multipartInfo =
+ impl.applyInitiateMultipartUpload(omKeyArgs,
+ request.getMultipartUploadID());
+ resp.setVolumeName(multipartInfo.getVolumeName());
+ resp.setBucketName(multipartInfo.getBucketName());
+ resp.setKeyName(multipartInfo.getKeyName());
+ resp.setMultipartUploadID(multipartInfo.getUploadID());
+
+ return resp.build();
+ }
+
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) throws IOException {
MultipartCommitUploadPartResponse.Builder resp =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 4406af6..9613582 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -251,7 +251,7 @@ public class TestOzoneManagerStateMachine {
// As the request failed, check for keyLocation and the transaction
// context error message
Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
- Assert.assertEquals("Scm in Chill mode Status code "
+ Assert.assertEquals("Scm in Chill mode " + OMException.STATUS_CODE
+ OMException.ResultCodes.SCM_IN_CHILL_MODE,
transactionContext.getException().getMessage());
Assert.assertTrue(transactionContext.getException() instanceof IOException);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org