You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/04/05 04:09:55 UTC
[hadoop] branch trunk updated: HDDS-1379. Convert all OM Volume
related operations to HA model. (#689)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77fe51e HDDS-1379. Convert all OM Volume related operations to HA model. (#689)
77fe51e is described below
commit 77fe51e13666f7e10ce5fa7bf53b35cdcd4602b6
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Thu Apr 4 21:09:50 2019 -0700
HDDS-1379. Convert all OM Volume related operations to HA model. (#689)
---
.../ozone/om/helpers/OmDeleteVolumeResponse.java | 49 ++++
.../hadoop/ozone/om/helpers/OmVolumeArgs.java | 19 +-
.../om/helpers/OmVolumeOwnerChangeResponse.java | 56 ++++
.../ozone/om/protocol/OzoneManagerHAProtocol.java | 78 ++++++
.../src/main/proto/OzoneManagerProtocol.proto | 7 +
.../org/apache/hadoop/ozone/om/TestOmMetrics.java | 6 +-
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 29 +++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 77 ++++++
.../hadoop/ozone/om/S3BucketManagerImpl.java | 11 +-
.../org/apache/hadoop/ozone/om/VolumeManager.java | 52 +++-
.../apache/hadoop/ozone/om/VolumeManagerImpl.java | 289 ++++++++++++++-------
.../ozone/om/ratis/OzoneManagerStateMachine.java | 65 +++--
...dler.java => OzoneManagerHARequestHandler.java} | 36 +--
.../OzoneManagerHARequestHandlerImpl.java | 247 ++++++++++++++++++
.../protocolPB/OzoneManagerRequestHandler.java | 6 +-
.../hadoop/ozone/protocolPB/RequestHandler.java | 2 +-
.../om/ratis/TestOzoneManagerStateMachine.java | 8 +-
17 files changed, 897 insertions(+), 140 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java
new file mode 100644
index 0000000..6e96674
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDeleteVolumeResponse.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
+
+/**
+ * OM response for a delete volume request for an ozone volume.
+ */
+public class OmDeleteVolumeResponse {
+ private String volume;
+ private String owner;
+ private VolumeList updatedVolumeList;
+
+ public OmDeleteVolumeResponse(String volume, String owner,
+ VolumeList updatedVolumeList) {
+ this.volume = volume;
+ this.owner = owner;
+ this.updatedVolumeList = updatedVolumeList;
+ }
+
+ public String getVolume() {
+ return volume;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public VolumeList getUpdatedVolumeList() {
+ return updatedVolumeList;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
index 08c17ec..7b25d78 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
@@ -36,10 +36,10 @@ import com.google.common.base.Preconditions;
*/
public final class OmVolumeArgs extends WithMetadata implements Auditable {
private final String adminName;
- private final String ownerName;
+ private String ownerName;
private final String volume;
- private final long creationTime;
- private final long quotaInBytes;
+ private long creationTime;
+ private long quotaInBytes;
private final OmOzoneAclMap aclMap;
/**
@@ -64,6 +64,19 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable {
this.creationTime = creationTime;
}
+
+ public void setOwnerName(String newOwner) {
+ this.ownerName = newOwner;
+ }
+
+ public void setQuotaInBytes(long quotaInBytes) {
+ this.quotaInBytes = quotaInBytes;
+ }
+
+ public void setCreationTime(long time) {
+ this.creationTime = time;
+ }
+
/**
* Returns the Admin Name.
* @return String.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java
new file mode 100644
index 0000000..b691c73
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeOwnerChangeResponse.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
+
+/**
+ * OM response for an owner change request for an ozone volume.
+ */
+public class OmVolumeOwnerChangeResponse {
+ private VolumeList originalOwnerVolumeList;
+ private VolumeList newOwnerVolumeList;
+ private OmVolumeArgs newOwnerVolumeArgs;
+ private String originalOwner;
+
+ public OmVolumeOwnerChangeResponse(VolumeList originalOwnerVolumeList,
+ VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
+ String originalOwner) {
+ this.originalOwnerVolumeList = originalOwnerVolumeList;
+ this.newOwnerVolumeList = newOwnerVolumeList;
+ this.newOwnerVolumeArgs = newOwnerVolumeArgs;
+ this.originalOwner = originalOwner;
+ }
+
+ public String getOriginalOwner() {
+ return originalOwner;
+ }
+
+ public VolumeList getOriginalOwnerVolumeList() {
+ return originalOwnerVolumeList;
+ }
+
+ public VolumeList getNewOwnerVolumeList() {
+ return newOwnerVolumeList;
+ }
+
+ public OmVolumeArgs getNewOwnerVolumeArgs() {
+ return newOwnerVolumeArgs;
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index f598997..ad2bc31 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -18,15 +18,20 @@
package org.apache.hadoop.ozone.om.protocol;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
import java.io.IOException;
@@ -88,4 +93,77 @@ public interface OzoneManagerHAProtocol {
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException;
+
+ /**
+ * Start Create Volume Transaction.
+ * @param omVolumeArgs
+ * @return VolumeList
+ * @throws IOException
+ */
+ VolumeList startCreateVolume(OmVolumeArgs omVolumeArgs) throws IOException;
+
+ /**
+ * Apply Create Volume changes to OM DB.
+ * @param omVolumeArgs
+ * @param volumeList
+ * @throws IOException
+ */
+ void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+ VolumeList volumeList) throws IOException;
+
+ /**
+ * Start setOwner Transaction.
+ * @param volume
+ * @param owner
+ * @return OmVolumeOwnerChangeResponse
+ * @throws IOException
+ */
+ OmVolumeOwnerChangeResponse startSetOwner(String volume,
+ String owner) throws IOException;
+
+ /**
+ * Apply Set Owner changes to OM DB.
+ * @param oldOwner
+ * @param oldOwnerVolumeList
+ * @param newOwnerVolumeList
+ * @param newOwnerVolumeArgs
+ * @throws IOException
+ */
+ void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+ VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+ throws IOException;
+
+ /**
+ * Start Set Quota Transaction.
+ * @param volume
+ * @param quota
+ * @return OmVolumeArgs
+ * @throws IOException
+ */
+ OmVolumeArgs startSetQuota(String volume, long quota) throws IOException;
+
+ /**
+ * Apply Set Quota Changes to OM DB.
+ * @param omVolumeArgs
+ * @throws IOException
+ */
+ void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
+
+ /**
+ * Start Delete Volume Transaction.
+ * @param volume
+ * @return OmDeleteVolumeResponse
+ * @throws IOException
+ */
+ OmDeleteVolumeResponse startDeleteVolume(String volume) throws IOException;
+
+ /**
+ * Apply Delete Volume changes to OM DB.
+ * @param volume
+ * @param owner
+ * @param newVolumeList
+ * @throws IOException
+ */
+ void applyDeleteVolume(String volume, String owner,
+ VolumeList newVolumeList) throws IOException;
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index ffd1eba..4536e87 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -273,6 +273,7 @@ message VolumeInfo {
*/
message CreateVolumeRequest {
required VolumeInfo volumeInfo = 1;
+ optional VolumeList volumeList = 2;
}
message CreateVolumeResponse {
@@ -290,6 +291,10 @@ message SetVolumePropertyRequest {
required string volumeName = 1;
optional string ownerName = 2;
optional uint64 quotaInBytes = 3;
+ optional string originalOwner = 4;
+ optional VolumeList oldOwnerVolumeList = 5;
+ optional VolumeList newOwnerVolumeList = 6;
+ optional VolumeInfo volumeInfo = 7;
}
message SetVolumePropertyResponse {
@@ -326,6 +331,8 @@ message InfoVolumeResponse {
*/
message DeleteVolumeRequest {
required string volumeName = 1;
+ optional string owner = 2;
+ optional VolumeList volumeList = 3;
}
message DeleteVolumeResponse {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index b72117e..5d739c2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -87,11 +87,11 @@ public class TestOmMetrics {
ozoneManager, "volumeManager");
VolumeManager mockVm = Mockito.spy(volumeManager);
- Mockito.doNothing().when(mockVm).createVolume(null);
- Mockito.doNothing().when(mockVm).deleteVolume(null);
+ Mockito.doReturn(null).when(mockVm).createVolume(null);
+ Mockito.doReturn(null).when(mockVm).deleteVolume(null);
Mockito.doReturn(null).when(mockVm).getVolumeInfo(null);
Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null);
- Mockito.doNothing().when(mockVm).setOwner(null, null);
+ Mockito.doReturn(null).when(mockVm).setOwner(null, null);
Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0);
HddsWhiteboxTestUtils.setInternalState(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 06009e2..5f62af1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -123,6 +125,33 @@ public class TestOzoneManagerHA {
}
}
+ @Test
+ public void testAllVolumeOperations() throws Exception {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+ Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+ Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+
+ objectStore.deleteVolume(volumeName);
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+ () -> objectStore.getVolume(volumeName));
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+ () -> objectStore.deleteVolume(volumeName));
+ }
+
/**
* Test a client request when all OM nodes are running. The request should
* succeed.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 7a87b53..e7f1e87 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -79,6 +81,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -1630,6 +1634,79 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
+ @Override
+ public VolumeList startCreateVolume(OmVolumeArgs args) throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ if(isAclEnabled) {
+ checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.CREATE,
+ args.getVolume(), null, null);
+ }
+ VolumeList volumeList = volumeManager.createVolume(args);
+ return volumeList;
+ }
+
+ public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+ VolumeList volumeList) throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ volumeManager.applyCreateVolume(omVolumeArgs, volumeList);
+ }
+
+ @Override
+ public OmVolumeOwnerChangeResponse startSetOwner(String volume,
+ String owner) throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ if (isAclEnabled) {
+ checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
+ null, null);
+ }
+ return volumeManager.setOwner(volume, owner);
+ }
+
+ @Override
+ public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+ VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+ throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ volumeManager.applySetOwner(oldOwner, oldOwnerVolumeList,
+ newOwnerVolumeList, newOwnerVolumeArgs);
+ }
+
+ @Override
+ public OmVolumeArgs startSetQuota(String volume, long quota)
+ throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ if (isAclEnabled) {
+ checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
+ null, null);
+ }
+ return volumeManager.setQuota(volume, quota);
+ }
+
+ @Override
+ public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ volumeManager.applySetQuota(omVolumeArgs);
+ }
+
+ @Override
+ public OmDeleteVolumeResponse startDeleteVolume(String volume)
+ throws IOException {
+ if(isAclEnabled) {
+ checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.DELETE, volume,
+ null, null);
+ }
+ // TODO: Need to add metrics and Audit log for HA requests
+ return volumeManager.deleteVolume(volume);
+ }
+
+ @Override
+ public void applyDeleteVolume(String volume, String owner,
+ VolumeList newVolumeList) throws IOException {
+ // TODO: Need to add metrics and Audit log for HA requests
+ volumeManager.applyDeleteVolume(volume, owner, newVolumeList);
+ }
+
+
/**
* Checks if current caller has acl permissions.
*
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
index d0d84f8..440a45e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
@@ -50,6 +50,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
private final OMMetadataManager omMetadataManager;
private final VolumeManager volumeManager;
private final BucketManager bucketManager;
+ private final boolean isRatisEnabled;
/**
* Construct an S3 Bucket Manager Object.
@@ -66,6 +67,9 @@ public class S3BucketManagerImpl implements S3BucketManager {
this.omMetadataManager = omMetadataManager;
this.volumeManager = volumeManager;
this.bucketManager = bucketManager;
+ isRatisEnabled = configuration.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
}
@Override
@@ -166,7 +170,12 @@ public class S3BucketManagerImpl implements S3BucketManager {
.setVolume(ozoneVolumeName)
.setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES)
.build();
- volumeManager.createVolume(args);
+ if (isRatisEnabled) {
+ // When ratis is enabled we need to call apply also.
+ volumeManager.applyCreateVolume(args, volumeManager.createVolume(args));
+ } else {
+ volumeManager.createVolume(args);
+ }
} catch (OMException exp) {
newVolumeCreate = false;
if (exp.getResult().compareTo(VOLUME_ALREADY_EXISTS) == 0) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
index f25fce4..a4e20c7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
@@ -16,9 +16,13 @@
*/
package org.apache.hadoop.ozone.om;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
import java.io.IOException;
import java.util.List;
@@ -32,7 +36,17 @@ public interface VolumeManager {
* Create a new volume.
* @param args - Volume args to create a volume
*/
- void createVolume(OmVolumeArgs args) throws IOException;
+ VolumeList createVolume(OmVolumeArgs args)
+ throws IOException;
+
+ /**
+ * Apply Create Volume changes to OM DB.
+ * @param omVolumeArgs
+ * @param volumeList
+ * @throws IOException
+ */
+ void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+ VolumeList volumeList) throws IOException;
/**
* Changes the owner of a volume.
@@ -41,7 +55,20 @@ public interface VolumeManager {
* @param owner - Name of the owner.
* @throws IOException
*/
- void setOwner(String volume, String owner) throws IOException;
+ OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
+ throws IOException;
+
+ /**
+ * Apply Set Owner changes to OM DB.
+ * @param oldOwner
+ * @param oldOwnerVolumeList
+ * @param newOwnerVolumeList
+ * @param newOwnerVolumeArgs
+ * @throws IOException
+ */
+ void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+ VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+ throws IOException;
/**
* Changes the Quota on a volume.
@@ -50,7 +77,14 @@ public interface VolumeManager {
* @param quota - Quota in bytes.
* @throws IOException
*/
- void setQuota(String volume, long quota) throws IOException;
+ OmVolumeArgs setQuota(String volume, long quota) throws IOException;
+
+ /**
+ * Apply Set Quota changes to OM DB.
+ * @param omVolumeArgs
+ * @throws IOException
+ */
+ void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
/**
* Gets the volume information.
@@ -66,7 +100,17 @@ public interface VolumeManager {
* @param volume - Name of the volume.
* @throws IOException
*/
- void deleteVolume(String volume) throws IOException;
+ OmDeleteVolumeResponse deleteVolume(String volume) throws IOException;
+
+ /**
+ * Apply Delete Volume changes to OM DB.
+ * @param volume
+ * @param owner
+ * @param newVolumeList
+ * @throws IOException
+ */
+ void applyDeleteVolume(String volume, String owner,
+ VolumeList newVolumeList) throws IOException;
/**
* Checks if the specified user with a role can access this volume.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index c83f733..872d7b6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -22,17 +22,18 @@ import java.util.List;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.utils.RocksDBStore;
import org.apache.hadoop.utils.db.BatchOperation;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.rocksdb.RocksDBException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class VolumeManagerImpl implements VolumeManager {
private final OMMetadataManager metadataManager;
private final int maxUserVolumeCount;
+ private final boolean isRatisEnabled;
/**
* Constructor.
@@ -52,15 +54,18 @@ public class VolumeManagerImpl implements VolumeManager {
* @throws IOException
*/
public VolumeManagerImpl(OMMetadataManager metadataManager,
- OzoneConfiguration conf) throws IOException {
+ OzoneConfiguration conf) {
this.metadataManager = metadataManager;
this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
OZONE_OM_USER_MAX_VOLUME_DEFAULT);
+ isRatisEnabled = conf.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
}
// Helpers to add and delete volume from user list
- private void addVolumeToOwnerList(String volume, String owner,
- BatchOperation batchOperation) throws IOException {
+ private VolumeList addVolumeToOwnerList(String volume, String owner)
+ throws IOException {
// Get the volume list
String dbUserKey = metadataManager.getUserKey(owner);
VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
@@ -72,22 +77,22 @@ public class VolumeManagerImpl implements VolumeManager {
// Check the volume count
if (prevVolList.size() >= maxUserVolumeCount) {
LOG.debug("Too many volumes for user:{}", owner);
- throw new OMException(ResultCodes.USER_TOO_MANY_VOLUMES);
+ throw new OMException("Too many volumes for user:" + owner,
+ ResultCodes.USER_TOO_MANY_VOLUMES);
}
// Add the new volume to the list
prevVolList.add(volume);
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
- metadataManager.getUserTable().putWithBatch(batchOperation,
- dbUserKey, newVolList);
+
+ return newVolList;
}
- private void delVolumeFromOwnerList(String volume, String owner,
- BatchOperation batch) throws RocksDBException, IOException {
+ private VolumeList delVolumeFromOwnerList(String volume, String owner)
+ throws IOException {
// Get the volume list
- String dbUserKey = metadataManager.getUserKey(owner);
- VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
+ VolumeList volumeList = metadataManager.getUserTable().get(owner);
List<String> prevVolList = new ArrayList<>();
if (volumeList != null) {
prevVolList.addAll(volumeList.getVolumeNamesList());
@@ -98,58 +103,90 @@ public class VolumeManagerImpl implements VolumeManager {
// Remove the volume from the list
prevVolList.remove(volume);
- if (prevVolList.size() == 0) {
- metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey);
- } else {
- VolumeList newVolList = VolumeList.newBuilder()
- .addAllVolumeNames(prevVolList).build();
- metadataManager.getUserTable().putWithBatch(batch,
- dbUserKey, newVolList);
- }
+ VolumeList newVolList = VolumeList.newBuilder()
+ .addAllVolumeNames(prevVolList).build();
+ return newVolList;
}
/**
* Creates a volume.
- * @param args - OmVolumeArgs.
+ * @param omVolumeArgs - OmVolumeArgs.
+ * @return VolumeList
*/
@Override
- public void createVolume(OmVolumeArgs args) throws IOException {
- Preconditions.checkNotNull(args);
- metadataManager.getLock().acquireUserLock(args.getOwnerName());
- metadataManager.getLock().acquireVolumeLock(args.getVolume());
+ public VolumeList createVolume(OmVolumeArgs omVolumeArgs) throws IOException {
+ Preconditions.checkNotNull(omVolumeArgs);
+ metadataManager.getLock().acquireUserLock(omVolumeArgs.getOwnerName());
+ metadataManager.getLock().acquireVolumeLock(omVolumeArgs.getVolume());
try {
- String dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
+ String dbVolumeKey = metadataManager.getVolumeKey(
+ omVolumeArgs.getVolume());
+ String dbUserKey = metadataManager.getUserKey(
+ omVolumeArgs.getOwnerName());
OmVolumeArgs volumeInfo =
metadataManager.getVolumeTable().get(dbVolumeKey);
// Check of the volume already exists
if (volumeInfo != null) {
- LOG.debug("volume:{} already exists", args.getVolume());
+ LOG.debug("volume:{} already exists", omVolumeArgs.getVolume());
throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS);
}
- try (BatchOperation batch = metadataManager.getStore()
- .initBatchOperation()) {
- // Write the vol info
- metadataManager.getVolumeTable().putWithBatch(batch,
- dbVolumeKey, args);
+ VolumeList volumeList = addVolumeToOwnerList(omVolumeArgs.getVolume(),
+ omVolumeArgs.getOwnerName());
- // Add volume to user list
- addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
- metadataManager.getStore().commitBatchOperation(batch);
+ // Set creation time
+ omVolumeArgs.setCreationTime(System.currentTimeMillis());
+
+ if (!isRatisEnabled) {
+ createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey,
+ dbUserKey);
}
- LOG.debug("created volume:{} user:{}", args.getVolume(),
- args.getOwnerName());
+ LOG.debug("created volume:{} user:{}", omVolumeArgs.getVolume(),
+ omVolumeArgs.getOwnerName());
+ return volumeList;
} catch (IOException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Volume creation failed for user:{} volume:{}",
- args.getOwnerName(), args.getVolume(), ex);
- } else {
- throw (IOException) ex;
+ omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
}
+ throw ex;
} finally {
- metadataManager.getLock().releaseVolumeLock(args.getVolume());
- metadataManager.getLock().releaseUserLock(args.getOwnerName());
+ metadataManager.getLock().releaseVolumeLock(omVolumeArgs.getVolume());
+ metadataManager.getLock().releaseUserLock(omVolumeArgs.getOwnerName());
+ }
+ }
+
+
+ @Override
+ public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
+ VolumeList volumeList) throws IOException {
+ // Do we need to hold the user/volume locks when applying transactions?
+ String dbVolumeKey = metadataManager.getVolumeKey(omVolumeArgs.getVolume());
+ String dbUserKey = metadataManager.getUserKey(omVolumeArgs.getOwnerName());
+ try {
+ createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey, dbUserKey);
+ } catch (IOException ex) {
+ LOG.error("Volume creation failed for user:{} volume:{}",
+ omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
+ throw ex;
+ }
+ }
+
+ private void createVolumeCommitToDB(OmVolumeArgs omVolumeArgs,
+ VolumeList volumeList, String dbVolumeKey, String dbUserKey)
+ throws IOException {
+ try (BatchOperation batch = metadataManager.getStore()
+ .initBatchOperation()) {
+ // Write the vol info
+ metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey,
+ omVolumeArgs);
+ metadataManager.getUserTable().putWithBatch(batch, dbUserKey,
+ volumeList);
+ // Commit the volume info and the user's volume list as one batch
+ metadataManager.getStore().commitBatchOperation(batch);
+ } catch (IOException ex) {
+ throw ex;
}
}
@@ -161,7 +198,8 @@ public class VolumeManagerImpl implements VolumeManager {
* @throws IOException
*/
@Override
- public void setOwner(String volume, String owner) throws IOException {
+ public OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
+ throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner);
metadataManager.getLock().acquireUserLock(owner);
@@ -179,49 +217,84 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
- try (BatchOperation batch = metadataManager.getStore()
- .initBatchOperation()) {
- delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
- addVolumeToOwnerList(volume, owner, batch);
-
- OmVolumeArgs newVolumeArgs =
- OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
- .setAdminName(volumeArgs.getAdminName())
- .setOwnerName(owner)
- .setQuotaInBytes(volumeArgs.getQuotaInBytes())
- .setCreationTime(volumeArgs.getCreationTime())
- .build();
-
- metadataManager.getVolumeTable().putWithBatch(batch,
- dbVolumeKey, newVolumeArgs);
- metadataManager.getStore().commitBatchOperation(batch);
+ // Compute both owners' updated volume lists up front; they are committed
+ // here when Ratis is disabled, otherwise returned to the caller.
+ String originalOwner =
+ metadataManager.getUserKey(volumeArgs.getOwnerName());
+ VolumeList oldOwnerVolumeList = delVolumeFromOwnerList(volume,
+ originalOwner);
+
+ String newOwner = metadataManager.getUserKey(owner);
+ VolumeList newOwnerVolumeList = addVolumeToOwnerList(volume, newOwner);
+
+ volumeArgs.setOwnerName(owner);
+ if (!isRatisEnabled) {
+ // The last argument is the user-table key of the ORIGINAL owner, the
+ // same value applySetOwner passes. Passing the new owner here would
+ // leave the original owner's volume list unchanged in the DB.
+ setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
+ volumeArgs, originalOwner);
}
- } catch (RocksDBException | IOException ex) {
+ return new OmVolumeOwnerChangeResponse(oldOwnerVolumeList,
+ newOwnerVolumeList, volumeArgs, originalOwner);
+ } catch (IOException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Changing volume ownership failed for user:{} volume:{}",
owner, volume, ex);
}
- if(ex instanceof RocksDBException) {
- throw RocksDBStore.toIOException("Volume creation failed.",
- (RocksDBException) ex);
- } else {
- throw (IOException) ex;
- }
+ throw ex;
} finally {
metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner);
}
}
+  /**
+   * Commits a volume ownership change to the OM DB.
+   *
+   * @param oldOwner key under which the previous owner's list is stored
+   * @param oldOwnerVolumeList previous owner's volume list to persist
+   * @param newOwnerVolumeList new owner's volume list to persist
+   * @param newOwnerVolumeArgs volume args to persist for the volume
+   * @throws IOException if the DB write fails; the failure is logged first
+   */
+  @Override
+  public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
+      throws IOException {
+    try {
+      setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
+          newOwnerVolumeArgs, oldOwner);
+    } catch (IOException ioe) {
+      final String ownerName = newOwnerVolumeArgs.getOwnerName();
+      final String volumeName = newOwnerVolumeArgs.getVolume();
+      LOG.error("Changing volume ownership failed for user:{} volume:{}",
+          ownerName, volumeName, ioe);
+      throw ioe;
+    }
+  }
+
+
+  /**
+   * Writes an ownership change to the OM DB in a single batch: the previous
+   * owner's volume list (deleted when empty), the new owner's volume list,
+   * and the volume entry itself.
+   *
+   * @param oldOwnerVolumeList previous owner's volume list to persist
+   * @param newOwnerVolumeList new owner's volume list to persist
+   * @param newOwnerVolumeArgs volume args; its owner name identifies the
+   *                           new owner
+   * @param oldOwner user-table key of the previous owner
+   * @throws IOException if the batch cannot be written or committed
+   */
+  private void setOwnerCommitToDB(VolumeList oldOwnerVolumeList,
+      VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
+      String oldOwner) throws IOException {
+    try (BatchOperation batch = metadataManager.getStore()
+        .initBatchOperation()) {
+      // Drop the previous owner's entry entirely once it owns no volumes.
+      if (oldOwnerVolumeList.getVolumeNamesList().isEmpty()) {
+        metadataManager.getUserTable().deleteWithBatch(batch, oldOwner);
+      } else {
+        metadataManager.getUserTable().putWithBatch(batch, oldOwner,
+            oldOwnerVolumeList);
+      }
+
+      // Derive the new owner's user-table key with getUserKey, as the other
+      // user-table accesses do, instead of using the raw owner name.
+      String newOwnerKey =
+          metadataManager.getUserKey(newOwnerVolumeArgs.getOwnerName());
+      metadataManager.getUserTable().putWithBatch(batch, newOwnerKey,
+          newOwnerVolumeList);
+
+      String dbVolumeKey =
+          metadataManager.getVolumeKey(newOwnerVolumeArgs.getVolume());
+      metadataManager.getVolumeTable().putWithBatch(batch,
+          dbVolumeKey, newOwnerVolumeArgs);
+      metadataManager.getStore().commitBatchOperation(batch);
+    }
+  }
+
+
/**
* Changes the Quota on a volume.
*
* @param volume - Name of the volume.
* @param quota - Quota in bytes.
+ *
+ * @return OmVolumeArgs
* @throws IOException
*/
@Override
- public void setQuota(String volume, long quota) throws IOException {
+ public OmVolumeArgs setQuota(String volume, long quota) throws IOException {
Preconditions.checkNotNull(volume);
metadataManager.getLock().acquireVolumeLock(volume);
try {
@@ -235,15 +308,13 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
- OmVolumeArgs newVolumeArgs =
- OmVolumeArgs.newBuilder()
- .setVolume(volumeArgs.getVolume())
- .setAdminName(volumeArgs.getAdminName())
- .setOwnerName(volumeArgs.getOwnerName())
- .setQuotaInBytes(quota)
- .setCreationTime(volumeArgs.getCreationTime()).build();
- metadataManager.getVolumeTable().put(dbVolumeKey, newVolumeArgs);
+ volumeArgs.setQuotaInBytes(quota);
+
+ if (!isRatisEnabled) {
+ metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
+ }
+ return volumeArgs;
} catch (IOException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
@@ -255,6 +326,19 @@ public class VolumeManagerImpl implements VolumeManager {
}
}
+  /**
+   * Stores the given volume args in the volume table under the volume's key.
+   *
+   * @param omVolumeArgs volume args to persist
+   * @throws IOException if the DB write fails; the failure is logged first
+   */
+  @Override
+  public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
+    try {
+      final String volumeKey =
+          metadataManager.getVolumeKey(omVolumeArgs.getVolume());
+      metadataManager.getVolumeTable().put(volumeKey, omVolumeArgs);
+    } catch (IOException ioe) {
+      final String volumeName = omVolumeArgs.getVolume();
+      final long quotaInBytes = omVolumeArgs.getQuotaInBytes();
+      LOG.error("Changing volume quota failed for volume:{} quota:{}",
+          volumeName, quotaInBytes, ioe);
+      throw ioe;
+    }
+  }
+
/**
* Gets the volume information.
* @param volume - Volume name.
@@ -290,10 +374,12 @@ public class VolumeManagerImpl implements VolumeManager {
* Deletes an existing empty volume.
*
* @param volume - Name of the volume.
+ *
+ * @return OmDeleteVolumeResponse
* @throws IOException
*/
@Override
- public void deleteVolume(String volume) throws IOException {
+ public OmDeleteVolumeResponse deleteVolume(String volume) throws IOException {
Preconditions.checkNotNull(volume);
String owner;
metadataManager.getLock().acquireVolumeLock(volume);
@@ -305,7 +391,6 @@ public class VolumeManagerImpl implements VolumeManager {
metadataManager.getLock().acquireUserLock(owner);
metadataManager.getLock().acquireVolumeLock(volume);
try {
-
String dbVolumeKey = metadataManager.getVolumeKey(volume);
OmVolumeArgs volumeArgs =
metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -322,28 +407,54 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
// delete the volume from the owner list
// as well as delete the volume entry
- try (BatchOperation batch = metadataManager.getStore()
- .initBatchOperation()) {
- delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
- metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey);
- metadataManager.getStore().commitBatchOperation(batch);
+ VolumeList newVolumeList = delVolumeFromOwnerList(volume,
+ volumeArgs.getOwnerName());
+
+ if (!isRatisEnabled) {
+ deleteVolumeCommitToDB(newVolumeList,
+ volume, owner);
}
- } catch (RocksDBException| IOException ex) {
+ return new OmDeleteVolumeResponse(volume, owner, newVolumeList);
+ } catch (IOException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Delete volume failed for volume:{}", volume, ex);
}
- if(ex instanceof RocksDBException) {
- throw RocksDBStore.toIOException("Volume creation failed.",
- (RocksDBException) ex);
- } else {
- throw (IOException) ex;
- }
+ throw ex;
} finally {
metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner);
}
}
+  /**
+   * Commits a volume deletion to the OM DB.
+   *
+   * @param volume name of the volume to delete
+   * @param owner owner of the volume
+   * @param newVolumeList owner's volume list to persist
+   * @throws IOException if the DB write fails; the failure is logged first
+   */
+  @Override
+  public void applyDeleteVolume(String volume, String owner,
+      VolumeList newVolumeList) throws IOException {
+    try {
+      deleteVolumeCommitToDB(newVolumeList, volume, owner);
+    } catch (IOException ioe) {
+      LOG.error("Delete volume failed for volume:{}", volume, ioe);
+      throw ioe;
+    }
+  }
+
+  /**
+   * Writes a volume deletion to the OM DB in a single batch: updates (or,
+   * when empty, deletes) the owner's volume list and deletes the volume
+   * entry.
+   *
+   * @param newVolumeList owner's volume list to persist
+   * @param volume name of the volume to delete
+   * @param owner owner of the volume
+   * @throws IOException if the batch cannot be written or committed
+   */
+  private void deleteVolumeCommitToDB(VolumeList newVolumeList,
+      String volume, String owner) throws IOException {
+    try (BatchOperation batch = metadataManager.getStore()
+        .initBatchOperation()) {
+      final String userKey = metadataManager.getUserKey(owner);
+      final boolean ownerHasVolumes =
+          !newVolumeList.getVolumeNamesList().isEmpty();
+      if (ownerHasVolumes) {
+        metadataManager.getUserTable().putWithBatch(batch, userKey,
+            newVolumeList);
+      } else {
+        metadataManager.getUserTable().deleteWithBatch(batch, userKey);
+      }
+      final String volumeKey = metadataManager.getVolumeKey(volume);
+      metadataManager.getVolumeTable().deleteWithBatch(batch, volumeKey);
+      metadataManager.getStore().commitBatchOperation(batch);
+    }
+  }
+
/**
* Checks if the specified user with a role can access this volume.
*
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 2f3445a..919709c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -36,8 +37,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
-import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
-import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
@@ -67,14 +68,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManagerServerProtocol ozoneManager;
- private RequestHandler handler;
+ private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
- this.handler = new OzoneManagerRequestHandler(ozoneManager);
+ this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager);
}
/**
@@ -185,21 +186,53 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
- switch (omRequest.getCmdType()) {
- case AllocateBlock:
- return handleAllocateBlock(raftClientRequest, omRequest);
- case CreateKey:
- return handleCreateKeyRequest(raftClientRequest, omRequest);
- case InitiateMultiPartUpload:
- return handleInitiateMultipartUpload(raftClientRequest, omRequest);
- default:
- return TransactionContext.newBuilder()
+ // Builds the TransactionContext for a client request. Every case except
+ // the volume commands returns from inside the switch.
+ OMRequest newOmRequest = null;
+ try {
+ switch (omRequest.getCmdType()) {
+ // Volume commands: the handler returns a new OMRequest, which becomes the
+ // log data of the context built after the try block.
+ case CreateVolume:
+ case SetVolumeProperty:
+ case DeleteVolume:
+ newOmRequest = handler.handleStartTransaction(omRequest);
+ break;
+ case AllocateBlock:
+ return handleAllocateBlock(raftClientRequest, omRequest);
+ case CreateKey:
+ return handleCreateKeyRequest(raftClientRequest, omRequest);
+ case InitiateMultiPartUpload:
+ return handleInitiateMultipartUpload(raftClientRequest, omRequest);
+ // All other commands: use the client message content as log data.
+ default:
+ return TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(raftClientRequest.getMessage().getContent())
+ .build();
+ }
+ } catch (IOException ex) {
+ // Failure path: return a context that carries the exception and has no
+ // log data set.
+ TransactionContext transactionContext = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
- .setLogData(raftClientRequest.getMessage().getContent())
.build();
+ if (ex instanceof OMException) {
+ // Append STATUS_CODE and the OMException result to the message of a
+ // new IOException; the original exception is not attached as its cause.
+ IOException ioException =
+ new IOException(ex.getMessage() + STATUS_CODE +
+ ((OMException) ex).getResult());
+ transactionContext.setException(ioException);
+ } else {
+ transactionContext.setException(ex);
+ }
+ LOG.error("Exception in startTransaction for cmdType " +
+ omRequest.getCmdType(), ex);
+ return transactionContext;
}
+ // Reached only through the break above, i.e. for the volume commands.
+ TransactionContext transactionContext = TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(OMRatisHelper.convertRequestToByteString(newOmRequest))
+ .build();
+ return transactionContext;
}
private TransactionContext handleInitiateMultipartUpload(
@@ -367,7 +400,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
* @throws ServiceException
*/
private Message runCommand(OMRequest request, long trxLogIndex) {
- OMResponse response = handler.handle(request);
+ OMResponse response = handler.handleApplyTransaction(request);
lastAppliedIndex = trxLogIndex;
return OMRatisHelper.convertResponseToMessage(response);
}
@@ -394,7 +427,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
}
@VisibleForTesting
- public void setHandler(RequestHandler handler) {
+ public void setHandler(OzoneManagerHARequestHandler handler) {
this.handler = handler;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
similarity index 60%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
index 367efd4..1ccac3b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java
@@ -17,32 +17,32 @@
package org.apache.hadoop.ozone.protocolPB;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
- OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
- OMResponse;
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
/**
- * Handler to handle the OmRequests.
+ * Handler to handle OM requests in OM HA. Adds separate start-transaction
+ * and apply-transaction entry points on top of {@link RequestHandler}.
*/
-public interface RequestHandler {
-
+public interface OzoneManagerHARequestHandler extends RequestHandler {
/**
- * Handle the OmRequest, and returns OmResponse.
- * @param request
- * @return OmResponse
+ * Handle start Transaction Requests from OzoneManager StateMachine.
+ * @param omRequest the OM request to start a transaction for
+ * @return OMRequest - New OM Request which will be applied during apply
+ * Transaction
+ * @throws IOException if the request cannot be processed
*/
- OMResponse handle(OMRequest request);
+ OMRequest handleStartTransaction(OMRequest omRequest) throws IOException;
/**
- * Validates that the incoming OM request has required parameters.
- * TODO: Add more validation checks before writing the request to Ratis log.
- *
- * @param omRequest client request to OM
- * @throws OMException thrown if required parameters are set to null.
+ * Handle Apply Transaction Requests from OzoneManager StateMachine.
+ * @param omRequest the OM request to apply
+ * @return OMResponse the response for the applied request
*/
- void validateRequest(OMRequest omRequest) throws OMException;
+ OMResponse handleApplyTransaction(OMRequest omRequest);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
new file mode 100644
index 0000000..aada6e1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .DeleteVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .Status;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .VolumeList;
+
+/**
+ * Command handler for OM requests in OM HA. Volume commands (CreateVolume,
+ * SetVolumeProperty, DeleteVolume) are handled in two steps: a "start" step
+ * that asks the OM for the values to persist and returns a rewritten request
+ * carrying them, and an "apply" step that hands those values back to the OM
+ * to be committed. All other command types are delegated to
+ * {@link OzoneManagerRequestHandler#handle(OMRequest)} on apply.
+ */
+public class OzoneManagerHARequestHandlerImpl
+    extends OzoneManagerRequestHandler implements OzoneManagerHARequestHandler {
+
+  public OzoneManagerHARequestHandlerImpl(OzoneManagerServerProtocol om) {
+    super(om);
+  }
+
+  /**
+   * Runs the start step for a volume command.
+   *
+   * @param omRequest the incoming OM request
+   * @return the rewritten request to be handed to
+   *         {@link #handleApplyTransaction(OMRequest)}
+   * @throws IOException if the command type is not a volume command or the
+   *                     OM rejects the request
+   */
+  @Override
+  public OMRequest handleStartTransaction(OMRequest omRequest)
+      throws IOException {
+    LOG.debug("Received OMRequest: {}, ", omRequest);
+    Type cmdType = omRequest.getCmdType();
+    switch (cmdType) {
+    case CreateVolume:
+      return handleCreateVolumeStart(omRequest);
+    case SetVolumeProperty:
+      return handleSetVolumePropertyStart(omRequest);
+    case DeleteVolume:
+      return handleDeleteVolumeStart(omRequest);
+    default:
+      throw new IOException("Unrecognized Command Type:" + cmdType);
+    }
+  }
+
+  /**
+   * Runs the apply step. Volume commands are applied here; every other
+   * command type is delegated to {@code handle(omRequest)}. An IOException
+   * raised while applying is converted into a failed response, not rethrown.
+   *
+   * @param omRequest the request to apply
+   * @return the response for the request
+   */
+  @Override
+  public OMResponse handleApplyTransaction(OMRequest omRequest) {
+    LOG.debug("Received OMRequest: {}, ", omRequest);
+    Type cmdType = omRequest.getCmdType();
+    OMResponse.Builder responseBuilder =
+        OMResponse.newBuilder().setCmdType(cmdType)
+            .setStatus(Status.OK);
+    try {
+      switch (cmdType) {
+      case CreateVolume:
+        responseBuilder.setCreateVolumeResponse(
+            handleCreateVolumeApply(omRequest));
+        break;
+      case SetVolumeProperty:
+        responseBuilder.setSetVolumePropertyResponse(
+            handleSetVolumePropertyApply(omRequest));
+        break;
+      case DeleteVolume:
+        responseBuilder.setDeleteVolumeResponse(
+            handleDeleteVolumeApply(omRequest));
+        break;
+      default:
+        // As all request types are not changed so we need to call handle
+        // here.
+        return handle(omRequest);
+      }
+      responseBuilder.setSuccess(true);
+    } catch (IOException ex) {
+      responseBuilder.setSuccess(false);
+      responseBuilder.setStatus(exceptionToResponseStatus(ex));
+      if (ex.getMessage() != null) {
+        responseBuilder.setMessage(ex.getMessage());
+      }
+    }
+    return responseBuilder.build();
+  }
+
+  /**
+   * Start step of CreateVolume: obtains the owner's volume list from the OM
+   * and attaches it to the request.
+   */
+  private OMRequest handleCreateVolumeStart(OMRequest omRequest)
+      throws IOException {
+    VolumeInfo volumeInfo = omRequest.getCreateVolumeRequest().getVolumeInfo();
+    OzoneManagerProtocolProtos.VolumeList volumeList =
+        getOzoneManagerServerProtocol().startCreateVolume(
+            OmVolumeArgs.getFromProtobuf(volumeInfo));
+
+    CreateVolumeRequest createVolumeRequest =
+        CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo)
+            .setVolumeList(volumeList).build();
+    return omRequest.toBuilder().setCreateVolumeRequest(createVolumeRequest)
+        .build();
+  }
+
+  /**
+   * Apply step of CreateVolume: hands the volume info and volume list carried
+   * by the request to the OM.
+   */
+  private CreateVolumeResponse handleCreateVolumeApply(OMRequest omRequest)
+      throws IOException {
+    OzoneManagerProtocolProtos.VolumeInfo volumeInfo =
+        omRequest.getCreateVolumeRequest().getVolumeInfo();
+    VolumeList volumeList =
+        omRequest.getCreateVolumeRequest().getVolumeList();
+    getOzoneManagerServerProtocol().applyCreateVolume(
+        OmVolumeArgs.getFromProtobuf(volumeInfo),
+        volumeList);
+    return CreateVolumeResponse.newBuilder().build();
+  }
+
+  /**
+   * Start step of SetVolumeProperty. A request that carries quotaInBytes is
+   * treated as a quota change; any other request is treated as an owner
+   * change.
+   */
+  private OMRequest handleSetVolumePropertyStart(OMRequest omRequest)
+      throws IOException {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        omRequest.getSetVolumePropertyRequest();
+    String volume = setVolumePropertyRequest.getVolumeName();
+    SetVolumePropertyRequest newSetVolumePropertyRequest;
+    if (setVolumePropertyRequest.hasQuotaInBytes()) {
+      long quota = setVolumePropertyRequest.getQuotaInBytes();
+      OmVolumeArgs omVolumeArgs =
+          getOzoneManagerServerProtocol().startSetQuota(volume, quota);
+      // quotaInBytes must be carried over: handleSetVolumePropertyApply uses
+      // hasQuotaInBytes() to tell a quota change from an owner change, and
+      // the rewritten request replaces the original one entirely.
+      newSetVolumePropertyRequest =
+          SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
+              .setQuotaInBytes(quota)
+              .setVolumeInfo(omVolumeArgs.getProtobuf()).build();
+    } else {
+      String owner = setVolumePropertyRequest.getOwnerName();
+      OmVolumeOwnerChangeResponse omVolumeOwnerChangeResponse =
+          getOzoneManagerServerProtocol().startSetOwner(volume, owner);
+      // If volumeLists become large and as ratis writes the request to disk we
+      // might take more space if the lists become very big in size. We might
+      // need to revisit this if it becomes problem
+      newSetVolumePropertyRequest =
+          SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
+              .setOwnerName(owner)
+              .setOriginalOwner(omVolumeOwnerChangeResponse.getOriginalOwner())
+              .setNewOwnerVolumeList(
+                  omVolumeOwnerChangeResponse.getNewOwnerVolumeList())
+              .setOldOwnerVolumeList(
+                  omVolumeOwnerChangeResponse.getOriginalOwnerVolumeList())
+              .setVolumeInfo(
+                  omVolumeOwnerChangeResponse.getNewOwnerVolumeArgs()
+                      .getProtobuf()).build();
+    }
+    return omRequest.toBuilder().setSetVolumePropertyRequest(
+        newSetVolumePropertyRequest).build();
+  }
+
+  /**
+   * Apply step of SetVolumeProperty: applies a quota change when the request
+   * carries quotaInBytes, otherwise an owner change.
+   */
+  private SetVolumePropertyResponse handleSetVolumePropertyApply(
+      OMRequest omRequest) throws IOException {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        omRequest.getSetVolumePropertyRequest();
+
+    if (setVolumePropertyRequest.hasQuotaInBytes()) {
+      getOzoneManagerServerProtocol().applySetQuota(
+          OmVolumeArgs.getFromProtobuf(
+              setVolumePropertyRequest.getVolumeInfo()));
+    } else {
+      getOzoneManagerServerProtocol().applySetOwner(
+          setVolumePropertyRequest.getOriginalOwner(),
+          setVolumePropertyRequest.getOldOwnerVolumeList(),
+          setVolumePropertyRequest.getNewOwnerVolumeList(),
+          OmVolumeArgs.getFromProtobuf(
+              setVolumePropertyRequest.getVolumeInfo()));
+    }
+    return SetVolumePropertyResponse.newBuilder().build();
+  }
+
+  /**
+   * Start step of DeleteVolume: obtains the volume, its owner and the owner's
+   * updated volume list from the OM and builds a request carrying them.
+   */
+  private OMRequest handleDeleteVolumeStart(OMRequest omRequest)
+      throws IOException {
+    DeleteVolumeRequest deleteVolumeRequest =
+        omRequest.getDeleteVolumeRequest();
+
+    String volume = deleteVolumeRequest.getVolumeName();
+
+    OmDeleteVolumeResponse omDeleteVolumeResponse =
+        getOzoneManagerServerProtocol().startDeleteVolume(volume);
+
+    DeleteVolumeRequest newDeleteVolumeRequest =
+        DeleteVolumeRequest.newBuilder().setVolumeList(
+            omDeleteVolumeResponse.getUpdatedVolumeList())
+            .setVolumeName(omDeleteVolumeResponse.getVolume())
+            .setOwner(omDeleteVolumeResponse.getOwner()).build();
+
+    return omRequest.toBuilder().setDeleteVolumeRequest(
+        newDeleteVolumeRequest).build();
+  }
+
+  /**
+   * Apply step of DeleteVolume: hands the volume name, owner and volume list
+   * carried by the request to the OM.
+   */
+  private DeleteVolumeResponse handleDeleteVolumeApply(OMRequest omRequest)
+      throws IOException {
+    DeleteVolumeRequest deleteVolumeRequest =
+        omRequest.getDeleteVolumeRequest();
+
+    getOzoneManagerServerProtocol().applyDeleteVolume(
+        deleteVolumeRequest.getVolumeName(), deleteVolumeRequest.getOwner(),
+        deleteVolumeRequest.getVolumeList());
+
+    return DeleteVolumeResponse.newBuilder().build();
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 04b4881..1200d17 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -364,7 +364,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
}
// Convert an exception to the corresponding status code
- private Status exceptionToResponseStatus(IOException ex) {
+ protected Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) {
return Status.values()[((OMException) ex).getResult().ordinal()];
} else {
@@ -1027,4 +1027,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
.setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())
.build();
}
+
+ protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() {
+ return impl;
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
index 367efd4..f19dc48 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
*/
public interface RequestHandler {
-
/**
* Handle the OmRequest, and returns OmResponse.
* @param request
@@ -36,6 +35,7 @@ public interface RequestHandler {
*/
OMResponse handle(OMRequest request);
+
/**
* Validates that the incoming OM request has required parameters.
* TODO: Add more validation checks before writing the request to Ratis log.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 9613582..b14eac7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
-import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
-import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
@@ -72,7 +72,7 @@ public class TestOzoneManagerStateMachine {
private OzoneConfiguration conf;
private OzoneManagerRatisServer omRatisServer;
private String omID;
- private RequestHandler requestHandler;
+ private OzoneManagerHARequestHandler requestHandler;
private RaftGroupId raftGroupId;
private OzoneManagerStateMachine ozoneManagerStateMachine;
@@ -105,7 +105,7 @@ public class TestOzoneManagerStateMachine {
ozoneManagerStateMachine =
new OzoneManagerStateMachine(omRatisServer);
- requestHandler = Mockito.mock(OzoneManagerRequestHandler.class);
+ requestHandler = Mockito.mock(OzoneManagerHARequestHandlerImpl.class);
raftGroupId = omRatisServer.getRaftGroup().getGroupId();
ozoneManagerStateMachine.setHandler(requestHandler);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org