You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/11/24 00:51:29 UTC

[GitHub] [ozone] errose28 commented on a change in pull request #2857: HDDS-6022. [Multi-Tenant] Implement DeleteTenant: `ozone tenant delete`

errose28 commented on a change in pull request #2857:
URL: https://github.com/apache/ozone/pull/2857#discussion_r755504110



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
##########
@@ -668,29 +670,63 @@ public void revokeS3Secret(String kerberosID) throws IOException {
    * {@inheritDoc}
    */
   @Override
-  public void createTenant(String tenantName) throws IOException {
-    Preconditions.checkArgument(Strings.isNotBlank(tenantName),
+  public void createTenant(String tenantId) throws IOException {
+    Preconditions.checkArgument(Strings.isNotBlank(tenantId),
         "tenantName cannot be null or empty.");
-    ozoneManagerClient.createTenant(tenantName);
+    ozoneManagerClient.createTenant(tenantId);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createTenant(String tenantId, OmTenantArgs omTenantArgs)

Review comment:
       Looking at RpcClient#createVolume I think the convention is that things in the ozone.client package are passed to the RpcClient, which then converts them to the corresponding type from om.helpers package before passing to the RPC call. We should probably follow this convention here.

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
##########
@@ -84,6 +97,14 @@ private OmVolumeArgs(String adminName, String ownerName, String volume,
     this.updateID = updateID;
   }
 
+  public long getRefCount() {
+    assert (refCount >= 0);

Review comment:
       nit. We can use the Preconditions library with a message for this check to be consistent with existing code.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
##########
@@ -141,7 +142,9 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     // A caveat is that this assumes OM's auth_to_local is the same as
     //  the client's. Maybe move this logic to the client and pass VolumeArgs?
     final String owner = ugi.getShortUserName();
-    final String volumeName = tenantId;  // TODO: Configurable
+    // Volume name defaults to tenant name if unspecified in the request
+    final String volumeName =
+        request.hasTenantName() ? request.getTenantName() : tenantId;

Review comment:
       Reading from the CreateTenantRequest proto here, looks like this should be
   ```
   final String volumeName =
           request.hasVolumeName() ? request.getVolumeName() : tenantId;
   ```
   Also this reveals there is no testing for tenantName != volumeName, so we should probably add that in this PR since its adding the volumeName proto field.

##########
File path: hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
##########
@@ -416,6 +416,9 @@ enum Status {
     INVALID_TENANT_USER_NAME = 80;
     INVALID_ACCESSID = 81;
     TENANT_AUTHORIZER_ERROR = 82;
+
+    VOLUME_IN_USE = 83;
+    TENANT_NOT_EMPTY = 84;  // TODO: Renumber this when rebasing?

Review comment:
       nit. Don't need this TODO. protoc will fail if the IDs conflict after rebase/merge from master.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
##########
@@ -100,6 +100,14 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 
       OmVolumeArgs omVolumeArgs = getVolumeInfo(omMetadataManager, volume);
 
+      // Check reference count
+      if (omVolumeArgs.getRefCount() != 0L) {
+        LOG.debug("volume: {} has non-zero ref count. won't delete", volume);
+        throw new OMException("Volume is being used. Use " +
+            "`ozone tenant delete` CLI to delete the volume instead.",
+            OMException.ResultCodes.VOLUME_IN_USE);

Review comment:
       Maybe we could say "volume is referenced" instead of "volume is in use" since we are calling the new field a reference count.

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDBTenantInfo.java
##########
@@ -117,6 +117,13 @@ public String getBucketPolicyGroupName() {
     return bucketPolicyGroupName;
   }
 
+  // NOTE: Should return an empty string "" if somehow the tenant is not
+  // associated with a volume. Never return null.
+  public String getAssociatedVolumeName() {

Review comment:
       What's the distinction between this and the bucket namespace? Does it need to be a different method?

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmTenantArgs.java
##########
@@ -22,14 +22,26 @@
  * This class is used for storing Ozone tenant arguments.
  */
 public class OmTenantArgs {
-  /* Tenant name */
+  /**
+   * Tenant name.
+   */
   private final String tenantId;
+  /**
+   * Volume name to be created for this tenant.
+   * Default volume name would be the same as tenant name if unspecified.

Review comment:
       Can we do this with a constructor overload instead of leaving it up to interpretation of whoever uses this class?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
##########
@@ -408,7 +410,7 @@ public void testOzoneTenantBasicOperations() throws IOException {
     executeHA(tenantShell, new String[] {"create", "finance"});

Review comment:
       Side note: we should have a jira for the fixme above so we don't forget about it.

##########
File path: hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
##########
@@ -60,6 +60,14 @@ Secure Tenant GetSecret Success
     ${output} =         Execute          ozone tenant user getsecret 'tenantone$bob' --export
                         Should contain   ${output}         export AWS_SECRET_ACCESS_KEY='somesecret1'
 
+Secure Tenant Delete Tenant Failure Tenant Not Empty
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant delete tenantone
+                        Should contain   ${output}         Failed to delete tenant 'tenantone': Tenant 'tenantone' is not empty
+
+Secure Tenant Revoke User AccessId Success
+    ${output} =         Execute          ozone tenant user revoke 'tenantone$bob'
+                        Should contain   ${output}         Revoked accessId 'tenantone$bob'.

Review comment:
       Probably want a negative test case here, since a bug would be very bad. Try to use Bob's access ID and make sure it doesn't work.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
##########
@@ -444,6 +471,14 @@ public void deleteBucket(String bucketName) throws IOException {
     proxy.deleteBucket(name, bucketName);
   }
 
+  public long getRefCount() {

Review comment:
       This class is returned to the client. Do we want to expose ref count to the client? I think at least the setter should not be here, but I'm not sure whether or not we want the getter.

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
##########
@@ -84,6 +97,14 @@ private OmVolumeArgs(String adminName, String ownerName, String volume,
     this.updateID = updateID;
   }
 
+  public long getRefCount() {
+    assert (refCount >= 0);
+    return refCount;
+  }
+
+  public void setRefCount(long refCount) {

Review comment:
       This should be an increment/decrement so requests cannot (accidentally) overwrite the current volume's ref count.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
##########
@@ -591,7 +622,14 @@ public void testListTenantUsers() throws IOException {
     checkOutput(out, "", true);
     checkOutput(err, "Revoked accessId", false);
 
-    // TODO: Clean up: remove tenant when tenant remove CLI is implemented
+    executeHA(tenantShell, new String[] {
+        "user", "revoke", "tenant1$bob"});
+    checkOutput(out, "", true);
+    checkOutput(err, "Revoked accessId", false);
+
+    executeHA(tenantShell, new String[] {"delete", "tenant1"});
+    checkOutput(out, "Deleted tenant 'tenant1'.\n", true);
+    checkOutput(err, "", true);

Review comment:
       Since we are testing list tenant here, a list after the delete for this test might be good too.

##########
File path: hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
##########
@@ -72,3 +80,8 @@ Secure Tenant Create Tenant Failure with Regular (non-admin) user
 Secure Tenant SetSecret Failure with Regular (non-admin) user
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user set-secret 'tenantone$bob' --secret=somesecret2 --export
                         Should contain   ${output}         Permission denied. Requested accessId
+
+Secure Tenant Delete Tenant Success
+    Run Keyword   Kinit test user     testuser     testuser.keytab
+    ${output} =         Execute          ozone tenant delete tenantone
+                        Should contain   ${output}         Deleted tenant 'tenantone'.

Review comment:
       A negative test might be good here too. Like following this with a tenant list command and making sure the tenant is actually gone.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
##########
@@ -18,33 +18,260 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_IN_USE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+

Review comment:
       Why are we checking all this DB state in preExecute? Minor performance optimization? We just have to check it all again in validateAndUpdateCache if these checks pass. Is this pattern used elsewhere for existing OM requests?

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
##########
@@ -141,7 +142,9 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     // A caveat is that this assumes OM's auth_to_local is the same as
     //  the client's. Maybe move this logic to the client and pass VolumeArgs?
     final String owner = ugi.getShortUserName();
-    final String volumeName = tenantId;  // TODO: Configurable
+    // Volume name defaults to tenant name if unspecified in the request

Review comment:
       Similar to above, do we want to handle this on the client side? We could have the volume name always be set, and it's up to the client whether or not that is the same as the tenant name?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
##########
@@ -344,7 +344,7 @@ public void testAssignAdmin() throws IOException {
     checkOutput(err, "", true);
 
     // Loop assign-revoke 3 times
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < 1; i++) {

Review comment:
       Why the one iteration for loop?

##########
File path: hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
##########
@@ -463,6 +466,7 @@ message VolumeInfo {
     optional uint64 modificationTime = 10;
     optional int64 quotaInNamespace = 11 [default = -2];
     optional uint64 usedNamespace = 12;
+    optional int64 refCount = 13;

Review comment:
       Same as above. If we decide not to expose ref count to the client we can remove this.

##########
File path: hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
##########
@@ -72,3 +76,12 @@ Secure Tenant Create Tenant Failure with Regular (non-admin) user
 Secure Tenant SetSecret Failure with Regular (non-admin) user
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user set-secret 'tenantone$bob' --secret=somesecret2 --export
                         Should contain   ${output}         Permission denied. Requested accessId
+
+Secure Tenant Revoke User AccessId Success
+    Run Keyword   Kinit test user     testuser     testuser.keytab
+    ${output} =         Execute          ozone tenant user revoke 'tenantone$bob'
+                        Should contain   ${output}         Revoked accessId 'tenantone$bob'.

Review comment:
       We should probably have a negative test case here too, since a bug in this would be bad. Try to use Bob's access ID and make sure it no longer works.

##########
File path: hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantCreateHandler.java
##########
@@ -33,26 +32,19 @@
     description = "Create one or more tenants")
 public class TenantCreateHandler extends TenantHandler {
 
-  @CommandLine.Spec
-  private CommandLine.Model.CommandSpec spec;
-
-  @CommandLine.Parameters(description = "List of tenant names")
+  @CommandLine.Parameters(description = "List of tenant names", arity = "1..")
   private List<String> tenants = new ArrayList<>();
 
   @Override
   protected void execute(OzoneClient client, OzoneAddress address) {
-    if (tenants.size() > 0) {
-      for (String tenantName : tenants) {
-        try {
-          client.getObjectStore().createTenant(tenantName);
-          out().println("Created tenant '" + tenantName + "'.");
-        } catch (IOException e) {
-          err().println("Failed to create tenant '" + tenantName + "': " +
-              e.getMessage());
-        }
+    for (String tenantId : tenants) {

Review comment:
       Do we have other CLIs like this that issue multiple OM requests? If nobody else is doing this, I would say we avoid it. It's confusing to the user what state is created if one of the multiple requests fails.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
##########
@@ -18,33 +18,260 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_IN_USE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+
+    // Check Ozone cluster admin privilege
+    OMTenantRequestHelper.checkAdmin(ozoneManager);
+
+    final OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+
+    // TODO: Check tenantId validity? Maybe not
+
+    // Check tenant existence in tenantStateTable
+    if (!metadataManager.getTenantStateTable().isExist(tenantId)) {
+      LOG.debug("tenant: {} does not exist", tenantId);
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
+          TENANT_NOT_FOUND);
+    }
+
+    // Check if the tenant volume is empty (when the volume exists)
+    final OmDBTenantInfo dbTenantInfo =
+        metadataManager.getTenantStateTable().get(tenantId);
+    final String volumeName = dbTenantInfo.getAssociatedVolumeName();
+    // Note: Looks like isVolumeEmpty() returns true if volume doesn't exist
+    if (volumeName.length() > 0 && !metadataManager.isVolumeEmpty(volumeName)) {
+      LOG.debug("volume: {} is not empty", volumeName);
+      throw new OMException("Tenant volume '" + volumeName + "' is not empty." +
+          " Volume must be emptied before the tenant can be deleted.",
+          VOLUME_NOT_EMPTY);
+    }
+
+    // Check if there are any accessIds in the tenant
+    final OMMultiTenantManager tenantManager =
+        ozoneManager.getMultiTenantManager();
+    if (!OMTenantRequestHelper.isTenantEmpty(tenantManager, tenantId)) {
+      LOG.warn("tenant: '{}' is not empty. Unable to delete the tenant",
+          tenantId);
+      throw new OMException("Tenant '" + tenantId + "' is not empty. " +
+          "All accessIds associated to this tenant must be revoked before " +
+          "the tenant can be deleted.", TENANT_NOT_EMPTY);
+    }
+
+    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+
+    // Regenerate request with the volumeName
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setDeleteTenantRequest(DeleteTenantRequest.newBuilder()
+            .setTenantId(tenantId))
+        .setDeleteVolumeRequest(DeleteVolumeRequest.newBuilder()
+            .setVolumeName(volumeName))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    return omRequestBuilder.build();
   }
 
   @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+    super.handleRequestFailure(ozoneManager);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
-    return null;
+    OMClientResponse omClientResponse = null;
+    final OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    boolean acquiredVolumeLock = false, acquiredUserLock = false;
+    final Map<String, String> auditMap = new HashMap<>();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+    // NOTE: volumeName might be a zero-length string
+    final String volumeName =
+        getOmRequest().getDeleteVolumeRequest().getVolumeName();
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs;
+    String volumeOwner = null;
+    // deleteVolume is true if volumeName is not empty string
+    boolean deleteVolume = volumeName.length() > 0;
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo newVolumeList = null;
+
+    try {
+      if (deleteVolume) {
+        // check Acl
+        if (ozoneManager.getAclsEnabled()) {
+          checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+              OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
+              volumeName, null, null);
+        }
+
+        acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+        // Check volume ref count
+        long volRefCount = omVolumeArgs.getRefCount();
+        if (volRefCount > 1L) {
+          LOG.warn("Volume '{}' has a greater than 1 reference count of " +
+                  "'{}'", volumeName, volRefCount);
+          throw new OMException("Volume '" + volumeName + "' has a greater " +
+              "than 1 reference count of '" + volRefCount + "'. This may " +
+              "indicate the volume is in use by some other Ozone features. " +
+              "Please disable such other features before trying to delete " +
+              "the tenant again.", VOLUME_IN_USE);
+        }
+
+        volumeOwner = omVolumeArgs.getOwnerName();
+        acquiredUserLock = omMetadataManager.getLock().acquireWriteLock(
+            USER_LOCK, volumeOwner);
+
+        // Check volume emptiness, again
+        if (!omMetadataManager.isVolumeEmpty(volumeName)) {
+          LOG.debug("volume: '{}' is not empty", volumeName);
+          throw new OMException("Aborting tenant deletion. " +
+              "Volume becomes non-empty somewhere between" +
+              "preExecute and validateAndUpdateCache", VOLUME_NOT_EMPTY);
+        }
+
+        // Actual volume deletion, follows OMVolumeDeleteRequest
+        newVolumeList =
+            omMetadataManager.getUserTable().get(volumeOwner);
+        newVolumeList = delVolumeFromOwnerList(newVolumeList, volumeName,
+            volumeOwner, transactionLogIndex);
+        final String dbUserKey = omMetadataManager.getUserKey(volumeOwner);
+        omMetadataManager.getUserTable().addCacheEntry(
+            new CacheKey<>(dbUserKey),
+            new CacheValue<>(Optional.of(newVolumeList), transactionLogIndex));
+        final String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+        omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(dbVolumeKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+
+        // TODO: Set response dbVolumeKey?
+      }
+
+      // TODO: Should hold some tenant lock here. Just in case !deleteVolume
+//      acquiredTenantLock = omMetadataManager.getLock().acquireWriteLock(
+//          TENANT_LOCK, tenantId);
+
+      // Double check tenant emptiness
+
+      // Invalidate cache entries for tenant
+      omMetadataManager.getTenantStateTable().addCacheEntry(
+          new CacheKey<>(tenantId),
+          new CacheValue<>(Optional.absent(), transactionLogIndex));
+      final String userPolicyGroupName =
+          tenantId + OzoneConsts.DEFAULT_TENANT_USER_POLICY_SUFFIX;
+      final String bucketPolicyGroupName =
+          tenantId + OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;

Review comment:
       Can we do the concatenation to create the keys with a helper method in OMTenantRequestHelper? Less error prone for other devs to work with.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
##########
@@ -18,33 +18,260 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_IN_USE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+
+    // Check Ozone cluster admin privilege
+    OMTenantRequestHelper.checkAdmin(ozoneManager);
+
+    final OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+
+    // TODO: Check tenantId validity? Maybe not
+
+    // Check tenant existence in tenantStateTable
+    if (!metadataManager.getTenantStateTable().isExist(tenantId)) {
+      LOG.debug("tenant: {} does not exist", tenantId);
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
+          TENANT_NOT_FOUND);
+    }
+
+    // Check if the tenant volume is empty (when the volume exists)
+    final OmDBTenantInfo dbTenantInfo =
+        metadataManager.getTenantStateTable().get(tenantId);
+    final String volumeName = dbTenantInfo.getAssociatedVolumeName();
+    // Note: Looks like isVolumeEmpty() returns true if volume doesn't exist
+    if (volumeName.length() > 0 && !metadataManager.isVolumeEmpty(volumeName)) {
+      LOG.debug("volume: {} is not empty", volumeName);
+      throw new OMException("Tenant volume '" + volumeName + "' is not empty." +
+          " Volume must be emptied before the tenant can be deleted.",
+          VOLUME_NOT_EMPTY);
+    }
+
+    // Check if there are any accessIds in the tenant
+    final OMMultiTenantManager tenantManager =
+        ozoneManager.getMultiTenantManager();
+    if (!OMTenantRequestHelper.isTenantEmpty(tenantManager, tenantId)) {
+      LOG.warn("tenant: '{}' is not empty. Unable to delete the tenant",
+          tenantId);
+      throw new OMException("Tenant '" + tenantId + "' is not empty. " +
+          "All accessIds associated to this tenant must be revoked before " +
+          "the tenant can be deleted.", TENANT_NOT_EMPTY);
+    }
+
+    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+
+    // Regenerate request with the volumeName
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setDeleteTenantRequest(DeleteTenantRequest.newBuilder()
+            .setTenantId(tenantId))
+        .setDeleteVolumeRequest(DeleteVolumeRequest.newBuilder()
+            .setVolumeName(volumeName))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    return omRequestBuilder.build();
   }
 
   @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+    super.handleRequestFailure(ozoneManager);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
-    return null;
+    OMClientResponse omClientResponse = null;
+    final OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    boolean acquiredVolumeLock = false, acquiredUserLock = false;
+    final Map<String, String> auditMap = new HashMap<>();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+    // NOTE: volumeName might be a zero-length string
+    final String volumeName =
+        getOmRequest().getDeleteVolumeRequest().getVolumeName();
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs;
+    String volumeOwner = null;
+    // deleteVolume is true if volumeName is not empty string
+    boolean deleteVolume = volumeName.length() > 0;
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo newVolumeList = null;
+
+    try {
+      if (deleteVolume) {
+        // check Acl
+        if (ozoneManager.getAclsEnabled()) {
+          checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+              OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
+              volumeName, null, null);
+        }
+
+        acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+        // Check volume ref count
+        long volRefCount = omVolumeArgs.getRefCount();
+        if (volRefCount > 1L) {
+          LOG.warn("Volume '{}' has a greater than 1 reference count of " +
+                  "'{}'", volumeName, volRefCount);
+          throw new OMException("Volume '" + volumeName + "' has a greater " +
+              "than 1 reference count of '" + volRefCount + "'. This may " +
+              "indicate the volume is in use by some other Ozone features. " +
+              "Please disable such other features before trying to delete " +
+              "the tenant again.", VOLUME_IN_USE);
+        }
+
+        volumeOwner = omVolumeArgs.getOwnerName();
+        acquiredUserLock = omMetadataManager.getLock().acquireWriteLock(
+            USER_LOCK, volumeOwner);
+
+        // Check volume emptiness, again
+        if (!omMetadataManager.isVolumeEmpty(volumeName)) {
+          LOG.debug("volume: '{}' is not empty", volumeName);
+          throw new OMException("Aborting tenant deletion. " +
+              "Volume becomes non-empty somewhere between" +
+              "preExecute and validateAndUpdateCache", VOLUME_NOT_EMPTY);
+        }
+
+        // Actual volume deletion, follows OMVolumeDeleteRequest
+        newVolumeList =
+            omMetadataManager.getUserTable().get(volumeOwner);
+        newVolumeList = delVolumeFromOwnerList(newVolumeList, volumeName,
+            volumeOwner, transactionLogIndex);
+        final String dbUserKey = omMetadataManager.getUserKey(volumeOwner);
+        omMetadataManager.getUserTable().addCacheEntry(
+            new CacheKey<>(dbUserKey),
+            new CacheValue<>(Optional.of(newVolumeList), transactionLogIndex));
+        final String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+        omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(dbVolumeKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+
+        // TODO: Set response dbVolumeKey?
+      }
+
+      // TODO: Should hold some tenant lock here. Just in case !deleteVolume
+//      acquiredTenantLock = omMetadataManager.getLock().acquireWriteLock(
+//          TENANT_LOCK, tenantId);

Review comment:
       What would the lock be for? If we need it we should add it now or we'll forget about it.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
##########
@@ -115,6 +117,10 @@ public long getQuotaInNamespace() {
     return new VolumeArgs.Builder();
   }
 
+  public long getRefCount() {

Review comment:
       This class is used by the client to create a new volume. Probably no need to have ref count here.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
##########
@@ -18,33 +18,260 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_IN_USE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+
+    // Check Ozone cluster admin privilege
+    OMTenantRequestHelper.checkAdmin(ozoneManager);
+
+    final OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+
+    // TODO: Check tenantId validity? Maybe not
+
+    // Check tenant existence in tenantStateTable
+    if (!metadataManager.getTenantStateTable().isExist(tenantId)) {
+      LOG.debug("tenant: {} does not exist", tenantId);
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
+          TENANT_NOT_FOUND);
+    }
+
+    // Check if the tenant volume is empty (when the volume exists)
+    final OmDBTenantInfo dbTenantInfo =
+        metadataManager.getTenantStateTable().get(tenantId);
+    final String volumeName = dbTenantInfo.getAssociatedVolumeName();
+    // Note: Looks like isVolumeEmpty() returns true if volume doesn't exist
+    if (volumeName.length() > 0 && !metadataManager.isVolumeEmpty(volumeName)) {
+      LOG.debug("volume: {} is not empty", volumeName);
+      throw new OMException("Tenant volume '" + volumeName + "' is not empty." +
+          " Volume must be emptied before the tenant can be deleted.",
+          VOLUME_NOT_EMPTY);
+    }
+
+    // Check if there are any accessIds in the tenant
+    final OMMultiTenantManager tenantManager =
+        ozoneManager.getMultiTenantManager();
+    if (!OMTenantRequestHelper.isTenantEmpty(tenantManager, tenantId)) {
+      LOG.warn("tenant: '{}' is not empty. Unable to delete the tenant",
+          tenantId);
+      throw new OMException("Tenant '" + tenantId + "' is not empty. " +
+          "All accessIds associated to this tenant must be revoked before " +
+          "the tenant can be deleted.", TENANT_NOT_EMPTY);
+    }
+
+    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+
+    // Regenerate request with the volumeName
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setDeleteTenantRequest(DeleteTenantRequest.newBuilder()
+            .setTenantId(tenantId))
+        .setDeleteVolumeRequest(DeleteVolumeRequest.newBuilder()
+            .setVolumeName(volumeName))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    return omRequestBuilder.build();
   }
 
   @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+    super.handleRequestFailure(ozoneManager);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
-    return null;
+    OMClientResponse omClientResponse = null;
+    final OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    boolean acquiredVolumeLock = false, acquiredUserLock = false;
+    final Map<String, String> auditMap = new HashMap<>();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+    // NOTE: volumeName might be a zero-length string
+    final String volumeName =
+        getOmRequest().getDeleteVolumeRequest().getVolumeName();
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs;
+    String volumeOwner = null;
+    // deleteVolume is true if volumeName is not empty string
+    boolean deleteVolume = volumeName.length() > 0;
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo newVolumeList = null;
+
+    try {
+      if (deleteVolume) {
+        // check Acl
+        if (ozoneManager.getAclsEnabled()) {
+          checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+              OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
+              volumeName, null, null);
+        }
+
+        acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+        // Check volume ref count
+        long volRefCount = omVolumeArgs.getRefCount();
+        if (volRefCount > 1L) {
+          LOG.warn("Volume '{}' has a greater than 1 reference count of " +
+                  "'{}'", volumeName, volRefCount);
+          throw new OMException("Volume '" + volumeName + "' has a greater " +
+              "than 1 reference count of '" + volRefCount + "'. This may " +
+              "indicate the volume is in use by some other Ozone features. " +
+              "Please disable such other features before trying to delete " +
+              "the tenant again.", VOLUME_IN_USE);
+        }
+
+        volumeOwner = omVolumeArgs.getOwnerName();
+        acquiredUserLock = omMetadataManager.getLock().acquireWriteLock(
+            USER_LOCK, volumeOwner);
+
+        // Check volume emptiness, again
+        if (!omMetadataManager.isVolumeEmpty(volumeName)) {
+          LOG.debug("volume: '{}' is not empty", volumeName);
+          throw new OMException("Aborting tenant deletion. " +
+              "Volume becomes non-empty somewhere between" +
+              "preExecute and validateAndUpdateCache", VOLUME_NOT_EMPTY);

Review comment:
       Can we make this more user friendly? User does not need to know about our method names, that's for log messages.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
##########
@@ -18,33 +18,260 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_IN_USE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+
+    // Check Ozone cluster admin privilege
+    OMTenantRequestHelper.checkAdmin(ozoneManager);
+
+    final OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+
+    // TODO: Check tenantId validity? Maybe not
+
+    // Check tenant existence in tenantStateTable
+    if (!metadataManager.getTenantStateTable().isExist(tenantId)) {
+      LOG.debug("tenant: {} does not exist", tenantId);
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
+          TENANT_NOT_FOUND);
+    }
+
+    // Check if the tenant volume is empty (when the volume exists)
+    final OmDBTenantInfo dbTenantInfo =
+        metadataManager.getTenantStateTable().get(tenantId);
+    final String volumeName = dbTenantInfo.getAssociatedVolumeName();
+    // Note: Looks like isVolumeEmpty() returns true if volume doesn't exist
+    if (volumeName.length() > 0 && !metadataManager.isVolumeEmpty(volumeName)) {
+      LOG.debug("volume: {} is not empty", volumeName);
+      throw new OMException("Tenant volume '" + volumeName + "' is not empty." +
+          " Volume must be emptied before the tenant can be deleted.",
+          VOLUME_NOT_EMPTY);
+    }
+
+    // Check if there are any accessIds in the tenant
+    final OMMultiTenantManager tenantManager =
+        ozoneManager.getMultiTenantManager();
+    if (!OMTenantRequestHelper.isTenantEmpty(tenantManager, tenantId)) {
+      LOG.warn("tenant: '{}' is not empty. Unable to delete the tenant",
+          tenantId);
+      throw new OMException("Tenant '" + tenantId + "' is not empty. " +
+          "All accessIds associated to this tenant must be revoked before " +
+          "the tenant can be deleted.", TENANT_NOT_EMPTY);
+    }
+
+    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+
+    // Regenerate request with the volumeName
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setDeleteTenantRequest(DeleteTenantRequest.newBuilder()
+            .setTenantId(tenantId))
+        .setDeleteVolumeRequest(DeleteVolumeRequest.newBuilder()
+            .setVolumeName(volumeName))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    return omRequestBuilder.build();
   }
 
   @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+    super.handleRequestFailure(ozoneManager);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
-    return null;
+    OMClientResponse omClientResponse = null;
+    final OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    boolean acquiredVolumeLock = false, acquiredUserLock = false;
+    final Map<String, String> auditMap = new HashMap<>();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+    // NOTE: volumeName might be a zero-length string
+    final String volumeName =
+        getOmRequest().getDeleteVolumeRequest().getVolumeName();
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs;
+    String volumeOwner = null;
+    // deleteVolume is true if volumeName is not empty string
+    boolean deleteVolume = volumeName.length() > 0;
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo newVolumeList = null;
+
+    try {
+      if (deleteVolume) {
+        // check Acl
+        if (ozoneManager.getAclsEnabled()) {
+          checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+              OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
+              volumeName, null, null);
+        }
+
+        acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+        // Check volume ref count
+        long volRefCount = omVolumeArgs.getRefCount();
+        if (volRefCount > 1L) {

Review comment:
       This > check is why this request did not catch that this value is never persisted. It is never initialized and defaults to 0. Actually if the value is 0 here we have a problem and should at least log a warning.

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
##########
@@ -48,6 +48,19 @@
   private long quotaInNamespace;
   private long usedNamespace;
   private List<OzoneAcl> acls;
+  /**
+   * Reference count on this Ozone volume.
+   *
+   * When reference count is larger than zero, it indicates that at least one
+   * "lock" is held on the volume by some Ozone feature (e.g. multi-tenancy).
+   * Volume delete operation will be denied in this case, and user should be
+   * prompted to release the lock first via the interface provided by that
+   * feature.
+   *
+   * Volumes created using CLI, ObjectStore API or upgraded from older OM DB
+   * will have reference count set to zero by default.
+   */
+  private long refCount;

Review comment:
       This needs to be added to `getFromProtobuf`, `getProtobuf`, and `VolumeInfo` proto. If the value is not present, we can just set it to 0. Right now this does nothing, which indicates we need to add testing for existing volume create/delete requests and delete of a tenant volume, which would have caught this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org