You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/05/29 11:17:00 UTC

[geode] branch develop updated: GEODE-6786: Provide ability to delete a region using V2 REST API (#3610)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new ce10ede  GEODE-6786: Provide ability to delete a region using V2 REST API (#3610)
ce10ede is described below

commit ce10ede1dd8a8a085d332bde0389473f7a57ba9d
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Wed May 29 04:16:36 2019 -0700

    GEODE-6786: Provide ability to delete a region using V2 REST API (#3610)
---
 .../rest/RegionManagementOnMultipleGroup.java      |  62 ---------
 .../RegionConfigRealizerIntegrationTest.java       |  16 +++
 .../ConfigurationPersistenceService.java           |   1 -
 .../apache/geode/management/cli/CliFunction.java   |   3 +
 ...ntValidator.java => CacheElementOperation.java} |  23 +---
 .../api/LocatorClusterManagementService.java       | 126 ++++++++++++++---
 .../cli/functions/UpdateCacheFunction.java         |  13 +-
 .../mutators/RegionConfigManager.java              |   6 +-
 .../realizers/RegionConfigRealizer.java            |  17 ++-
 .../validators/CacheElementValidator.java          |  14 +-
 .../validators/ConfigurationValidator.java         |   8 +-
 .../validators/RegionConfigValidator.java          |  31 ++++-
 .../sanctioned-geode-core-serializables.txt        |   2 +-
 .../cache/configuration/RegionConfigTest.java      |   3 +-
 .../api/LocatorClusterManagementServiceTest.java   | 146 +++++++++++++++++---
 .../realizers/RegionConfigRealizerTest.java        |   5 +-
 .../validators/CacheElementValidatorTest.java      |   5 +-
 .../validators/RegionConfigValidatorTest.java      |  41 +++---
 .../internal/ClientClusterManagementService.java   |  10 +-
 .../ClientClusterManagementServiceDUnitTest.java   | 153 +++++++++++++++++++--
 .../rest/RegionManagementIntegrationTest.java      |   4 +-
 .../RegionManagementSecurityIntegrationTest.java   |   2 +-
 .../controllers/RegionManagementController.java    |   9 ++
 23 files changed, 516 insertions(+), 184 deletions(-)

diff --git a/geode-assembly/src/distributedTest/java/org/apache/geode/management/internal/rest/RegionManagementOnMultipleGroup.java b/geode-assembly/src/distributedTest/java/org/apache/geode/management/internal/rest/RegionManagementOnMultipleGroup.java
deleted file mode 100644
index 7c19063..0000000
--- a/geode-assembly/src/distributedTest/java/org/apache/geode/management/internal/rest/RegionManagementOnMultipleGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.management.internal.rest;
-
-import static org.apache.geode.test.junit.assertions.ClusterManagementResultAssert.assertManagementResult;
-
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import org.apache.geode.cache.configuration.RegionConfig;
-import org.apache.geode.cache.configuration.RegionType;
-import org.apache.geode.management.api.ClusterManagementResult;
-import org.apache.geode.management.api.ClusterManagementService;
-import org.apache.geode.management.client.ClusterManagementServiceBuilder;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
-
-public class RegionManagementOnMultipleGroup {
-  @ClassRule
-  public static ClusterStartupRule cluster = new ClusterStartupRule();
-
-  @Test
-  public void createRegion() throws Exception {
-    MemberVM locator = cluster.startLocatorVM(0, l -> l.withHttpService());
-
-    cluster.startServerVM(1, "group1", locator.getPort());
-    cluster.startServerVM(2, "group2", locator.getPort());
-    cluster.startServerVM(3, "group1,group2", locator.getPort());
-
-    ClusterManagementService cms =
-        ClusterManagementServiceBuilder.buildWithHostAddress()
-            .setHostAddress("localhost", locator.getHttpPort())
-            .build();
-    RegionConfig region = new RegionConfig();
-    region.setName("test");
-    region.setGroup("group1");
-    region.setType(RegionType.REPLICATE);
-
-    ClusterManagementResult result = cms.create(region);
-    assertManagementResult(result).isSuccessful().hasMemberStatus().containsOnlyKeys("server-1",
-        "server-3");
-
-    // create the same region on group2 will be successful even though the region already exists
-    // on one member
-    region.setGroup("group2");
-    assertManagementResult(cms.create(region)).isSuccessful().hasMemberStatus()
-        .containsOnlyKeys("server-2", "server-3");
-  }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerIntegrationTest.java
index b87d36f..a0c03d7 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerIntegrationTest.java
@@ -65,4 +65,20 @@ public class RegionConfigRealizerIntegrationTest {
     // the 2nd time with same name and type will not throw an error
     realizer.create(config, server.getCache());
   }
+
+  @Test
+  public void deleteRegion() {
+    config.setName("foo");
+    config.setType(RegionType.REPLICATE);
+    RegionConfigValidator.setShortcutAttributes(config);
+    realizer.create(config, server.getCache());
+
+    Region region = server.getCache().getRegion(config.getName());
+    assertThat(region).isNotNull();
+
+    realizer.delete(config, server.getCache());
+
+    region = server.getCache().getRegion(config.getName());
+    assertThat(region).isNull();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ConfigurationPersistenceService.java b/geode-core/src/main/java/org/apache/geode/distributed/ConfigurationPersistenceService.java
index 44ae6a6..3d5fcf3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/ConfigurationPersistenceService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ConfigurationPersistenceService.java
@@ -30,7 +30,6 @@ public interface ConfigurationPersistenceService {
   /**
    * retrieves all the group names in the cluster
    */
-
   Set<String> getGroups();
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/management/cli/CliFunction.java b/geode-core/src/main/java/org/apache/geode/management/cli/CliFunction.java
index 65c41e6..3eb2946 100644
--- a/geode-core/src/main/java/org/apache/geode/management/cli/CliFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/cli/CliFunction.java
@@ -22,6 +22,7 @@ import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.exceptions.EntityNotFoundException;
 
 /**
  * An abstract function implementation to be extended by cli functions. Any cli function extending
@@ -34,6 +35,8 @@ public abstract class CliFunction<T> implements InternalFunction<T> {
   public final void execute(FunctionContext<T> context) {
     try {
       context.getResultSender().lastResult(executeFunction(context));
+    } catch (EntityNotFoundException nfe) {
+      context.getResultSender().lastResult(new CliFunctionResult(context.getMemberName(), nfe));
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
       context.getResultSender().lastResult(new CliFunctionResult(context.getMemberName(), e));
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java b/geode-core/src/main/java/org/apache/geode/management/internal/CacheElementOperation.java
similarity index 51%
copy from geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java
copy to geode-core/src/main/java/org/apache/geode/management/internal/CacheElementOperation.java
index 085fce3..e9b07ff 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/CacheElementOperation.java
@@ -13,25 +13,8 @@
  * the License.
  */
 
-package org.apache.geode.management.internal.configuration.validators;
+package org.apache.geode.management.internal;
 
-import org.apache.geode.cache.configuration.CacheConfig;
-import org.apache.geode.cache.configuration.CacheElement;
-
-/**
- * this is used to validate all the common attributes of CacheElement, eg. group
- */
-public class CacheElementValidator implements ConfigurationValidator<CacheElement> {
-  @Override
-  public void validate(CacheElement config) throws IllegalArgumentException {
-    if ("cluster".equalsIgnoreCase(config.getGroup())) {
-      throw new IllegalArgumentException(
-          "cluster is a reserved group name. Do not use it for member groups.");
-    }
-  }
-
-  @Override
-  public boolean exists(CacheElement config, CacheConfig persistedConfig) {
-    return false;
-  }
+public enum CacheElementOperation {
+  CREATE, DELETE, UPDATE, LIST
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
index 34fed9c..fdad9f9 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
@@ -20,6 +20,7 @@ package org.apache.geode.management.internal.api;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +45,7 @@ import org.apache.geode.management.api.ClusterManagementResult;
 import org.apache.geode.management.api.ClusterManagementService;
 import org.apache.geode.management.configuration.MemberConfig;
 import org.apache.geode.management.configuration.RuntimeCacheElement;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 import org.apache.geode.management.internal.cli.functions.UpdateCacheFunction;
@@ -86,7 +88,8 @@ public class LocatorClusterManagementService implements ClusterManagementService
 
   @Override
   public ClusterManagementResult create(CacheElement config) {
-    String group = config.getConfigGroup();
+    // validate that user used the correct config object type
+    ConfigurationManager configurationManager = getConfigurationManager(config);
 
     if (persistenceService == null) {
       return new ClusterManagementResult(false,
@@ -94,38 +97,32 @@ public class LocatorClusterManagementService implements ClusterManagementService
     }
 
     // first validate common attributes of all configuration object
-    validators.get(CacheElement.class).validate(config);
+    validators.get(CacheElement.class).validate(CacheElementOperation.CREATE, config);
 
+    String group = config.getConfigGroup();
     ConfigurationValidator validator = validators.get(config.getClass());
     if (validator != null) {
-      validator.validate(config);
+      validator.validate(CacheElementOperation.CREATE, config);
       // exit early if config element already exists in cache config
       CacheConfig currentPersistedConfig = persistenceService.getCacheConfig(group, true);
-      if (validator.exists(config, currentPersistedConfig)) {
-        throw new EntityExistsException("cache element " + config.getId() + " already exists.");
+      if (validator.exists(config.getId(), currentPersistedConfig)) {
+        throw new EntityExistsException("Cache element '" + config.getId() + "' already exists");
       }
     }
 
-    // validate that user used the correct config object type
-    ConfigurationManager configurationManager = managers.get(config.getClass());
-    if (configurationManager == null) {
-      throw new IllegalArgumentException(String.format("Configuration type %s is not supported.",
-          config.getClass().getSimpleName()));
-    }
-
     // execute function on all members
     Set<DistributedMember> targetedMembers = findMembers(group);
 
     if (targetedMembers.size() == 0) {
       return new ClusterManagementResult(false,
-          "no members found in " + group + " to create cache element");
+          "No members found in group '" + group + "' to create cache element");
     }
 
     ClusterManagementResult result = new ClusterManagementResult();
 
     List<CliFunctionResult> functionResults = executeAndGetFunctionResult(
         new UpdateCacheFunction(),
-        Arrays.asList(config, UpdateCacheFunction.CacheElementOperation.ADD),
+        Arrays.asList(config, CacheElementOperation.CREATE),
         targetedMembers);
     functionResults
         .forEach(functionResult -> result.addMemberStatus(functionResult.getMemberIdOrName(),
@@ -134,7 +131,7 @@ public class LocatorClusterManagementService implements ClusterManagementService
 
     // if any false result is added to the member list
     if (result.getStatusCode() != ClusterManagementResult.StatusCode.OK) {
-      result.setStatus(false, "Failed to apply the update on all members.");
+      result.setStatus(false, "Failed to apply the update on all members");
       return result;
     }
 
@@ -144,9 +141,9 @@ public class LocatorClusterManagementService implements ClusterManagementService
       try {
         configurationManager.add(config, cacheConfigForGroup);
         result.setStatus(true,
-            "successfully persisted config for " + finalGroup);
+            "Successfully updated config for " + finalGroup);
       } catch (Exception e) {
-        String message = "failed to update cluster config for " + finalGroup;
+        String message = "Failed to update cluster config for " + finalGroup;
         logger.error(message, e);
         result.setStatus(ClusterManagementResult.StatusCode.FAIL_TO_PERSIST, message);
         return null;
@@ -158,7 +155,84 @@ public class LocatorClusterManagementService implements ClusterManagementService
 
   @Override
   public ClusterManagementResult delete(CacheElement config) {
-    throw new NotImplementedException("Not implemented");
+    // validate that user used the correct config object type
+    ConfigurationManager configurationManager = getConfigurationManager(config);
+
+    if (persistenceService == null) {
+      return new ClusterManagementResult(false,
+          "Cluster configuration service needs to be enabled");
+    }
+
+    // first validate common attributes of all configuration objects
+    validators.get(CacheElement.class).validate(CacheElementOperation.DELETE, config);
+
+    ConfigurationValidator validator = validators.get(config.getClass());
+    validator.validate(CacheElementOperation.DELETE, config);
+
+    List<String> relevantGroups = persistenceService.getGroups().stream().filter(g -> {
+      CacheConfig currentPersistedConfig = persistenceService.getCacheConfig(g);
+      if (currentPersistedConfig != null && validator != null) {
+        return validator.exists(config.getId(), currentPersistedConfig);
+      } else {
+        return false;
+      }
+    }).collect(Collectors.toList());
+
+    if (relevantGroups.isEmpty()) {
+      throw new EntityNotFoundException("Cache element '" + config.getId() + "' does not exist");
+    }
+
+    // execute function on all members
+    Set<DistributedMember> targetedMembers = new HashSet<>();
+    relevantGroups.forEach(g -> targetedMembers.addAll(findMembers(g)));
+
+    if (targetedMembers.size() == 0) {
+      return new ClusterManagementResult(false,
+          "No members found to delete cache element");
+    }
+
+    ClusterManagementResult result = new ClusterManagementResult();
+
+    List<CliFunctionResult> functionResults = executeAndGetFunctionResult(
+        new UpdateCacheFunction(),
+        Arrays.asList(config, CacheElementOperation.DELETE),
+        targetedMembers);
+    functionResults
+        .forEach(functionResult -> result.addMemberStatus(functionResult.getMemberIdOrName(),
+            functionResult.isSuccessful(),
+            functionResult.getStatusMessage()));
+
+    // if any false result is added to the member list
+    if (result.getStatusCode() != ClusterManagementResult.StatusCode.OK) {
+      result.setStatus(false, "Failed to apply the update on all members");
+      return result;
+    }
+
+    // persist configuration in cache config
+    List<String> updatedGroups = new ArrayList<>();
+    List<String> failedGroups = new ArrayList<>();
+    for (String finalGroup : relevantGroups) {
+      persistenceService.updateCacheConfig(finalGroup, cacheConfigForGroup -> {
+        try {
+          configurationManager.delete(config, cacheConfigForGroup);
+          updatedGroups.add(finalGroup);
+        } catch (Exception e) {
+          logger.error("Failed to update cluster config for " + finalGroup, e);
+          failedGroups.add(finalGroup);
+          return null;
+        }
+        return cacheConfigForGroup;
+      });
+    }
+
+    if (failedGroups.isEmpty()) {
+      result.setStatus(true, "Successfully removed config for " + updatedGroups);
+    } else {
+      String message = "Failed to update cluster config for " + failedGroups;
+      result.setStatus(ClusterManagementResult.StatusCode.FAIL_TO_PERSIST, message);
+    }
+
+    return result;
   }
 
   @Override
@@ -244,13 +318,13 @@ public class LocatorClusterManagementService implements ClusterManagementService
 
   @VisibleForTesting
   Set<DistributedMember> findMembers(String group) {
-    Stream<DistributedMember> stream =
+    Stream<DistributedMember> memberStream =
         cache.getDistributionManager().getNormalDistributionManagerIds()
             .stream().map(DistributedMember.class::cast);
     if (!"cluster".equals(group)) {
-      stream = stream.filter(m -> m.getGroups().contains(group));
+      memberStream = memberStream.filter(m -> m.getGroups().contains(group));
     }
-    return stream.collect(Collectors.toSet());
+    return memberStream.collect(Collectors.toSet());
   }
 
   @VisibleForTesting
@@ -259,4 +333,14 @@ public class LocatorClusterManagementService implements ClusterManagementService
     ResultCollector rc = CliUtil.executeFunction(function, args, targetMembers);
     return CliFunctionResult.cleanResults((List<?>) rc.getResult());
   }
+
+  private ConfigurationManager getConfigurationManager(CacheElement config) {
+    ConfigurationManager configurationManager = managers.get(config.getClass());
+    if (configurationManager == null) {
+      throw new IllegalArgumentException(String.format("Configuration type %s is not supported",
+          config.getClass().getSimpleName()));
+    }
+
+    return configurationManager;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/UpdateCacheFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/UpdateCacheFunction.java
index 9749732..0e5cfc0 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/UpdateCacheFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/UpdateCacheFunction.java
@@ -17,31 +17,28 @@
 
 package org.apache.geode.management.internal.cli.functions;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.collections.map.HashedMap;
-
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.configuration.CacheElement;
 import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.management.internal.configuration.realizers.ConfigurationRealizer;
 import org.apache.geode.management.internal.configuration.realizers.RegionConfigRealizer;
 
 public class UpdateCacheFunction extends CliFunction<List> {
   @Immutable
-  private static final Map<Class, ConfigurationRealizer> realizers = new HashedMap();
+  private static final Map<Class, ConfigurationRealizer> realizers = new HashMap<>();
+
   static {
     realizers.put(RegionConfig.class, new RegionConfigRealizer());
   }
 
-  public enum CacheElementOperation {
-    ADD, DELETE, UPDATE
-  }
-
   @Override
   public CliFunctionResult executeFunction(FunctionContext<List> context) throws Exception {
     CacheElement cacheElement = (CacheElement) context.getArguments().get(0);
@@ -50,7 +47,7 @@ public class UpdateCacheFunction extends CliFunction<List> {
 
     ConfigurationRealizer realizer = realizers.get(cacheElement.getClass());
     switch (operation) {
-      case ADD:
+      case CREATE:
         realizer.create(cacheElement, cache);
         break;
       case DELETE:
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/RegionConfigManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/RegionConfigManager.java
index 1cb8b7d..34a769d 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/RegionConfigManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/RegionConfigManager.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.internal.cache.InternalCache;
@@ -35,6 +36,9 @@ public class RegionConfigManager
     implements ConfigurationManager<RegionConfig> {
   private InternalCache cache;
 
+  @VisibleForTesting
+  RegionConfigManager() {}
+
   public RegionConfigManager(InternalCache cache) {
     this.cache = cache;
   }
@@ -55,7 +59,7 @@ public class RegionConfigManager
 
   @Override
   public void delete(RegionConfig config, CacheConfig existing) {
-    throw new NotImplementedException("Not implemented yet");
+    existing.getRegions().removeIf(i -> i.getId().equals(config.getId()));
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
index 7f8a644..7a9514e 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
@@ -26,6 +26,7 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.Scope;
@@ -79,7 +80,7 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
     factory.createSubregion(parentRegion, regionName);
   }
 
-  RegionFactory getRegionFactory(Cache cache, RegionAttributesType regionAttributes) {
+  private RegionFactory getRegionFactory(Cache cache, RegionAttributesType regionAttributes) {
     RegionFactory factory = cache.createRegionFactory();
 
     factory.setDataPolicy(DataPolicy.fromString(regionAttributes.getDataPolicy().name()));
@@ -298,7 +299,19 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
   public void update(RegionConfig config, Cache cache) {}
 
   @Override
-  public void delete(RegionConfig config, Cache cache) {}
+  public void delete(RegionConfig config, Cache cache) {
+    Region region = cache.getRegion(config.getName());
+    if (region == null) {
+      // Since we are trying to delete this region, we can return early
+      return;
+    }
 
+    try {
+      region.destroyRegion();
+    } catch (RegionDestroyedException dex) {
+      // Probably happened as a distributed op but it still reflects our current desired action
+      // which is why it can be ignored here.
+    }
+  }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java
index 085fce3..8fdcdd8 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidator.java
@@ -15,23 +15,31 @@
 
 package org.apache.geode.management.internal.configuration.validators;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.CacheElement;
+import org.apache.geode.management.internal.CacheElementOperation;
 
 /**
  * this is used to validate all the common attributes of CacheElement, eg. group
  */
 public class CacheElementValidator implements ConfigurationValidator<CacheElement> {
   @Override
-  public void validate(CacheElement config) throws IllegalArgumentException {
+  public void validate(CacheElementOperation operation, CacheElement config)
+      throws IllegalArgumentException {
+    if (StringUtils.isBlank(config.getId())) {
+      throw new IllegalArgumentException("id cannot be null or blank");
+    }
+
     if ("cluster".equalsIgnoreCase(config.getGroup())) {
       throw new IllegalArgumentException(
-          "cluster is a reserved group name. Do not use it for member groups.");
+          "'cluster' is a reserved group name. Do not use it for member groups.");
     }
   }
 
   @Override
-  public boolean exists(CacheElement config, CacheConfig persistedConfig) {
+  public boolean exists(String id, CacheConfig persistedConfig) {
     return false;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/ConfigurationValidator.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/ConfigurationValidator.java
index 5c789ee..9506ffe 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/ConfigurationValidator.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/ConfigurationValidator.java
@@ -16,6 +16,7 @@ package org.apache.geode.management.internal.configuration.validators;
 
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.CacheElement;
+import org.apache.geode.management.internal.CacheElementOperation;
 
 public interface ConfigurationValidator<T extends CacheElement> {
 
@@ -25,17 +26,18 @@ public interface ConfigurationValidator<T extends CacheElement> {
    * This will be called after the ClusterManagementService received the configuration object from
    * the api call and before passing it to the realizers and mutators.
    *
-   *
+   * @param operation the operation being performed. Different validation may be required depending
+   *        on the operation.
    * @param config the user defined configuration object. It is mutable. you can modify the
    *        values in the configuration object. e.g. add default values
    *
    */
-  void validate(T config) throws IllegalArgumentException;
+  void validate(CacheElementOperation operation, T config) throws IllegalArgumentException;
 
   /**
    * check to see if this configuration already exists
    *
    * @return true if this config already exists in the persisted cache configuration
    */
-  boolean exists(T config, CacheConfig persistedConfig);
+  boolean exists(String id, CacheConfig persistedConfig);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidator.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidator.java
index 8d7f264..3ce8239 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidator.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidator.java
@@ -15,6 +15,9 @@
 
 package org.apache.geode.management.internal.configuration.validators;
 
+
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.CacheElement;
 import org.apache.geode.cache.configuration.EnumActionDestroyOverflow;
@@ -25,6 +28,7 @@ import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.cache.configuration.RegionType;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionNameValidation;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.security.ResourcePermission;
 
 public class RegionConfigValidator implements ConfigurationValidator<RegionConfig> {
@@ -35,13 +39,26 @@ public class RegionConfigValidator implements ConfigurationValidator<RegionConfi
   }
 
   @Override
-  public void validate(RegionConfig config)
+  public void validate(CacheElementOperation operation, RegionConfig config)
       throws IllegalArgumentException {
 
     if (config.getName() == null) {
       throw new IllegalArgumentException("Name of the region has to be specified.");
     }
 
+    switch (operation) {
+      case UPDATE:
+      case CREATE:
+        validateCreate(config);
+        break;
+      case DELETE:
+        validateDelete(config);
+        break;
+      default:
+    }
+  }
+
+  private void validateCreate(RegionConfig config) {
     if (config.getType() == null) {
       throw new IllegalArgumentException("Type of the region has to be specified.");
     }
@@ -50,7 +67,6 @@ public class RegionConfigValidator implements ConfigurationValidator<RegionConfi
     // by management v2 api.
     try {
       RegionType.valueOf(config.getType());
-
     } catch (IllegalArgumentException e) {
       throw new IllegalArgumentException(
           String.format("Type %s is not supported in Management V2 API.", config.getType()));
@@ -68,6 +84,12 @@ public class RegionConfigValidator implements ConfigurationValidator<RegionConfi
     }
   }
 
+  private void validateDelete(RegionConfig config) {
+    if (StringUtils.isNotBlank(config.getGroup())) {
+      throw new IllegalArgumentException("Group is invalid option when deleting a region");
+    }
+  }
+
   public static void setShortcutAttributes(RegionConfig config) {
     String type = config.getType();
     RegionAttributesType regionAttributes;
@@ -274,9 +296,8 @@ public class RegionConfigValidator implements ConfigurationValidator<RegionConfi
     }
   }
 
-
   @Override
-  public boolean exists(RegionConfig config, CacheConfig existing) {
-    return CacheElement.exists(existing.getRegions(), config.getId());
+  public boolean exists(String id, CacheConfig existing) {
+    return CacheElement.exists(existing.getRegions(), id);
   }
 }
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index bc63c18..ad7b61b 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -440,6 +440,7 @@ org/apache/geode/management/cli/CommandProcessingException,true,-139877952163957
 org/apache/geode/management/cli/CommandServiceException,true,7316102209844678329
 org/apache/geode/management/cli/Result$Status,false,code:int
 org/apache/geode/management/internal/BackupStatusImpl,true,3704172840296221840,backedUpDiskStores:java/util/Map,offlineDiskStores:java/util/Set
+org/apache/geode/management/internal/CacheElementOperation,false
 org/apache/geode/management/internal/ContextAwareSSLRMIClientSocketFactory,true,8159615071011918570
 org/apache/geode/management/internal/JmxManagerLocator$StartJmxManagerFunction,true,-2860286061903069789
 org/apache/geode/management/internal/ManagementAgent$GemFireRMIServerSocketFactory,true,-811909050641332716,bindAddr:java/net/InetAddress
@@ -547,7 +548,6 @@ org/apache/geode/management/internal/cli/functions/SizeExportLogsFunction,true,1
 org/apache/geode/management/internal/cli/functions/UndeployFunction,true,1
 org/apache/geode/management/internal/cli/functions/UnregisterFunction,true,1
 org/apache/geode/management/internal/cli/functions/UpdateCacheFunction,false
-org/apache/geode/management/internal/cli/functions/UpdateCacheFunction$CacheElementOperation,false
 org/apache/geode/management/internal/cli/functions/UserFunctionExecution,true,1
 org/apache/geode/management/internal/cli/result/CommandResultException,true,1,result:org/apache/geode/management/cli/Result
 org/apache/geode/management/internal/cli/result/TableBuilder$Align,false
diff --git a/geode-core/src/test/java/org/apache/geode/cache/configuration/RegionConfigTest.java b/geode-core/src/test/java/org/apache/geode/cache/configuration/RegionConfigTest.java
index e471c96..385b481 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/configuration/RegionConfigTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/configuration/RegionConfigTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.config.JAXBService;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.management.internal.configuration.validators.RegionConfigValidator;
 import org.apache.geode.util.internal.GeodeJsonMapper;
 
@@ -254,7 +255,7 @@ public class RegionConfigTest {
     config.setRegionAttributes(attributes);
 
     RegionConfigValidator validator = new RegionConfigValidator(mock(InternalCache.class));
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
index 4dec4a9..54438f3 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
@@ -19,8 +19,11 @@ import static org.apache.geode.test.junit.assertions.ClusterManagementResultAsse
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -32,33 +35,39 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.CacheElement;
 import org.apache.geode.cache.configuration.RegionConfig;
-import org.apache.geode.distributed.ConfigurationPersistenceService;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.api.ClusterManagementResult;
 import org.apache.geode.management.configuration.MemberConfig;
 import org.apache.geode.management.configuration.RuntimeRegionConfig;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 import org.apache.geode.management.internal.configuration.mutators.ConfigurationManager;
+import org.apache.geode.management.internal.configuration.mutators.RegionConfigManager;
 import org.apache.geode.management.internal.configuration.validators.CacheElementValidator;
 import org.apache.geode.management.internal.configuration.validators.ConfigurationValidator;
 import org.apache.geode.management.internal.configuration.validators.RegionConfigValidator;
+import org.apache.geode.management.internal.exceptions.EntityNotFoundException;
 
 public class LocatorClusterManagementServiceTest {
 
   private LocatorClusterManagementService service;
   private InternalCache cache;
-  private ConfigurationPersistenceService persistenceService;
+  private InternalConfigurationPersistenceService persistenceService;
   private RegionConfig regionConfig;
   private ClusterManagementResult result;
   private Map<Class, ConfigurationValidator> validators = new HashMap<>();
@@ -70,21 +79,30 @@ public class LocatorClusterManagementServiceTest {
   @Before
   public void before() throws Exception {
     regionValidator = mock(RegionConfigValidator.class);
-    regionManager = mock(ConfigurationManager.class);
-    cacheElementValidator = mock(CacheElementValidator.class);
+    doCallRealMethod().when(regionValidator).validate(eq(CacheElementOperation.DELETE), any());
+    regionManager = spy(RegionConfigManager.class);
+    cacheElementValidator = spy(CacheElementValidator.class);
     validators.put(RegionConfig.class, regionValidator);
     validators.put(CacheElement.class, cacheElementValidator);
     managers.put(RegionConfig.class, regionManager);
 
     cache = mock(InternalCache.class);
-    persistenceService = mock(ConfigurationPersistenceService.class);
+    persistenceService = spy(InternalConfigurationPersistenceService.class);
+
+    Set<String> groups = new HashSet<>();
+    groups.add("cluster");
+    doReturn(groups).when(persistenceService).getGroups();
+    doReturn(new CacheConfig()).when(persistenceService).getCacheConfig(any(), anyBoolean());
+    doReturn(true).when(persistenceService).lockSharedConfiguration();
+    doNothing().when(persistenceService).unlockSharedConfiguration();
     service =
         spy(new LocatorClusterManagementService(cache, persistenceService, managers, validators));
     regionConfig = new RegionConfig();
+    regionConfig.setName("region1");
   }
 
   @Test
-  public void persistenceIsNull() throws Exception {
+  public void create_persistenceIsNull() throws Exception {
     service = new LocatorClusterManagementService(cache, null);
     result = service.create(regionConfig);
     assertThat(result.isSuccessful()).isFalse();
@@ -93,18 +111,17 @@ public class LocatorClusterManagementServiceTest {
   }
 
   @Test
-  public void validatorIsCalledCorrectly() throws Exception {
+  public void create_validatorIsCalledCorrectly() throws Exception {
     doReturn(Collections.emptySet()).when(service).findMembers(anyString());
     assertManagementResult(service.create(regionConfig))
         .failed().hasStatusCode(ClusterManagementResult.StatusCode.ERROR)
-        .containsStatusMessage("no members found");
-    verify(cacheElementValidator).validate(regionConfig);
-    verify(regionValidator).validate(regionConfig);
-    verify(regionValidator).exists(eq(regionConfig), any());
+        .containsStatusMessage("No members found in group");
+    verify(cacheElementValidator).validate(CacheElementOperation.CREATE, regionConfig);
+    verify(regionValidator).validate(CacheElementOperation.CREATE, regionConfig);
   }
 
   @Test
-  public void partialFailureOnMembers() throws Exception {
+  public void create_partialFailureOnMembers() throws Exception {
     List<CliFunctionResult> functionResults = new ArrayList<>();
     functionResults.add(new CliFunctionResult("member1", true, "success"));
     functionResults.add(new CliFunctionResult("member2", false, "failed"));
@@ -121,16 +138,38 @@ public class LocatorClusterManagementServiceTest {
   }
 
   @Test
-  public void non_supportedConfigObject() throws Exception {
+  public void create_succeedsOnAllMembers() throws Exception {
+    List<CliFunctionResult> functionResults = new ArrayList<>();
+    functionResults.add(new CliFunctionResult("member1", true, "success"));
+    functionResults.add(new CliFunctionResult("member2", true, "success"));
+    doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any());
+
+    doReturn(Collections.singleton(mock(DistributedMember.class))).when(service).findMembers(any());
+
+    CacheConfig cacheConfig = new CacheConfig();
+    doReturn(cacheConfig).when(persistenceService).getCacheConfig("cluster", true);
+    doReturn(null).when(persistenceService).getConfiguration(any());
+    Region mockRegion = mock(Region.class);
+    doReturn(mockRegion).when(persistenceService).getConfigurationRegion();
+
+    regionConfig.setName("test");
+    result = service.create(regionConfig);
+    assertThat(result.isSuccessful()).isTrue();
+
+    assertThat(cacheConfig.getRegions()).hasSize(1); // the new region landed in the config
+  }
+
+  @Test
+  public void create_non_supportedConfigObject() throws Exception {
     MemberConfig config = new MemberConfig();
     assertThatThrownBy(() -> service.create(config)).isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining("Configuration type MemberConfig is not supported");
   }
 
   @Test
-  public void listOneGroup() throws Exception {
+  public void list_oneGroup() throws Exception {
     regionConfig.setGroup("cluster");
-    when(persistenceService.getGroups()).thenReturn(Sets.newHashSet("cluster", "group1"));
+    doReturn(Sets.newHashSet("cluster", "group1")).when(persistenceService).getGroups();
 
     service.list(regionConfig);
     // even we are listing regions in one group, we still need to go through all the groups
@@ -140,8 +179,8 @@ public class LocatorClusterManagementServiceTest {
   }
 
   @Test
-  public void aRegionInClusterAndGroup1() throws Exception {
-    when(persistenceService.getGroups()).thenReturn(Sets.newHashSet("cluster", "group1"));
+  public void list_aRegionInClusterAndGroup1() throws Exception {
+    doReturn(Sets.newHashSet("cluster", "group1")).when(persistenceService).getGroups();
     RuntimeRegionConfig region1 = new RuntimeRegionConfig();
     region1.setName("region1");
     region1.setType("REPLICATE");
@@ -151,16 +190,83 @@ public class LocatorClusterManagementServiceTest {
 
     List clusterRegions = Arrays.asList(region1);
     List group1Regions = Arrays.asList(region2);
-    when(regionManager.list(any(), any())).thenReturn(clusterRegions)
-        .thenReturn(group1Regions);
+    doReturn(clusterRegions, group1Regions).when(regionManager).list(any(), any());
 
     // this is to make sure when 'cluster" is in one of the group, it will show
     // the cluster and the other group name
     List<RuntimeRegionConfig> results =
         service.list(new RegionConfig()).getResult(RuntimeRegionConfig.class);
     assertThat(results).hasSize(1);
-    RuntimeRegionConfig result = (RuntimeRegionConfig) results.get(0);
+    RuntimeRegionConfig result = results.get(0);
     assertThat(result.getName()).isEqualTo("region1");
     assertThat(result.getGroups()).containsExactlyInAnyOrder("cluster", "group1");
   }
+
+  @Test
+  public void delete_unknownRegionFails() {
+    RegionConfig config = new RegionConfig();
+    config.setName("unknown");
+    assertThatThrownBy(() -> service.delete(config))
+        .isInstanceOf(EntityNotFoundException.class)
+        .hasMessage("Cache element 'unknown' does not exist");
+  }
+
+  @Test
+  public void delete_usingGroupFails() {
+    RegionConfig config = new RegionConfig();
+    config.setName("test");
+    config.setGroup("group1");
+    assertThatThrownBy(() -> service.delete(config))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Group is invalid option when deleting a region");
+  }
+
+  @Test
+  public void delete_partialFailureOnMembers() throws Exception {
+    List<CliFunctionResult> memberResults = new ArrayList<>();
+    memberResults.add(new CliFunctionResult("member1", true, "success"));
+    memberResults.add(new CliFunctionResult("member2", false, "failed"));
+    doReturn(memberResults).when(service).executeAndGetFunctionResult(any(), any(), any());
+
+    doReturn(Collections.singleton(mock(DistributedMember.class))).when(service).findMembers(any());
+
+    CacheConfig clusterConfig = new CacheConfig();
+    RegionConfig existingRegion = new RegionConfig();
+    existingRegion.setName("test");
+    clusterConfig.getRegions().add(existingRegion);
+    doReturn(clusterConfig).when(persistenceService).getCacheConfig(eq("cluster"), anyBoolean());
+    when(regionValidator.exists(eq("test"), any())).thenReturn(true);
+
+    result = service.delete(existingRegion);
+    assertThat(result.isSuccessful()).isFalse();
+    assertThat(result.getStatusMessage())
+        .contains("Failed to apply the update on all members");
+
+    assertThat(clusterConfig.getRegions()).hasSize(1); // region is still in the config
+  }
+
+  @Test
+  public void delete_succeedsOnAllMembers() throws Exception {
+    List<CliFunctionResult> functionResults = new ArrayList<>();
+    functionResults.add(new CliFunctionResult("member1", true, "success"));
+    functionResults.add(new CliFunctionResult("member2", true, "success"));
+    doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any());
+
+    doReturn(Collections.singleton(mock(DistributedMember.class))).when(service).findMembers(any());
+
+    CacheConfig clusterConfig = new CacheConfig();
+    RegionConfig existingRegion = new RegionConfig();
+    existingRegion.setName("test");
+    clusterConfig.getRegions().add(existingRegion);
+    doReturn(clusterConfig).when(persistenceService).getCacheConfig(eq("cluster"), anyBoolean());
+    doReturn(null).when(persistenceService).getConfiguration(any());
+    Region mockRegion = mock(Region.class);
+    doReturn(mockRegion).when(persistenceService).getConfigurationRegion();
+    when(regionValidator.exists(eq("test"), any())).thenReturn(true);
+
+    result = service.delete(existingRegion);
+    assertThat(result.isSuccessful()).isTrue();
+
+    assertThat(clusterConfig.getRegions()).isEmpty(); // region was removed from the config
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerTest.java
index 0e71b10..546d7f6 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizerTest.java
@@ -30,6 +30,7 @@ import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.management.internal.configuration.validators.RegionConfigValidator;
 
 public class RegionConfigRealizerTest {
@@ -52,7 +53,7 @@ public class RegionConfigRealizerTest {
     RegionConfig config = new RegionConfig();
     config.setName("regionName");
     config.setType(RegionShortcut.PARTITION.name());
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
     realizer.create(config, cache);
 
     ArgumentCaptor<DataPolicy> dataPolicyArgumentCaptor = ArgumentCaptor.forClass(DataPolicy.class);
@@ -67,7 +68,7 @@ public class RegionConfigRealizerTest {
     RegionConfig config = new RegionConfig();
     config.setName("regionName");
     config.setType(RegionShortcut.REPLICATE.name());
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
     realizer.create(config, cache);
 
     ArgumentCaptor<DataPolicy> dataPolicyArgumentCaptor = ArgumentCaptor.forClass(DataPolicy.class);
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidatorTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidatorTest.java
index 1be5265..fef3778 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidatorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/CacheElementValidatorTest.java
@@ -21,6 +21,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.management.internal.CacheElementOperation;
 
 public class CacheElementValidatorTest {
 
@@ -37,9 +38,9 @@ public class CacheElementValidatorTest {
   public void invalidGroup_cluster() throws Exception {
     config.setName("test");
     config.setGroup("cluster");
-    assertThatThrownBy(() -> validator.validate(config)).isInstanceOf(
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config)).isInstanceOf(
         IllegalArgumentException.class)
         .hasMessageContaining(
-            "cluster is a reserved group name");
+            "'cluster' is a reserved group name");
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidatorTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidatorTest.java
index f98dbeb..88c4010 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidatorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/validators/RegionConfigValidatorTest.java
@@ -36,6 +36,7 @@ import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.cache.configuration.RegionType;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.management.internal.CacheElementOperation;
 import org.apache.geode.security.ResourcePermission;
 
 public class RegionConfigValidatorTest {
@@ -57,7 +58,7 @@ public class RegionConfigValidatorTest {
   public void checkSecurityForDiskAccess() {
     config.setName("regionName");
     config.setType(RegionType.REPLICATE_PERSISTENT);
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
 
     verify(securityService).authorize(ResourcePermission.Resource.CLUSTER,
         ResourcePermission.Operation.WRITE, ResourcePermission.Target.DISK);
@@ -68,7 +69,7 @@ public class RegionConfigValidatorTest {
   public void noChangesWhenTypeIsSet() {
     config.setName("regionName");
     config.setType(RegionType.REPLICATE);
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
 
     verify(securityService, times(0)).authorize(
         any(ResourcePermission.Resource.class),
@@ -80,14 +81,14 @@ public class RegionConfigValidatorTest {
   public void invalidType() throws Exception {
     config.setName("regionName");
     config.setType("LOCAL");
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Type LOCAL is not supported in Management V2 API.");
   }
 
   @Test
   public void noName() {
-    assertThatThrownBy(() -> validator.validate(config)).isInstanceOf(
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config)).isInstanceOf(
         IllegalArgumentException.class)
         .hasMessageContaining("Name of the region has to be specified");
   }
@@ -96,7 +97,7 @@ public class RegionConfigValidatorTest {
   public void invalidName1() {
     config.setName("__test");
     config.setType("REPLICATE");
-    assertThatThrownBy(() -> validator.validate(config)).isInstanceOf(
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config)).isInstanceOf(
         IllegalArgumentException.class)
         .hasMessageContaining("Region names may not begin with a double-underscore");
   }
@@ -105,7 +106,7 @@ public class RegionConfigValidatorTest {
   public void invalidName2() {
     config.setName("a!&b");
     config.setType("REPLICATE");
-    assertThatThrownBy(() -> validator.validate(config)).isInstanceOf(
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config)).isInstanceOf(
         IllegalArgumentException.class)
         .hasMessageContaining(
             "Region names may only be alphanumeric and may contain hyphens or underscores");
@@ -114,7 +115,7 @@ public class RegionConfigValidatorTest {
   @Test
   public void missingType() {
     config.setName("test");
-    assertThatThrownBy(() -> validator.validate(config)).isInstanceOf(
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config)).isInstanceOf(
         IllegalArgumentException.class)
         .hasMessageContaining(
             "Type of the region has to be specified");
@@ -127,7 +128,7 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
@@ -139,12 +140,12 @@ public class RegionConfigValidatorTest {
     config.setRegionAttributes(attributes);
 
     attributes.setDataPolicy(RegionAttributesDataPolicy.PARTITION);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
 
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     attributes.setScope(RegionAttributesScope.DISTRIBUTED_NO_ACK);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
@@ -155,7 +156,7 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
@@ -166,7 +167,7 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
@@ -177,16 +178,16 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
     attributes.setDataPolicy(RegionAttributesDataPolicy.PERSISTENT_PARTITION);
     attributes.setRedundantCopy("0");
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
 
     // valid redundancy copy
     attributes.setRedundantCopy("2");
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
     assertThat(config.getRegionAttributes().getPartitionAttributes().getRedundantCopies())
         .isEqualTo("2");
   }
@@ -198,12 +199,12 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
 
     attributes.setDataPolicy(RegionAttributesDataPolicy.PARTITION);
     attributes.setLruHeapPercentageEvictionAction(EnumActionDestroyOverflow.LOCAL_DESTROY);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
   }
 
@@ -214,17 +215,17 @@ public class RegionConfigValidatorTest {
     RegionAttributesType attributes = new RegionAttributesType();
     attributes.setDataPolicy(RegionAttributesDataPolicy.REPLICATE);
     config.setRegionAttributes(attributes);
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
 
     attributes.setDataPolicy(RegionAttributesDataPolicy.PARTITION);
     attributes.setLocalMaxMemory("5000");
-    assertThatThrownBy(() -> validator.validate(config))
+    assertThatThrownBy(() -> validator.validate(CacheElementOperation.CREATE, config))
         .isInstanceOf(IllegalArgumentException.class);
 
     // validator will use the type to set the local max memory to be 0
     attributes.setLocalMaxMemory(null);
-    validator.validate(config);
+    validator.validate(CacheElementOperation.CREATE, config);
     assertThat(attributes.getPartitionAttributes().getLocalMaxMemory()).isEqualTo("0");
   }
 
diff --git a/geode-management/src/main/java/org/apache/geode/management/internal/ClientClusterManagementService.java b/geode-management/src/main/java/org/apache/geode/management/internal/ClientClusterManagementService.java
index 4d72d94..802de99 100644
--- a/geode-management/src/main/java/org/apache/geode/management/internal/ClientClusterManagementService.java
+++ b/geode-management/src/main/java/org/apache/geode/management/internal/ClientClusterManagementService.java
@@ -18,6 +18,7 @@ package org.apache.geode.management.internal;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.http.HttpMethod;
 import org.springframework.web.client.ResponseErrorHandler;
 import org.springframework.web.client.RestTemplate;
 
@@ -66,7 +67,14 @@ public class ClientClusterManagementService implements ClusterManagementService
 
   @Override
   public ClusterManagementResult delete(CacheElement config) {
-    throw new NotImplementedException("Not Implemented");
+    String endPoint = getEndpoint(config);
+    return restTemplate
+        .exchange(VERSION + endPoint + "/{id}?group={group}",
+            HttpMethod.DELETE,
+            null,
+            ClusterManagementResult.class,
+            config.getId(), config.getGroup())
+        .getBody();
   }
 
   @Override
diff --git a/geode-web-management/src/distributedTest/java/org/apache/geode/management/client/ClientClusterManagementServiceDUnitTest.java b/geode-web-management/src/distributedTest/java/org/apache/geode/management/client/ClientClusterManagementServiceDUnitTest.java
index f7dd071..2cccf8a 100644
--- a/geode-web-management/src/distributedTest/java/org/apache/geode/management/client/ClientClusterManagementServiceDUnitTest.java
+++ b/geode-web-management/src/distributedTest/java/org/apache/geode/management/client/ClientClusterManagementServiceDUnitTest.java
@@ -17,8 +17,11 @@ package org.apache.geode.management.client;
 
 
 import static org.apache.geode.test.junit.assertions.ClusterManagementResultAssert.assertManagementResult;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
+
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -34,10 +37,10 @@ import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.cache.configuration.RegionType;
 import org.apache.geode.management.api.ClusterManagementResult;
 import org.apache.geode.management.api.ClusterManagementService;
+import org.apache.geode.management.configuration.RuntimeRegionConfig;
 import org.apache.geode.management.internal.rest.LocatorWebContext;
 import org.apache.geode.management.internal.rest.PlainLocatorContextLoader;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
 
 @RunWith(SpringRunner.class)
 @ContextConfiguration(locations = {"classpath*:WEB-INF/geode-management-servlet.xml"},
@@ -51,7 +54,6 @@ public class ClientClusterManagementServiceDUnitTest {
   @Rule
   public ClusterStartupRule cluster = new ClusterStartupRule(1);
 
-  private MemberVM server1;
   private ClusterManagementService client;
   private LocatorWebContext webContext;
 
@@ -59,25 +61,160 @@ public class ClientClusterManagementServiceDUnitTest {
   public void before() {
     cluster.setSkipLocalDistributedSystemCleanup(true);
     webContext = new LocatorWebContext(webApplicationContext);
+    int locatorPort = webContext.getLocator().getPort();
     client = ClusterManagementServiceBuilder.buildWithRequestFactory()
         .setRequestFactory(webContext.getRequestFactory()).build();
-    server1 = cluster.startServerVM(0, webContext.getLocator().getPort());
+    cluster.startServerVM(1, "group1", locatorPort); // group1 only
+    cluster.startServerVM(2, "group2", locatorPort); // group2 only
+    cluster.startServerVM(3, "group1,group2", locatorPort); // member of both groups
+  }
+
+  /** Cleanup after each test: delete every listed region, then check that none is left. */
+  @After
+  public void deleteAllRegions() {
+    ClusterManagementResult initial = client.list(new RegionConfig());
+    for (RuntimeRegionConfig leftover : initial.getResult(RuntimeRegionConfig.class)) {
+      client.delete(leftover);
+    }
+
+    ClusterManagementResult remaining = client.list(new RegionConfig());
+    assertThat(remaining.getResult(RuntimeRegionConfig.class)).isEmpty();
   }
 
   @Test
   @WithMockUser
-  public void createRegion() {
+  public void createAndDeleteRegion() {
     RegionConfig region = new RegionConfig();
     region.setName("customer");
     region.setType(RegionType.REPLICATE);
 
+    // No group is set on the config: create it, then delete the very same config again.
+    assertManagementResult(client.create(region))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK);
+
+    assertManagementResult(client.delete(region))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK);
+
+    // Once the delete has succeeded the region listing must be empty.
+    assertManagementResult(client.list(new RegionConfig()))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK)
+        .hasListResult()
+        .isEmpty();
+  }
+
+  /** Creates a region on group1 only, then deletes it by name with the group cleared. */
+  @Test
+  @WithMockUser
+  public void createAndDeleteRegionOnGroup() {
+    RegionConfig customer = new RegionConfig();
+    customer.setName("customer");
+    customer.setGroup("group1");
+    customer.setType(RegionType.REPLICATE);
+    assertManagementResult(client.create(customer))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK);
+
+    // The group is reset to null before the delete, so only the name identifies the region.
+    customer.setGroup(null);
+    assertManagementResult(client.delete(customer))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK);
+
+    // Nothing may be left in the listing once the delete has succeeded.
+    assertManagementResult(client.list(new RegionConfig()))
+        .hasStatusCode(ClusterManagementResult.StatusCode.OK)
+        .hasListResult()
+        .isEmpty();
+  }
+
+  /** The same region definition may be created on group1 and then again on group2. */
+  @Test
+  @WithMockUser
+  public void createSameRegionOnDifferentGroup() {
+    RegionConfig config = new RegionConfig();
+    config.setName("test");
+    config.setType(RegionType.REPLICATE);
+
+    // first create targets group1: exactly server-1 and server-3 report a status
+    config.setGroup("group1");
+    assertManagementResult(client.create(config)).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-1", "server-3");
+
+    // creating it again on group2 succeeds even though the region already exists on a member
+    config.setGroup("group2");
+    assertManagementResult(client.create(config)).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-2", "server-3");
+  }
+
+  /** A region created on group1 and on group2 is gone after one delete issued without group. */
+  @Test
+  @WithMockUser
+  public void deleteRegionCreatedOnMultipleGroups() {
+    RegionConfig region = new RegionConfig();
+    region.setName("test");
+    region.setType(RegionType.REPLICATE);
+    region.setGroup("group1");
     ClusterManagementResult result = client.create(region);
+    assertManagementResult(result).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-1", "server-3");
 
-    // in StressNewTest, this will be run multiple times without restarting the locator
-    assertManagementResult(result).hasStatusCode(ClusterManagementResult.StatusCode.OK,
-        ClusterManagementResult.StatusCode.ENTITY_EXISTS);
+    // the region already exists on a member, yet creating it on group2 still succeeds
+    region.setGroup("group2");
+    ClusterManagementResult secondCreate = client.create(region);
+    assertManagementResult(secondCreate).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-2", "server-3");
+
+    // delete by name only: the group is cleared before the request
+    region.setGroup(null);
+    assertManagementResult(client.delete(region)).isSuccessful();
+
+    // no region may remain in the listing afterwards
+    ClusterManagementResult remaining = client.list(new RegionConfig());
+    List<RuntimeRegionConfig> regionsLeft = remaining.getResult(RuntimeRegionConfig.class);
+
+    assertThat(regionsLeft).hasSize(0);
   }
 
+  /** Deleting a region name that this test never created must fail with ENTITY_NOT_FOUND. */
+  @Test
+  @WithMockUser
+  public void deleteUnknownRegion() {
+    RegionConfig missing = new RegionConfig();
+    missing.setName("unknown");
+    assertManagementResult(client.delete(missing))
+        .failed()
+        .hasStatusCode(ClusterManagementResult.StatusCode.ENTITY_NOT_FOUND);
+  }
+
+  /** With region1 on group1 and region2 on group2, deleting region2 leaves only region1. */
+  @Test
+  @WithMockUser
+  public void deleteRegionOnSpecificGroup() {
+    RegionConfig config = new RegionConfig();
+    config.setType(RegionType.REPLICATE);
+
+    // region1 is created on group1
+    config.setName("region1");
+    config.setGroup("group1");
+    ClusterManagementResult outcome = client.create(config);
+    assertManagementResult(outcome).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-1", "server-3");
+
+    // region2 is created on group2
+    config.setName("region2");
+    config.setGroup("group2");
+    outcome = client.create(config);
+    assertManagementResult(outcome).isSuccessful().hasMemberStatus()
+        .containsOnlyKeys("server-2", "server-3");
+
+    // remove region2 by name; the group is cleared before the delete request
+    config.setName("region2");
+    config.setGroup(null);
+    outcome = client.delete(config);
+    assertManagementResult(outcome).hasStatusCode(ClusterManagementResult.StatusCode.OK);
+
+    ClusterManagementResult listing = client.list(new RegionConfig());
+    assertManagementResult(listing).hasStatusCode(ClusterManagementResult.StatusCode.OK)
+        .hasListResult().extracting(x -> x.getId()).containsExactly("region1");
+  }
 
   @Test
   @WithMockUser
diff --git a/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementIntegrationTest.java b/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementIntegrationTest.java
index 65150c3..ad17d06 100644
--- a/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementIntegrationTest.java
+++ b/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementIntegrationTest.java
@@ -67,7 +67,7 @@ public class RegionManagementIntegrationTest {
     assertManagementResult(client.create(regionConfig))
         .failed()
         .hasStatusCode(ClusterManagementResult.StatusCode.ERROR)
-        .containsStatusMessage("no members found in cluster to create cache element");
+        .containsStatusMessage("No members found in group 'cluster' to create cache element");
   }
 
   @Test
@@ -92,7 +92,7 @@ public class RegionManagementIntegrationTest {
     assertManagementResult(client.create(regionConfig))
         .failed()
         .hasStatusCode(ClusterManagementResult.StatusCode.ILLEGAL_ARGUMENT)
-        .containsStatusMessage("cluster is a reserved group name");
+        .containsStatusMessage("'cluster' is a reserved group name");
   }
 
   @Test
diff --git a/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementSecurityIntegrationTest.java b/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementSecurityIntegrationTest.java
index 8e90922..307350a 100644
--- a/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementSecurityIntegrationTest.java
+++ b/geode-web-management/src/integrationTest/java/org/apache/geode/management/internal/rest/RegionManagementSecurityIntegrationTest.java
@@ -99,7 +99,7 @@ public class RegionManagementSecurityIntegrationTest {
         .andExpect(status().isInternalServerError())
         .andExpect(jsonPath("$.statusCode", is("ERROR")))
         .andExpect(jsonPath("$.statusMessage",
-            is("no members found in cluster to create cache element")));
+            is("No members found in group 'cluster' to create cache element")));
   }
 
 }
diff --git a/geode-web-management/src/main/java/org/apache/geode/management/internal/rest/controllers/RegionManagementController.java b/geode-web-management/src/main/java/org/apache/geode/management/internal/rest/controllers/RegionManagementController.java
index 5a9f219..37b33ac 100644
--- a/geode-web-management/src/main/java/org/apache/geode/management/internal/rest/controllers/RegionManagementController.java
+++ b/geode-web-management/src/main/java/org/apache/geode/management/internal/rest/controllers/RegionManagementController.java
@@ -89,6 +89,15 @@ public class RegionManagementController extends AbstractManagementController {
     return clusterManagementService.get(config);
   }
 
+  @PreAuthorize("@securityService.authorize('DATA', 'MANAGE')")
+  @RequestMapping(method = RequestMethod.DELETE, value = REGION_CONFIG_ENDPOINT + "/{id}")
+  @ResponseBody
+  public ClusterManagementResult deleteRegion(@PathVariable(name = "id") String id) {
+    RegionConfig target = new RegionConfig(); // only the name is set; no group is applied
+    target.setName(id);
+    return clusterManagementService.delete(target);
+  }
+
   @RequestMapping(method = RequestMethod.GET,
       value = REGION_CONFIG_ENDPOINT + "/{regionName}/indexes")
   @ResponseBody