You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/12/19 18:35:11 UTC

[geode] branch develop updated: GEODE-6103 RegionCreateFunction takes RegionConfig in argument (#2998)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fff4eb7  GEODE-6103 RegionCreateFunction takes RegionConfig in argument (#2998)
fff4eb7 is described below

commit fff4eb799de6a1dcf716a35bfc361eee8c215f1a
Author: Aditya Anchuri <aa...@pivotal.io>
AuthorDate: Wed Dec 19 10:35:01 2018 -0800

    GEODE-6103 RegionCreateFunction takes RegionConfig in argument (#2998)
    
    * reduce dependence on RegionFunctionArgs
    * Also clean up CreateRegionCommand to achieve a clean separation between validations and calling RegionCreateFunction
    
    Signed-off-by: Aditya Anchuri <aa...@pivotal.io>
    Signed-off-by: Peter Tran <pt...@pivotal.io>
    Signed-off-by: Ken Howe <kh...@pivotal.io>
---
 .../jdbc/internal/cli/CreateDataSourceCommand.java |  17 +-
 .../cli/commands/CreateRegionCommandDUnitTest.java |  98 ++++-
 ...egionCommandPersistsConfigurationDUnitTest.java |  14 +-
 .../CreateRegionCommandIntegrationTest.java        |  17 +-
 .../org/apache/geode/cache/AttributesFactory.java  |   5 +-
 .../java/org/apache/geode/cache/DataPolicy.java    |  18 +
 .../org/apache/geode/cache/EvictionAction.java     |   4 +-
 .../org/apache/geode/cache/EvictionAlgorithm.java  |   4 +-
 .../org/apache/geode/cache/EvictionAttributes.java |  13 +-
 .../org/apache/geode/cache/ExpirationAction.java   |  15 +
 .../apache/geode/cache/PartitionAttributes.java    |   4 +-
 .../java/org/apache/geode/cache/RegionFactory.java |   2 +-
 .../geode/cache/configuration/CacheElement.java    |   4 +
 .../geode/cache/configuration/DeclarableType.java  |   3 +-
 .../geode/cache/configuration/DiskDirsType.java    |   3 +-
 .../configuration/RegionAttributesDataPolicy.java  |   4 +-
 .../RegionAttributesIndexUpdateType.java           |   4 +-
 .../cache/configuration/RegionAttributesType.java  |  17 +-
 .../geode/cache/configuration/RegionConfig.java    |   5 +-
 .../internal/cache/EvictionAttributesImpl.java     |  66 ++++
 .../internal/cache/PartitionAttributesImpl.java    |  64 +++
 .../internal/cache/execute/InternalFunction.java   |   1 -
 .../geode/management/internal/cli/CliUtil.java     |   8 +-
 .../cli/commands/CreateJndiBindingCommand.java     |  17 +-
 .../internal/cli/commands/CreateRegionCommand.java | 433 +++++++++++----------
 .../internal/cli/domain/ExpirationArgs.java        |  59 +++
 .../internal/cli/domain/PartitionArgs.java         |  92 +++++
 .../internal/cli/domain/RegionConfigFactory.java   | 334 +++++++++++-----
 .../cli/exceptions/EntityExistsException.java      |  35 +-
 .../cli/functions/CreateRegionFunctionArgs.java    |  43 ++
 .../cli/functions/GatewaySenderCreateFunction.java |  53 +--
 .../cli/functions/RegionCreateFunction.java        | 411 +++++++++----------
 .../internal/cli/functions/RegionFunctionArgs.java |  32 --
 .../management/internal/cli/i18n/CliStrings.java   |  54 +--
 .../management/internal/cli/util/RegionPath.java   |   6 +-
 .../sanctioned-geode-core-serializables.txt        |  11 +
 .../cli/commands/CreateRegionCommandTest.java      |  53 +--
 .../cli/domain/RegionConfigFactoryTest.java        | 211 +++++++---
 .../cli/commands/CreateRegionCommandDUnitTest.java |  39 +-
 39 files changed, 1420 insertions(+), 853 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateDataSourceCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateDataSourceCommand.java
index bcd03e6..ea68cd7 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateDataSourceCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateDataSourceCommand.java
@@ -31,7 +31,6 @@ import org.apache.geode.distributed.internal.InternalConfigurationPersistenceSer
 import org.apache.geode.management.cli.CliMetaData;
 import org.apache.geode.management.cli.SingleGfshCommand;
 import org.apache.geode.management.internal.cli.commands.CreateJndiBindingCommand.DATASOURCE_TYPE;
-import org.apache.geode.management.internal.cli.exceptions.EntityExistsException;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 import org.apache.geode.management.internal.cli.functions.CreateJndiBindingFunction;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
@@ -116,19 +115,15 @@ public class CreateDataSourceCommand extends SingleGfshCommand {
       }
     }
 
-    InternalConfigurationPersistenceService service =
-        (InternalConfigurationPersistenceService) getConfigurationPersistenceService();
+    InternalConfigurationPersistenceService service = getConfigurationPersistenceService();
 
     if (service != null) {
       CacheConfig cacheConfig = service.getCacheConfig("cluster");
-      if (cacheConfig != null) {
-        JndiBindingsType.JndiBinding existing =
-            CacheElement.findElement(cacheConfig.getJndiBindings(), name);
-        if (existing != null) {
-          throw new EntityExistsException(
-              CliStrings.format("Data source named \"{0}\" already exists.", name),
-              ifNotExists);
-        }
+      if (cacheConfig != null && CacheElement.exists(cacheConfig.getJndiBindings(), name)) {
+        String message =
+            CliStrings.format("Jndi binding with jndi-name \"{0}\" already exists.", name);
+        return ifNotExists ? ResultModel.createInfo("Skipping: " + message)
+            : ResultModel.createError(message);
       }
     }
 
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
index 0c50623..4e0f7c2 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
@@ -238,7 +238,50 @@ public class CreateRegionCommandDUnitTest {
   @Test
   public void testCreateRegionWithInvalidPartitionResolver() throws Exception {
     gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
-        + " --type=PARTITION --partition-resolver=InvalidPartitionResolver").statusIsError();
+        + " --type=PARTITION --partition-resolver=InvalidPartitionResolver")
+        .statusIsError()
+        .containsOutput("Could not find class");
+  }
+
+  @Test
+  public void testCreateRegionWithInvalidCustomExpiry() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
+        + " --type=REPLICATE --entry-time-to-live-custom-expiry=InvalidCustomExpiry" +
+        " --enable-statistics=true")
+        .statusIsError()
+        .containsOutput("Could not find class");
+
+    gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
+        + " --type=REPLICATE --entry-idle-time-custom-expiry=InvalidCustomExpiry" +
+        " --enable-statistics=true")
+        .statusIsError()
+        .containsOutput("Could not find class");
+  }
+
+  @Test
+  public void testCreateRegionWithInvalidCacheLoader() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
+        + " --type=REPLICATE --cache-loader=InvalidCacheLoader")
+        .statusIsError()
+        .containsOutput("Could not find class");
+  }
+
+  @Test
+  public void testCreateRegionWithInvalidCacheWriter() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
+        + " --type=REPLICATE --cache-writer=InvalidCacheWriter")
+        .statusIsError()
+        .containsOutput("Could not find class");
+  }
+
+  @Test
+  public void testCreateRegionWithInvalidCacheListeners() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=" + testName.getMethodName()
+        + " --type=REPLICATE --cache-listener=" + TestCacheListener.class.getName()
+        + ",InvalidCacheListener")
+        .statusIsError()
+        .containsOutput("Could not find class")
+        .doesNotContainOutput("TestCacheListener");
   }
 
   @Test
@@ -250,6 +293,15 @@ public class CreateRegionCommandDUnitTest {
   }
 
   @Test
+  public void testCreateRegionFromTemplateFailsIfTemplateDoesNotExist() {
+    gfsh.executeAndAssertThat("create region --template-region=/TEMPLATE --name=/TEST"
+        + TestCacheListener.class.getName())
+        .statusIsError()
+        .containsOutput("Specify a valid region path for template-region")
+        .containsOutput("TEMPLATE not found");
+  }
+
+  @Test
   public void overrideListenerFromTemplate() throws Exception {
     gfsh.executeAndAssertThat("create region --name=/TEMPLATE --type=PARTITION_REDUNDANT"
         + " --cache-listener=" + TestCacheListener.class.getName()).statusIsSuccess();
@@ -298,7 +350,6 @@ public class CreateRegionCommandDUnitTest {
     gfsh.executeAndAssertThat("destroy region --name=/TEMPLATE").statusIsSuccess();
   }
 
-
   @Test
   public void ensureOverridingCallbacksFromTemplateDoNotRequireClassesOnLocator() throws Exception {
     final File prJarFile = new File(tmpDir.getRoot(), "myCacheListener.jar");
@@ -611,10 +662,49 @@ public class CreateRegionCommandDUnitTest {
         + " --enable-cloning=false").statusIsError();
   }
 
+  @Test
+  public void cannotColocateWithNonexistentRegion() {
+    String regionName = testName.getMethodName();
+    gfsh.executeAndAssertThat("create region"
+        + " --name=" + regionName
+        + " --type=PARTITION"
+        + " --colocated-with=/nonexistent").statusIsError()
+        .containsOutput("Specify a valid region path for colocated-with")
+        .containsOutput("nonexistent not found");
+  }
+
+  @Test
+  public void cannotColocateWithNonPartitionedRegion() {
+    gfsh.executeAndAssertThat("create region --type=REPLICATE --name=/nonpartitioned")
+        .statusIsSuccess();
+
+    String regionName = testName.getMethodName();
+    gfsh.executeAndAssertThat("create region"
+        + " --name=" + regionName
+        + " --type=PARTITION"
+        + " --colocated-with=/nonpartitioned").statusIsError()
+        .containsOutput("\"/nonpartitioned\" is not a Partitioned Region");
+  }
+
+  @Test
+  public void cannotCreateSubregionOnNonExistentParentRegion() {
+    gfsh.executeAndAssertThat("create region --type=REPLICATE --name=/nonexistentparent/child")
+        .statusIsError()
+        .containsOutput("Parent region for \"/nonexistentparent/child\" does not exist");
+  }
+
+  @Test
+  public void cannotCreateWithNonexistentGatewaySenders() {
+    gfsh.executeAndAssertThat(
+        "create region --type=REPLICATE --name=/invalid --gateway-sender-id=nonexistent")
+        .statusIsError()
+        .containsOutput("There are no GatewaySenders");
+  }
+
   /**
    * Ignored this test until we refactor the FetchRegionAttributesFunction to not use
    * AttributesFactory, and instead use RegionConfig, which we will do as part of implementing
-   * GEODE-6103
+   * GEODE-6104
    */
   @Ignore
   @Test
@@ -649,7 +739,7 @@ public class CreateRegionCommandDUnitTest {
   /**
    * Ignored this test until we refactor the FetchRegionAttributesFunction to not use
    * AttributesFactory, and instead use RegionConfig, which we will do as part of implementing
-   * GEODE-6103
+   * GEODE-6104
    */
   @Ignore
   @Test
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandPersistsConfigurationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandPersistsConfigurationDUnitTest.java
index 1e4b227..b72eacd 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandPersistsConfigurationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandPersistsConfigurationDUnitTest.java
@@ -250,6 +250,7 @@ public class CreateRegionCommandPersistsConfigurationDUnitTest {
         + " --entry-time-to-live-expiration=200"
         + " --entry-time-to-live-expiration-action=local-destroy"
         + " --eviction-action=local-destroy"
+        + " --eviction-entry-count=1000"
         + " --key-constraint=" + Object.class.getName()
         + " --off-heap=false"
         + " --region-idle-time-expiration=100"
@@ -332,9 +333,12 @@ public class CreateRegionCommandPersistsConfigurationDUnitTest {
             .describedAs("Entry time to live expiration action should be local-destroy "
                 + "for region " + name)
             .isEqualTo("local-destroy");
-        assertThat(attr.getEvictionAttributes().getLruHeapPercentage().getAction().value())
+        assertThat(attr.getEvictionAttributes().getLruEntryCount().getAction().value())
             .describedAs("Eviction action should be local-destroy for region " + name)
             .isEqualTo("local-destroy");
+        assertThat(attr.getEvictionAttributes().getLruEntryCount().getMaximum())
+            .describedAs("Eviction max should be 1000 for region " + name)
+            .isEqualTo("1000");
         assertThat(attr.getKeyConstraint())
             .describedAs("Expected key constraint to be " + Object.class.getName() +
                 " for region " + name)
@@ -434,15 +438,15 @@ public class CreateRegionCommandPersistsConfigurationDUnitTest {
 
     gfsh.executeAndAssertThat("create region"
         + " --name=" + regionName
-        + " --type=PARTITION");
+        + " --type=PARTITION_REDUNDANT").statusIsSuccess();
     gfsh.executeAndAssertThat("create region"
         + " --name=" + colocatedRegionName
         + " --colocated-with=" + regionName
-        + " --type=PARTITION");
+        + " --type=PARTITION_REDUNDANT").statusIsSuccess();
 
     gfsh.executeAndAssertThat("create region"
         + " --name=" + colocatedRegionFromTemplateName
-        + " --template-region=" + colocatedRegionName);
+        + " --template-region=" + colocatedRegionName).statusIsSuccess();
 
     locator.invoke(() -> {
       InternalConfigurationPersistenceService cc =
@@ -529,7 +533,7 @@ public class CreateRegionCommandPersistsConfigurationDUnitTest {
         + " --total-num-buckets=1").statusIsSuccess();
     gfsh.executeAndAssertThat("create region"
         + " --name=" + regionFromTemplateName
-        + " --template-region=" + regionName);
+        + " --template-region=" + regionName).statusIsSuccess();
 
     locator.invoke(() -> {
       InternalConfigurationPersistenceService cc =
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandIntegrationTest.java
index 9211ac5..a067052 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandIntegrationTest.java
@@ -65,7 +65,7 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void parentRegionDoesNotExist() throws Exception {
     gfsh.executeAndAssertThat(CREATE_REGION + "--name=/A/B").statusIsError()
-        .containsOutput("Parent region for \"/A/B\" doesnt exist");
+        .containsOutput("Parent region for \"/A/B\" does not exist");
   }
 
   @Test
@@ -517,7 +517,8 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void cannotSetRegionExpirationForPartitionedRegion() {
     gfsh.executeAndAssertThat(
-        "create region --enable-statistics=true --name=/FOO --type=PARTITION --region-idle-time-expiration=1 --region-time-to-live-expiration=1")
+        "create region --enable-statistics=true --name=/FOO --type=PARTITION " +
+            "--region-idle-time-expiration=1 --region-time-to-live-expiration=1")
         .statusIsError()
         .containsOutput(
             "ExpirationAction INVALIDATE or LOCAL_INVALIDATE for region is not supported for Partitioned Region");
@@ -559,7 +560,8 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void testEvictionAttributesForLRUEntry() throws Exception {
     gfsh.executeAndAssertThat(
-        "create region --name=FOO --type=REPLICATE --eviction-entry-count=1001 --eviction-action=overflow-to-disk")
+        "create region --name=FOO --type=REPLICATE --eviction-entry-count=1001 " +
+            "--eviction-action=overflow-to-disk")
         .statusIsSuccess();
 
     Region foo = server.getCache().getRegion("/FOO");
@@ -575,7 +577,8 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void testEvictionAttributesForLRUMemory() throws Exception {
     gfsh.executeAndAssertThat(
-        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 --eviction-action=overflow-to-disk")
+        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 " +
+            "--eviction-action=overflow-to-disk")
         .statusIsSuccess();
 
     Region foo = server.getCache().getRegion("/FOO");
@@ -591,7 +594,8 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void testEvictionAttributesForObjectSizer() throws Exception {
     gfsh.executeAndAssertThat(
-        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 --eviction-action=overflow-to-disk --eviction-object-sizer="
+        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 " +
+            "--eviction-action=overflow-to-disk --eviction-object-sizer="
             + TestObjectSizer.class.getName())
         .statusIsSuccess();
 
@@ -609,7 +613,8 @@ public class CreateRegionCommandIntegrationTest {
   @Test
   public void testEvictionAttributesForNonDeclarableObjectSizer() throws Exception {
     gfsh.executeAndAssertThat(
-        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 --eviction-action=overflow-to-disk --eviction-object-sizer="
+        "create region --name=FOO --type=REPLICATE --eviction-max-memory=1001 " +
+            "--eviction-action=overflow-to-disk --eviction-object-sizer="
             + TestObjectSizerNotDeclarable.class.getName())
         .statusIsError().containsOutput(
             "eviction-object-sizer must implement both ObjectSizer and Declarable interfaces");
diff --git a/geode-core/src/main/java/org/apache/geode/cache/AttributesFactory.java b/geode-core/src/main/java/org/apache/geode/cache/AttributesFactory.java
index 69c24e0..c974c14 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/AttributesFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/AttributesFactory.java
@@ -351,7 +351,7 @@ public class AttributesFactory<K, V> {
   public AttributesFactory(RegionAttributes<K, V> regionAttributes) {
     synchronized (this.regionAttributes) {
       this.regionAttributes.cacheListeners =
-          new ArrayList<CacheListener<K, V>>(Arrays.asList(regionAttributes.getCacheListeners()));
+          new ArrayList<>(Arrays.asList(regionAttributes.getCacheListeners()));
     }
     this.regionAttributes.cacheLoader = regionAttributes.getCacheLoader();
     this.regionAttributes.cacheWriter = regionAttributes.getCacheWriter();
@@ -1299,7 +1299,7 @@ public class AttributesFactory<K, V> {
       if (attrs.getDataPolicy().withReplication() && !attrs.getDataPolicy().withPersistence()
           && attrs.getScope().isDistributed()) {
         RegionAttributesImpl<?, ?> rattr = attrs;
-        if (!rattr.isForBucketRegion()) {
+        if (!attrs.isForBucketRegion()) {
           if (attrs.getEvictionAttributes().getAction().isLocalDestroy()
               || attrs.getEntryIdleTimeout().getAction().isLocal()
               || attrs.getEntryTimeToLive().getAction().isLocal()
@@ -1488,7 +1488,6 @@ public class AttributesFactory<K, V> {
     }
   }
 
-
   private static class RegionAttributesImpl<K, V> extends UserSpecifiedRegionAttributes<K, V>
       implements Cloneable, Serializable {
     public Set<String> gatewaySenderIds;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/DataPolicy.java b/geode-core/src/main/java/org/apache/geode/cache/DataPolicy.java
index 06221fd..c67f586 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/DataPolicy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/DataPolicy.java
@@ -296,4 +296,22 @@ public class DataPolicy implements java.io.Serializable {
     String configName = this.name.toLowerCase().replace("_", "-");
     return RegionAttributesDataPolicy.fromValue(configName);
   }
+
+  public static DataPolicy fromString(String s) {
+    String[] allowedValues =
+        new String[] {"EMPTY", "NORMAL", "REPLICATE", "PERSISTENT_REPLICATE", "PARTITION",
+            "PRELOADED", "PERSISTENT_PARTITION"};
+    int valueIndex = -1;
+    for (int i = 0; i < allowedValues.length; i++) {
+      if (allowedValues[i].equals(s)) {
+        valueIndex = i;
+        break;
+      }
+    }
+
+    if (valueIndex != -1)
+      return VALUES[valueIndex];
+
+    return null;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/EvictionAction.java b/geode-core/src/main/java/org/apache/geode/cache/EvictionAction.java
index c1fbac8..4517fcf 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/EvictionAction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/EvictionAction.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache;
 
+import java.io.Serializable;
+
 import javax.print.attribute.EnumSyntax;
 
 /**
@@ -23,7 +25,7 @@ import javax.print.attribute.EnumSyntax;
  * @see org.apache.geode.cache.EvictionAlgorithm
  * @see org.apache.geode.internal.cache.EvictionAttributesImpl
  */
-public final class EvictionAction extends EnumSyntax {
+public final class EvictionAction extends EnumSyntax implements Serializable {
   private static final long serialVersionUID = -98840597493242980L;
   /**
    * Canonical EvictionAction that represents no eviction
diff --git a/geode-core/src/main/java/org/apache/geode/cache/EvictionAlgorithm.java b/geode-core/src/main/java/org/apache/geode/cache/EvictionAlgorithm.java
index 8cd6c18..feff223 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/EvictionAlgorithm.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/EvictionAlgorithm.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache;
 
+import java.io.Serializable;
+
 import javax.print.attribute.EnumSyntax;
 
 /**
@@ -23,7 +25,7 @@ import javax.print.attribute.EnumSyntax;
  * @see org.apache.geode.cache.EvictionAction
  * @see org.apache.geode.internal.cache.EvictionAttributesImpl
  */
-public final class EvictionAlgorithm extends EnumSyntax {
+public final class EvictionAlgorithm extends EnumSyntax implements Serializable {
   private static final long serialVersionUID = 5778669432033106789L;
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/cache/EvictionAttributes.java b/geode-core/src/main/java/org/apache/geode/cache/EvictionAttributes.java
index b333b2e..fee97fb 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/EvictionAttributes.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/EvictionAttributes.java
@@ -15,6 +15,8 @@
 
 package org.apache.geode.cache;
 
+import java.util.Optional;
+
 import org.apache.geode.DataSerializable;
 import org.apache.geode.cache.configuration.EnumActionDestroyOverflow;
 import org.apache.geode.cache.configuration.RegionAttributesType;
@@ -513,20 +515,21 @@ public abstract class EvictionAttributes implements DataSerializable {
     EnumActionDestroyOverflow action = EnumActionDestroyOverflow.fromValue(this.getAction()
         .toString());
     EvictionAlgorithm algorithm = getAlgorithm();
-    String objectSizerClass = getObjectSizer().getClass().toString();
+    Optional<String> objectSizerClass = Optional.ofNullable(getObjectSizer())
+        .map(c -> c.getClass().toString());
     Integer maximum = getMaximum();
 
     if (algorithm.isLRUHeap()) {
       RegionAttributesType.EvictionAttributes.LruHeapPercentage heapPercentage =
           new RegionAttributesType.EvictionAttributes.LruHeapPercentage();
       heapPercentage.setAction(action);
-      heapPercentage.setClassName(objectSizerClass);
+      objectSizerClass.ifPresent(o -> heapPercentage.setClassName(o));
       configAttributes.setLruHeapPercentage(heapPercentage);
     } else if (algorithm.isLRUMemory()) {
       RegionAttributesType.EvictionAttributes.LruMemorySize memorySize =
           new RegionAttributesType.EvictionAttributes.LruMemorySize();
       memorySize.setAction(action);
-      memorySize.setClassName(objectSizerClass);
+      objectSizerClass.ifPresent(o -> memorySize.setClassName(o));
       memorySize.setMaximum(maximum.toString());
       configAttributes.setLruMemorySize(memorySize);
     } else {
@@ -540,8 +543,8 @@ public abstract class EvictionAttributes implements DataSerializable {
     return configAttributes;
   }
 
-  public boolean isEmpty() {
-    return getAction() == EvictionAction.NONE && getAlgorithm() == EvictionAlgorithm.NONE;
+  public boolean isNoEviction() {
+    return getAction() == EvictionAction.NONE;
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ExpirationAction.java b/geode-core/src/main/java/org/apache/geode/cache/ExpirationAction.java
index 0b66ddc..f7167f3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/ExpirationAction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/ExpirationAction.java
@@ -135,6 +135,21 @@ public class ExpirationAction implements Serializable {
     }
   }
 
+  public static ExpirationAction fromString(String s) {
+    int matchValue = -1;
+    for (int i = 0; i < VALUES.length; i++) {
+      if (VALUES[i].toString().equals(s)) {
+        matchValue = i;
+        break;
+      }
+    }
+    if (matchValue != -1) {
+      return VALUES[matchValue];
+    }
+
+    return null;
+  }
+
   // The 4 declarations below are necessary for serialization
   private static int nextOrdinal = 0;
   public final int ordinal = nextOrdinal++;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionAttributes.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionAttributes.java
index 2cab775..a389733 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/PartitionAttributes.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionAttributes.java
@@ -161,7 +161,8 @@ public interface PartitionAttributes<K, V> {
     configAttributes.setColocatedWith(getColocatedWith());
     configAttributes.setLocalMaxMemory(Integer.toString(getLocalMaxMemory()));
     if (getPartitionResolver() != null) {
-      configAttributes.setPartitionResolver(new DeclarableType(getPartitionResolver().getName()));
+      configAttributes
+          .setPartitionResolver(new DeclarableType(getPartitionResolver().getClass().getName()));
     }
     configAttributes.setRecoveryDelay(Long.toString(getRecoveryDelay()));
     configAttributes.setStartupRecoveryDelay(Long.toString(getStartupRecoveryDelay()));
@@ -171,5 +172,4 @@ public interface PartitionAttributes<K, V> {
 
     return configAttributes;
   }
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/RegionFactory.java b/geode-core/src/main/java/org/apache/geode/cache/RegionFactory.java
index 39be0f2..526939f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/RegionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/RegionFactory.java
@@ -112,7 +112,7 @@ public class RegionFactory<K, V> {
       throw new IllegalStateException(String.format("No attributes associated with %s",
           regionAttributesId));
     }
-    this.attrsFactory = new AttributesFactory<K, V>(ra);
+    this.attrsFactory = new AttributesFactory<>(ra);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheElement.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheElement.java
index c52cdc6..070a4d2 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheElement.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheElement.java
@@ -26,6 +26,10 @@ import org.apache.geode.lang.Identifiable;
 @Experimental
 public interface CacheElement extends Identifiable<String>, Serializable {
 
+  static <T extends CacheElement> boolean exists(List<T> list, String id) {
+    return list.stream().anyMatch(o -> o.getId().equals(id));
+  }
+
   static <T extends CacheElement> T findElement(List<T> list, String id) {
     return list.stream().filter(o -> o.getId().equals(id)).findFirst().orElse(null);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/DeclarableType.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/DeclarableType.java
index 0f1c454..32c6498 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/DeclarableType.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/DeclarableType.java
@@ -17,6 +17,7 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -61,7 +62,7 @@ import org.apache.geode.management.internal.cli.domain.ClassName;
 @XmlType(name = "declarable-type", namespace = "http://geode.apache.org/schema/cache",
     propOrder = {"parameters"})
 @Experimental
-public class DeclarableType extends ClassNameType {
+public class DeclarableType extends ClassNameType implements Serializable {
   @XmlElement(name = "parameter", namespace = "http://geode.apache.org/schema/cache")
   protected List<ParameterType> parameters;
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/DiskDirsType.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/DiskDirsType.java
index 817332d..b5c6989 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/DiskDirsType.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/DiskDirsType.java
@@ -18,6 +18,7 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -58,7 +59,7 @@ import org.apache.geode.annotations.Experimental;
 @XmlType(name = "disk-dirs-type", namespace = "http://geode.apache.org/schema/cache",
     propOrder = {"diskDirs"})
 @Experimental
-public class DiskDirsType {
+public class DiskDirsType implements Serializable {
 
   @XmlElement(name = "disk-dir", namespace = "http://geode.apache.org/schema/cache",
       required = true)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesDataPolicy.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesDataPolicy.java
index 8e78016..aa17d5b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesDataPolicy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesDataPolicy.java
@@ -18,6 +18,8 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
+
 import javax.xml.bind.annotation.XmlEnum;
 import javax.xml.bind.annotation.XmlEnumValue;
 import javax.xml.bind.annotation.XmlType;
@@ -51,7 +53,7 @@ import org.apache.geode.annotations.Experimental;
 @XmlType(name = "region-attributesData-policy", namespace = "http://geode.apache.org/schema/cache")
 @XmlEnum
 @Experimental
-public enum RegionAttributesDataPolicy {
+public enum RegionAttributesDataPolicy implements Serializable {
 
   @XmlEnumValue("empty")
   EMPTY("empty"), @XmlEnumValue("normal")
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesIndexUpdateType.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesIndexUpdateType.java
index b997248..1081714 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesIndexUpdateType.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesIndexUpdateType.java
@@ -18,6 +18,8 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
+
 import javax.xml.bind.annotation.XmlEnum;
 import javax.xml.bind.annotation.XmlEnumValue;
 import javax.xml.bind.annotation.XmlType;
@@ -47,7 +49,7 @@ import org.apache.geode.annotations.Experimental;
     namespace = "http://geode.apache.org/schema/cache")
 @XmlEnum
 @Experimental
-public enum RegionAttributesIndexUpdateType {
+public enum RegionAttributesIndexUpdateType implements Serializable {
 
   @XmlEnumValue("asynchronous")
   ASYNCHRONOUS("asynchronous"), @XmlEnumValue("synchronous")
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesType.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesType.java
index 3a89bd0..6261954 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesType.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionAttributesType.java
@@ -18,6 +18,7 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -333,7 +334,7 @@ import org.apache.geode.annotations.Experimental;
         "partitionAttributes", "membershipAttributes", "subscriptionAttributes", "cacheLoader",
         "cacheWriter", "cacheListeners", "compressor", "evictionAttributes"})
 @Experimental
-public class RegionAttributesType {
+public class RegionAttributesType implements Serializable {
 
   @XmlElement(name = "key-constraint", namespace = "http://geode.apache.org/schema/cache")
   protected Object keyConstraint;
@@ -1641,7 +1642,7 @@ public class RegionAttributesType {
    */
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "", propOrder = {"expirationAttributes"})
-  public static class ExpirationAttributesType {
+  public static class ExpirationAttributesType implements Serializable {
 
     @XmlElement(name = "expiration-attributes", namespace = "http://geode.apache.org/schema/cache",
         required = true)
@@ -1712,7 +1713,7 @@ public class RegionAttributesType {
   @XmlType(name = "expiration-attributes-type", namespace = "http://geode.apache.org/schema/cache",
       propOrder = {"customExpiry"})
   @Experimental
-  public static class ExpirationAttributesDetail {
+  public static class ExpirationAttributesDetail implements Serializable {
     @XmlElement(name = "custom-expiry", namespace = "http://geode.apache.org/schema/cache")
     protected DeclarableType customExpiry;
     @XmlAttribute(name = "action")
@@ -1846,7 +1847,7 @@ public class RegionAttributesType {
    */
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "", propOrder = {"lruEntryCount", "lruHeapPercentage", "lruMemorySize"})
-  public static class EvictionAttributes {
+  public static class EvictionAttributes implements Serializable {
 
     @XmlElement(name = "lru-entry-count", namespace = "http://geode.apache.org/schema/cache")
     protected RegionAttributesType.EvictionAttributes.LruEntryCount lruEntryCount;
@@ -1956,7 +1957,7 @@ public class RegionAttributesType {
      */
     @XmlAccessorType(XmlAccessType.FIELD)
     @XmlType(name = "")
-    public static class LruEntryCount {
+    public static class LruEntryCount implements Serializable {
 
       @XmlAttribute(name = "action")
       protected EnumActionDestroyOverflow action;
@@ -2168,7 +2169,7 @@ public class RegionAttributesType {
    */
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "", propOrder = {"requiredRoles"})
-  public static class MembershipAttributes {
+  public static class MembershipAttributes implements Serializable {
 
     @XmlElement(name = "required-role", namespace = "http://geode.apache.org/schema/cache")
     protected List<RegionAttributesType.MembershipAttributes.RequiredRole> requiredRoles;
@@ -2371,7 +2372,7 @@ public class RegionAttributesType {
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "",
       propOrder = {"partitionResolver", "partitionListeners", "fixedPartitionAttributes"})
-  public static class PartitionAttributes {
+  public static class PartitionAttributes implements Serializable {
 
     @XmlElement(name = "partition-resolver", namespace = "http://geode.apache.org/schema/cache")
     protected DeclarableType partitionResolver;
@@ -2762,7 +2763,7 @@ public class RegionAttributesType {
    */
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "")
-  public static class SubscriptionAttributes {
+  public static class SubscriptionAttributes implements Serializable {
 
     @XmlAttribute(name = "interest-policy")
     protected String interestPolicy;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionConfig.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionConfig.java
index 2638ecf..805255c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/RegionConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.geode.cache.configuration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -392,7 +393,7 @@ public class RegionConfig implements CacheElement {
    */
   @XmlAccessorType(XmlAccessType.FIELD)
   @XmlType(name = "", propOrder = {"key", "value"})
-  public static class Entry {
+  public static class Entry implements Serializable {
 
     @XmlElement(namespace = "http://geode.apache.org/schema/cache", required = true)
     protected ObjectType key;
@@ -512,7 +513,7 @@ public class RegionConfig implements CacheElement {
    *
    */
   @XmlAccessorType(XmlAccessType.FIELD)
-  public static class Index implements CacheElement {
+  public static class Index implements CacheElement, Serializable {
     @XmlAttribute(name = "name", required = true)
     protected String name;
     @XmlAttribute(name = "expression")
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EvictionAttributesImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EvictionAttributesImpl.java
index bbf493b..392b991 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EvictionAttributesImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EvictionAttributesImpl.java
@@ -19,11 +19,14 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Declarable;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAlgorithm;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.EvictionAttributesMutator;
+import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.InternalDataSerializer;
 
 /**
@@ -159,5 +162,68 @@ public class EvictionAttributesImpl extends EvictionAttributes {
     return this.algorithm == EvictionAlgorithm.LIFO_MEMORY;
   }
 
+  /**
+   * Builds eviction attributes from the JAXB eviction element of a region configuration.
+   * The first element present, checked in the order lru-heap-percentage, lru-entry-count,
+   * lru-memory-size, now supplies the algorithm, sizer, maximum and action together, so a
+   * configuration carrying several elements can no longer produce a mixed result.
+   *
+   * @param configAttributes eviction element to convert; must not be null
+   * @return new attributes; algorithm NONE, maximum 0 and action NONE if no element is present
+   * @throws ClassCastException if the sizer class is not an ObjectSizer that is also Declarable
+   * @throws InstantiationException if the sizer class cannot be instantiated
+   * @throws IllegalAccessException if the sizer class or its constructor is not accessible
+   */
+  public static EvictionAttributesImpl fromConfig(
+      RegionAttributesType.EvictionAttributes configAttributes)
+      throws ClassCastException, InstantiationException,
+      IllegalAccessException {
+    EvictionAlgorithm algorithm = EvictionAlgorithm.NONE;
+    EvictionAction action = EvictionAction.NONE;
+    String sizerClassName = null;
+    int maximum = 0;
+
+    // NOTE(review): getAction() is dereferenced unguarded, as before - confirm it is never null
+    if (configAttributes.getLruHeapPercentage() != null) {
+      // as before, the heap element contributes no maximum, so it stays 0
+      algorithm = EvictionAlgorithm.LRU_HEAP;
+      sizerClassName = configAttributes.getLruHeapPercentage().getClassName();
+      action = EvictionAction
+          .parseAction(configAttributes.getLruHeapPercentage().getAction().value());
+    } else if (configAttributes.getLruEntryCount() != null) {
+      // as before, no sizer class is read from the entry-count element
+      algorithm = EvictionAlgorithm.LRU_ENTRY;
+      maximum = Integer.valueOf(configAttributes.getLruEntryCount().getMaximum());
+      action = EvictionAction
+          .parseAction(configAttributes.getLruEntryCount().getAction().value());
+    } else if (configAttributes.getLruMemorySize() != null) {
+      algorithm = EvictionAlgorithm.LRU_MEMORY;
+      sizerClassName = configAttributes.getLruMemorySize().getClassName();
+      maximum = Integer.valueOf(configAttributes.getLruMemorySize().getMaximum());
+      action = EvictionAction
+          .parseAction(configAttributes.getLruMemorySize().getAction().value());
+    }
 
+    EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl();
+    evictionAttributes.setAlgorithm(algorithm);
+    if (sizerClassName != null) {
+      ObjectSizer sizer;
+      try {
+        sizer = (ObjectSizer) ClassPathLoader.getLatest().forName(sizerClassName).newInstance();
+      } catch (ClassNotFoundException e) {
+        // best effort: a sizer class that cannot be found falls back to the default sizer
+        sizer = ObjectSizer.DEFAULT;
+      }
+      if (!(sizer instanceof Declarable)) {
+        throw new ClassCastException(
+            "Object sizer configured as " + sizerClassName + " does not implement Declarable");
+      }
+      evictionAttributes.setObjectSizer(sizer);
+    }
+
+    evictionAttributes.setMaximum(maximum);
+    evictionAttributes.setAction(action);
+
+    return evictionAttributes;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionAttributesImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionAttributesImpl.java
index 5ef31e4..bfc9ded 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionAttributesImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionAttributesImpl.java
@@ -35,8 +35,11 @@ import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.partition.PartitionListener;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.offheap.OffHeapStorage;
@@ -787,4 +790,65 @@ public class PartitionAttributesImpl implements PartitionAttributes, Cloneable,
 
     return computeOffHeapLocalMaxMemory();
   }
+
+  /**
+   * Converts the JAXB partition-attributes element of a region configuration, copying only the
+   * settings present. Resolver and listener classes that cannot be loaded or instantiated are
+   * still skipped (best effort), but the failure is now logged rather than silently dropped.
+   *
+   * @param configAttributes partition element to convert, may be null
+   * @return the converted attributes, or null if configAttributes is null
+   */
+  public static PartitionAttributesImpl fromConfig(
+      RegionAttributesType.PartitionAttributes configAttributes) {
+    if (configAttributes == null) {
+      return null;
+    }
+    PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl();
+
+    if (configAttributes.getRedundantCopies() != null) {
+      partitionAttributes
+          .setRedundantCopies(Integer.valueOf(configAttributes.getRedundantCopies()));
+    }
+    if (configAttributes.getTotalMaxMemory() != null) {
+      partitionAttributes.setTotalMaxMemory(Integer.valueOf(configAttributes.getTotalMaxMemory()));
+    }
+    if (configAttributes.getTotalNumBuckets() != null) {
+      partitionAttributes
+          .setTotalNumBuckets(Integer.valueOf(configAttributes.getTotalNumBuckets()));
+    }
+    if (configAttributes.getLocalMaxMemory() != null) {
+      partitionAttributes.setLocalMaxMemory(Integer.valueOf(configAttributes.getLocalMaxMemory()));
+    }
+    if (configAttributes.getColocatedWith() != null) {
+      partitionAttributes.setColocatedWith(configAttributes.getColocatedWith());
+    }
+    if (configAttributes.getPartitionResolver() != null) {
+      String resolverClass = configAttributes.getPartitionResolver().getClassName();
+      try {
+        partitionAttributes.setPartitionResolver(
+            (PartitionResolver) ClassPathLoader.getLatest().forName(resolverClass).newInstance());
+      } catch (Exception e) {
+        LogService.getLogger().warn("Could not create partition resolver " + resolverClass, e);
+      }
+    }
+    if (configAttributes.getRecoveryDelay() != null) {
+      partitionAttributes.setRecoveryDelay(Long.valueOf(configAttributes.getRecoveryDelay()));
+    }
+    if (configAttributes.getStartupRecoveryDelay() != null) {
+      partitionAttributes
+          .setStartupRecoveryDelay(Long.valueOf(configAttributes.getStartupRecoveryDelay()));
+    }
+    if (configAttributes.getPartitionListeners() != null) {
+      for (DeclarableType listenerType : configAttributes.getPartitionListeners()) {
+        try {
+          partitionAttributes.addPartitionListener((PartitionListener) ClassPathLoader.getLatest()
+              .forName(listenerType.getClassName()).newInstance());
+        } catch (Exception e) {
+          LogService.getLogger().warn("Could not create a configured partition listener", e);
+        }
+      }
+    }
+    return partitionAttributes;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunction.java
index ffacf4f..9bb34e0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunction.java
@@ -29,7 +29,6 @@ import org.apache.geode.security.ResourcePermission;
  * then it shouldn't be an InternalFunction.
  */
 public interface InternalFunction<T> extends Function<T>, InternalEntity {
-
   /**
    * InternalFunction do require ResourcePermissions.ALL so that it only allows super users to
    * invoke from Clients. So don't override this in implementations.
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
index 1d1da67..623fa19 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
@@ -443,12 +443,12 @@ public class CliUtil {
       }
     } catch (ClassNotFoundException | NoClassDefFoundError e) {
       throw new RuntimeException(
-          CliStrings.format(CliStrings.CREATE_REGION__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1,
+          CliStrings.format(CliStrings.ERROR__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1,
               classToLoadName, neededFor),
           e);
     } catch (ClassCastException e) {
       throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__CLASS_SPECIFIED_FOR_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE,
+          CliStrings.ERROR__MSG__CLASS_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE,
           classToLoadName, neededFor), e);
     }
 
@@ -461,11 +461,11 @@ public class CliUtil {
       instance = klass.newInstance();
     } catch (InstantiationException e) {
       throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1, klass,
+          CliStrings.ERROR__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1, klass,
           neededFor), e);
     } catch (IllegalAccessException e) {
       throw new RuntimeException(
-          CliStrings.format(CliStrings.CREATE_REGION__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1,
+          CliStrings.format(CliStrings.ERROR__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1,
               klass, neededFor),
           e);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommand.java
index bf89c89..bf3d989 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommand.java
@@ -31,7 +31,6 @@ import org.apache.geode.distributed.internal.InternalConfigurationPersistenceSer
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.cli.CliMetaData;
 import org.apache.geode.management.cli.SingleGfshCommand;
-import org.apache.geode.management.internal.cli.exceptions.EntityExistsException;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 import org.apache.geode.management.internal.cli.functions.CreateJndiBindingFunction;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
@@ -150,19 +149,15 @@ public class CreateJndiBindingCommand extends SingleGfshCommand {
     if (dsConfigProperties != null && dsConfigProperties.length > 0)
       configuration.getConfigProperties().addAll(Arrays.asList(dsConfigProperties));
 
-    InternalConfigurationPersistenceService service =
-        (InternalConfigurationPersistenceService) getConfigurationPersistenceService();
+    InternalConfigurationPersistenceService service = getConfigurationPersistenceService();
 
     if (service != null) {
       CacheConfig cacheConfig = service.getCacheConfig("cluster");
-      if (cacheConfig != null) {
-        JndiBindingsType.JndiBinding existing =
-            CacheElement.findElement(cacheConfig.getJndiBindings(), jndiName);
-        if (existing != null) {
-          throw new EntityExistsException(
-              CliStrings.format("Jndi binding with jndi-name \"{0}\" already exists.", jndiName),
-              ifNotExists);
-        }
+      if (cacheConfig != null && CacheElement.exists(cacheConfig.getJndiBindings(), jndiName)) {
+        String message =
+            CliStrings.format("Jndi binding with jndi-name \"{0}\" already exists.", jndiName);
+        return ifNotExists ? ResultModel.createInfo("Skipping: " + message)
+            : ResultModel.createError(message);
       }
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommand.java
index 888287d..da8c9ec 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommand.java
@@ -16,8 +16,10 @@ package org.apache.geode.management.internal.cli.commands;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -33,7 +35,6 @@ import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CustomExpiry;
-import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.Region;
@@ -61,13 +62,14 @@ import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.management.internal.cli.GfshParseResult;
 import org.apache.geode.management.internal.cli.LogWrapper;
 import org.apache.geode.management.internal.cli.domain.ClassName;
+import org.apache.geode.management.internal.cli.domain.PartitionArgs;
 import org.apache.geode.management.internal.cli.domain.RegionConfigFactory;
 import org.apache.geode.management.internal.cli.exceptions.EntityExistsException;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.cli.functions.CreateRegionFunctionArgs;
 import org.apache.geode.management.internal.cli.functions.FetchRegionAttributesFunction;
 import org.apache.geode.management.internal.cli.functions.RegionAttributesWrapper;
 import org.apache.geode.management.internal.cli.functions.RegionCreateFunction;
-import org.apache.geode.management.internal.cli.functions.RegionFunctionArgs;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.cli.result.ResultBuilder;
 import org.apache.geode.management.internal.cli.result.model.ResultModel;
@@ -197,264 +199,233 @@ public class CreateRegionCommand extends SingleGfshCommand {
           CliStrings.CREATE_REGION__MSG__ONE_OF_REGIONSHORTCUT_AND_USEATTRIBUTESFROM_IS_REQUIRED);
     }
 
+    try {
+      failIfRegionAlreadyExists(regionPath, regionShortcut, groups);
+    } catch (EntityExistsException e) {
+      return ifNotExists ? ResultModel.createInfo("Skipping: " + e.getMessage())
+          : ResultModel.createError(e.getMessage());
+    }
+
     InternalCache cache = (InternalCache) getCache();
 
-    /*
-     * Adding name collision check for regions created with regionShortcut only.
-     * Regions can be categories as Proxy(replicate/partition), replicate/partition, and local
-     * For concise purpose: we call existing region (E) and region to be created (C)
-     */
-    DistributedRegionMXBean regionBean =
-        getManagementService().getDistributedRegionMXBean(regionPath);
+    // validate the parent region
+    RegionPath regionPathData = new RegionPath(regionPath);
+    if (!regionPathData.isRoot() && !regionExists(regionPathData.getParent())) {
+      return ResultModel.createError(
+          CliStrings.format(CliStrings.CREATE_REGION__MSG__PARENT_REGION_FOR_0_DOES_NOT_EXIST,
+              new Object[] {regionPath}));
+    }
 
-    if (regionBean != null && regionShortcut != null) {
-      String existingDataPolicy = regionBean.getRegionType();
-      // either C is local, or E is local or E and C are both non-proxy regions. this is to make
-      // sure local, replicate or partition regions have unique names across the entire cluster
-      if (regionShortcut.isLocal() || existingDataPolicy.equals("NORMAL")
-          || !regionShortcut.isProxy()
-              && (regionBean.getMemberCount() > regionBean.getEmptyNodes())) {
-        throw new EntityExistsException(
-            String.format("Region %s already exists on the cluster.", regionPath), ifNotExists);
-      }
+    // validate if partition args are supplied only for partitioned regions
+    PartitionArgs partitionArgs =
+        new PartitionArgs(prColocatedWith, prLocalMaxMemory, prRecoveryDelay,
+            prRedundantCopies, prStartupRecoveryDelay, prTotalMaxMemory, prTotalNumBuckets,
+            partitionResolver);
+    if (regionShortcut != null && !regionShortcut.name().startsWith("PARTITION")
+        && !partitionArgs.isEmpty()) {
+      return ResultModel.createError(CliStrings.format(
+          CliStrings.CREATE_REGION__MSG__OPTION_0_CAN_BE_USED_ONLY_FOR_PARTITIONEDREGION,
+          partitionArgs.getUserSpecifiedPartitionAttributes()) + " "
+          + CliStrings.format(CliStrings.CREATE_REGION__MSG__0_IS_NOT_A_PARITIONEDREGION,
+              regionPath));
+    }
 
-      // after this, one of E and C is proxy region or both are proxy regions.
+    // validate colocation for partitioned regions
+    if (prColocatedWith != null) {
+      DistributedRegionMXBean colocatedRegionBean =
+          getManagementService().getDistributedRegionMXBean(prColocatedWith);
 
-      // we first make sure E and C have the compatible data policy
-      if (regionShortcut.isPartition() && !existingDataPolicy.contains("PARTITION")) {
-        LogService.getLogger().info("Create region command: got EntityExists exception");
-        throw new EntityExistsException("The existing region is not a partitioned region",
-            ifNotExists);
-      }
-      if (regionShortcut.isReplicate()
-          && !(existingDataPolicy.equals("EMPTY") || existingDataPolicy.contains("REPLICATE")
-              || existingDataPolicy.contains("PRELOADED"))) {
-        throw new EntityExistsException("The existing region is not a replicate region",
-            ifNotExists);
-      }
-      // then we make sure E and C are on different members
-      Set<String> membersWithThisRegion =
-          Arrays.stream(regionBean.getMembers()).collect(Collectors.toSet());
-      Set<String> membersWithinGroup = findMembers(groups, null).stream()
-          .map(DistributedMember::getName).collect(Collectors.toSet());
-      if (!Collections.disjoint(membersWithinGroup, membersWithThisRegion)) {
-        throw new EntityExistsException(
-            String.format("Region %s already exists on these members: %s.", regionPath,
-                StringUtils.join(membersWithThisRegion, ",")),
-            ifNotExists);
+      if (colocatedRegionBean == null) {
+        return ResultModel.createError(CliStrings.format(
+            CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_0_REGIONPATH_1_NOT_FOUND,
+            CliStrings.CREATE_REGION__COLOCATEDWITH, prColocatedWith));
       }
-    }
 
-    // validating the parent region
-    RegionPath regionPathData = new RegionPath(regionPath);
-    String parentRegionPath = regionPathData.getParent();
-    if (parentRegionPath != null && !Region.SEPARATOR.equals(parentRegionPath)) {
-      if (!regionExists(cache, parentRegionPath)) {
-        return ResultModel.createError(
-            CliStrings.format(CliStrings.CREATE_REGION__MSG__PARENT_REGION_FOR_0_DOES_NOT_EXIST,
-                new Object[] {regionPath}));
+      if (!colocatedRegionBean.getRegionType().equals("PARTITION") &&
+          !colocatedRegionBean.getRegionType().equals("PERSISTENT_PARTITION")) {
+        return ResultModel.createError(CliStrings.format(
+            CliStrings.CREATE_REGION__MSG__COLOCATEDWITH_REGION_0_IS_NOT_PARTITIONEDREGION,
+            new String[] {prColocatedWith}));
       }
     }
 
-    // creating the RegionFunctionArgs
-    RegionFunctionArgs functionArgs = new RegionFunctionArgs();
-    functionArgs.setRegionPath(regionPath);
-    functionArgs.setIfNotExists(ifNotExists);
-    functionArgs.setStatisticsEnabled(statisticsEnabled);
-    functionArgs.setEntryExpirationIdleTime(entryExpirationIdleTime, entryExpirationIdleTimeAction);
-    functionArgs.setEntryExpirationTTL(entryExpirationTTL, entryExpirationTTLAction);
-    functionArgs.setRegionExpirationIdleTime(regionExpirationIdleTime,
-        regionExpirationIdleTimeAction);
-    functionArgs.setRegionExpirationTTL(regionExpirationTTL, regionExpirationTTLAction);
-    functionArgs.setEntryIdleTimeCustomExpiry(entryIdleTimeCustomExpiry);
-    functionArgs.setEntryTTLCustomExpiry(entryTTLCustomExpiry);
-    functionArgs.setEvictionAttributes(evictionAction, evictionMaxMemory, evictionEntryCount,
-        evictionObjectSizer);
-    functionArgs.setDiskStore(diskStore);
-    functionArgs.setDiskSynchronous(diskSynchronous);
-    functionArgs.setEnableAsyncConflation(enableAsyncConflation);
-    functionArgs.setEnableSubscriptionConflation(enableSubscriptionConflation);
-    functionArgs.setAsyncEventQueueIds(asyncEventQueueIds);
-    functionArgs.setGatewaySenderIds(gatewaySenderIds);
-    functionArgs.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
-    functionArgs.setCloningEnabled(cloningEnabled);
-    functionArgs.setConcurrencyLevel(concurrencyLevel);
-    functionArgs.setPartitionArgs(prColocatedWith, prLocalMaxMemory, prRecoveryDelay,
-        prRedundantCopies, prStartupRecoveryDelay, prTotalMaxMemory, prTotalNumBuckets,
-        partitionResolver);
-    functionArgs.setOffHeap(offHeap);
-    functionArgs.setMcastEnabled(mcastEnabled);
-
-    RegionAttributes<?, ?> regionAttributes = null;
-    if (regionShortcut != null) {
-      if (!regionShortcut.name().startsWith("PARTITION") && functionArgs.hasPartitionAttributes()) {
-        return ResultModel.createError(CliStrings.format(
-            CliStrings.CREATE_REGION__MSG__OPTION_0_CAN_BE_USED_ONLY_FOR_PARTITIONEDREGION,
-            functionArgs.getPartitionArgs().getUserSpecifiedPartitionAttributes()) + " "
-            + CliStrings.format(CliStrings.CREATE_REGION__MSG__0_IS_NOT_A_PARITIONEDREGION,
-                regionPath));
+    // validate gateway senders
+    if (gatewaySenderIds != null) {
+      Set<String> existingGatewaySenders =
+          Arrays.stream(getDSMBean().listGatewaySenders()).collect(Collectors.toSet());
+      if (existingGatewaySenders.isEmpty()) {
+        return ResultModel
+            .createError(CliStrings.CREATE_REGION__MSG__NO_GATEWAYSENDERS_IN_THE_SYSTEM);
       }
-      functionArgs.setRegionShortcut(regionShortcut);
-      functionArgs.setRegionAttributes(cache.getRegionAttributes(regionShortcut.toString()));
-    } else { // templateRegion != null
-      if (!regionExists(cache, templateRegion)) {
+
+      if (Arrays.stream(gatewaySenderIds).anyMatch(id -> !existingGatewaySenders.contains(id))) {
         return ResultModel.createError(CliStrings.format(
-            CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_0_REGIONPATH_1_NOT_FOUND,
-            CliStrings.CREATE_REGION__USEATTRIBUTESFROM, templateRegion));
+            CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_GATEWAYSENDER_ID_UNKNOWN_0,
+            (Object[]) gatewaySenderIds));
       }
+    }
 
-      RegionAttributesWrapper<?, ?> wrappedAttributes = getRegionAttributes(cache, templateRegion);
+    // validate if template region exists, if provided
+    if (templateRegion != null && !regionExists(templateRegion)) {
+      return ResultModel.createError(CliStrings.format(
+          CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_0_REGIONPATH_1_NOT_FOUND,
+          CliStrings.CREATE_REGION__USEATTRIBUTESFROM, templateRegion));
+    }
 
-      if (wrappedAttributes == null) {
+    // get predefined attributes for a template region
+    RegionAttributesWrapper<?, ?> wrappedTemplateAttributes = null;
+    if (templateRegion != null) {
+      wrappedTemplateAttributes = getRegionAttributes(cache, templateRegion);
+      if (wrappedTemplateAttributes == null) {
         return ResultModel.createError(CliStrings.format(
             CliStrings.CREATE_REGION__MSG__COULD_NOT_RETRIEVE_REGION_ATTRS_FOR_PATH_0_VERIFY_REGION_EXISTS,
             templateRegion));
       }
 
-      if (wrappedAttributes.getRegionAttributes().getPartitionAttributes() == null
-          && functionArgs.hasPartitionAttributes()) {
+      if (wrappedTemplateAttributes.getRegionAttributes().getPartitionAttributes() == null
+          && !partitionArgs.isEmpty()) {
         return ResultModel.createError(CliStrings.format(
             CliStrings.CREATE_REGION__MSG__OPTION_0_CAN_BE_USED_ONLY_FOR_PARTITIONEDREGION,
-            functionArgs.getPartitionArgs().getUserSpecifiedPartitionAttributes()) + " "
+            partitionArgs.getUserSpecifiedPartitionAttributes()) + " "
             + CliStrings.format(CliStrings.CREATE_REGION__MSG__0_IS_NOT_A_PARITIONEDREGION,
                 templateRegion));
       }
-      functionArgs.setTemplateRegion(templateRegion);
+    }
+
+    RegionAttributes<?, ?> regionAttributes;
+    if (wrappedTemplateAttributes != null) {
+      regionAttributes = wrappedTemplateAttributes.getRegionAttributes();
+    } else {
+      regionAttributes = cache.getRegionAttributes(regionShortcut.toString());
+    }
 
-      // These attributes will have the actual callback fields (if previously present) nulled out.
-      functionArgs.setRegionAttributes(wrappedAttributes.getRegionAttributes());
+    // validating diskstore with other attributes
+    if (diskStore != null && !regionAttributes.getDataPolicy().withPersistence()) {
+      String subMessage = "Only regions with persistence or overflow to disk can specify DiskStore";
+      String message = subMessage + ". "
+          + CliStrings.format(
+              CliStrings.CREATE_REGION__MSG__USE_ATTRIBUTES_FROM_REGION_0_IS_NOT_WITH_PERSISTENCE,
+              new Object[] {templateRegion});
+
+      return ResultModel.createError(message);
+    }
+
+    if (diskStore != null && !diskStoreExists(diskStore)) {
+      return ResultModel.createError(CliStrings.format(
+          CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_DISKSTORE_UNKNOWN_DISKSTORE_0,
+          new Object[] {diskStore}));
+    }
 
-      functionArgs
-          .setCacheListeners(wrappedAttributes.getCacheListenerClasses().toArray(new ClassName[0]));
-      functionArgs.setCacheWriter(wrappedAttributes.getCacheWriterClass());
-      functionArgs.setCacheLoader(wrappedAttributes.getCacheLoaderClass());
-      functionArgs.setCompressor(wrappedAttributes.getCompressorClass());
-      functionArgs.setKeyConstraint(wrappedAttributes.getKeyConstraintClass());
-      functionArgs.setValueConstraint(wrappedAttributes.getValueConstraintClass());
+    // additional authorization
+    if (isAttributePersistent(regionAttributes)) {
+      authorize(ResourcePermission.Resource.CLUSTER, ResourcePermission.Operation.WRITE,
+          ResourcePermission.Target.DISK);
+    }
+
+    // validating the groups
+    Set<DistributedMember> membersToCreateRegionOn = findMembers(groups, null);
+    if (membersToCreateRegionOn.isEmpty()) {
+      if (groups == null || groups.length == 0) {
+        return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+      }
+
+      return ResultModel.createError(
+          CliStrings.format(CliStrings.CREATE_REGION__MSG__GROUPS_0_ARE_INVALID,
+              (Object[]) groups));
     }
 
+    // generate the RegionConfig object for passing to distributed function and persisting
+    Set<ClassName<CacheListener>> cacheListeners = new HashSet<>();
     if (cacheListener != null) {
-      functionArgs.setCacheListeners(cacheListener);
+      Arrays.stream(cacheListener).forEach(c -> cacheListeners.add(c));
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getCacheListenerClasses() != null) {
+      cacheListeners.addAll(wrappedTemplateAttributes.getCacheListenerClasses());
     }
 
+    ClassName<CacheLoader> cacheLoaderClassNameToPersist = null;
     if (cacheLoader != null) {
-      functionArgs.setCacheLoader(cacheLoader);
+      cacheLoaderClassNameToPersist = cacheLoader;
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getCacheLoaderClass() != null) {
+      cacheLoaderClassNameToPersist = wrappedTemplateAttributes.getCacheLoaderClass();
     }
 
+    ClassName<CacheWriter> cacheWriterClassNameToPersist = null;
     if (cacheWriter != null) {
-      functionArgs.setCacheWriter(cacheWriter);
+      cacheWriterClassNameToPersist = cacheWriter;
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getCacheWriterClass() != null) {
+      cacheWriterClassNameToPersist = wrappedTemplateAttributes.getCacheWriterClass();
     }
 
+    String compressorClassNameToPersist = null;
     if (compressor != null) {
-      functionArgs.setCompressor(compressor);
+      compressorClassNameToPersist = compressor;
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getCompressorClass() != null) {
+      compressorClassNameToPersist = wrappedTemplateAttributes.getCompressorClass();
     }
 
+    String keyConstraintToPersist = null;
     if (keyConstraint != null) {
-      functionArgs.setKeyConstraint(keyConstraint);
+      keyConstraintToPersist = keyConstraint;
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getKeyConstraintClass() != null) {
+      keyConstraintToPersist = wrappedTemplateAttributes.getKeyConstraintClass();
     }
 
+    String valueConstraintToPersist = null;
     if (valueConstraint != null) {
-      functionArgs.setValueConstraint(valueConstraint);
-    }
-
-    DistributedSystemMXBean dsMBean = getDSMBean();
-    // validating colocation
-    if (functionArgs.hasPartitionAttributes()) {
-      if (prColocatedWith != null) {
-        ManagementService mgmtService = getManagementService();
-        DistributedRegionMXBean distributedRegionMXBean =
-            mgmtService.getDistributedRegionMXBean(prColocatedWith);
-        if (distributedRegionMXBean == null) {
-          return ResultModel.createError(CliStrings.format(
-              CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_0_REGIONPATH_1_NOT_FOUND,
-              CliStrings.CREATE_REGION__COLOCATEDWITH, prColocatedWith));
-        }
-        String regionType = distributedRegionMXBean.getRegionType();
-        if (!(DataPolicy.PARTITION.toString().equals(regionType)
-            || DataPolicy.PERSISTENT_PARTITION.toString().equals(regionType))) {
-          return ResultModel.createError(CliStrings.format(
-              CliStrings.CREATE_REGION__MSG__COLOCATEDWITH_REGION_0_IS_NOT_PARTITIONEDREGION,
-              new Object[] {prColocatedWith}));
-        }
-      }
-    }
-
-    // validating gateway senders
-    if (gatewaySenderIds != null) {
-      Set<String> existingGatewaySenders =
-          Arrays.stream(dsMBean.listGatewaySenders()).collect(Collectors.toSet());
-      if (existingGatewaySenders.size() == 0) {
-        return ResultModel
-            .createError(CliStrings.CREATE_REGION__MSG__NO_GATEWAYSENDERS_IN_THE_SYSTEM);
-      } else {
-        Set<String> specifiedGatewaySenders =
-            Arrays.stream(gatewaySenderIds).collect(Collectors.toSet());
-        specifiedGatewaySenders.removeAll(existingGatewaySenders);
-        if (!specifiedGatewaySenders.isEmpty()) {
-          return ResultModel.createError(CliStrings.format(
-              CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_GATEWAYSENDER_ID_UNKNOWN_0,
-              (Object[]) gatewaySenderIds));
-        }
-      }
+      valueConstraintToPersist = valueConstraint;
+    } else if (wrappedTemplateAttributes != null
+        && wrappedTemplateAttributes.getValueConstraintClass() != null) {
+      valueConstraintToPersist = wrappedTemplateAttributes.getValueConstraintClass();
     }
 
-    // validating diskstore with other attributes
-    if (diskStore != null) {
-      regionAttributes = functionArgs.getRegionAttributes();
-      if (regionAttributes != null && !regionAttributes.getDataPolicy().withPersistence()) {
-        String subMessage =
-            "Only regions with persistence or overflow to disk can specify DiskStore";
-        String message = subMessage + ". "
-            + CliStrings.format(
-                CliStrings.CREATE_REGION__MSG__USE_ATTRIBUTES_FROM_REGION_0_IS_NOT_WITH_PERSISTENCE,
-                new Object[] {String.valueOf(functionArgs.getTemplateRegion())});
-
-        return ResultModel.createError(message);
-      }
+    Set<String> asyncEventQueueIdSet = Optional.ofNullable(asyncEventQueueIds)
+        .map(a -> Arrays.stream(a).collect(Collectors.toSet()))
+        .orElse(null);
+    Set<String> gatewaySenderIdSet = Optional.ofNullable(gatewaySenderIds)
+        .map(a -> Arrays.stream(a).collect(Collectors.toSet()))
+        .orElse(null);
+
+    RegionConfig config = (new RegionConfigFactory()).generate(regionPath, keyConstraintToPersist,
+        valueConstraintToPersist, statisticsEnabled, entryExpirationIdleTime,
+        entryExpirationIdleTimeAction, entryExpirationTTL, entryExpirationTTLAction,
+        entryIdleTimeCustomExpiry,
+        entryTTLCustomExpiry, regionExpirationIdleTime, regionExpirationIdleTimeAction,
+        regionExpirationTTL, regionExpirationTTLAction, evictionAction, evictionMaxMemory,
+        evictionEntryCount, evictionObjectSizer, diskStore, diskSynchronous, enableAsyncConflation,
+        enableSubscriptionConflation, cacheListeners, cacheLoaderClassNameToPersist,
+        cacheWriterClassNameToPersist,
+        asyncEventQueueIdSet, gatewaySenderIdSet, concurrencyChecksEnabled, cloningEnabled,
+        mcastEnabled,
+        concurrencyLevel, partitionArgs, compressorClassNameToPersist, offHeap, regionAttributes);
 
-      if (!diskStoreExists(cache, diskStore)) {
-        return ResultModel.createError(CliStrings.format(
-            CliStrings.CREATE_REGION__MSG__SPECIFY_VALID_DISKSTORE_UNKNOWN_DISKSTORE_0,
-            new Object[] {diskStore}));
-      }
-    }
-
-    // additional authorization
-    if ((functionArgs.getRegionShortcut() != null
-        && functionArgs.getRegionShortcut().isPersistent())
-        || isAttributePersistent(functionArgs.getRegionAttributes())) {
-      authorize(ResourcePermission.Resource.CLUSTER, ResourcePermission.Operation.WRITE,
-          ResourcePermission.Target.DISK);
-    }
-
-    // validating the groups
-    Set<DistributedMember> membersToCreateRegionOn = findMembers(groups, null);
-    // just in case we found no members with this group name
-    if (membersToCreateRegionOn.isEmpty()) {
-      if (groups == null || groups.length == 0) {
-        return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
-      } else {
-        return ResultModel.createError(
-            CliStrings.format(CliStrings.CREATE_REGION__MSG__GROUPS_0_ARE_INVALID,
-                (Object[]) groups));
-      }
-    }
+    // creating the RegionFunctionArgs
+    CreateRegionFunctionArgs functionArgs =
+        new CreateRegionFunctionArgs(regionPath, config, ifNotExists);
 
     List<CliFunctionResult> regionCreateResults = executeAndGetFunctionResult(
         RegionCreateFunction.INSTANCE, functionArgs, membersToCreateRegionOn);
 
     ResultModel resultModel = ResultModel.createMemberStatusResult(regionCreateResults);
-    InternalConfigurationPersistenceService service =
-        (InternalConfigurationPersistenceService) getConfigurationPersistenceService();
-
+    InternalConfigurationPersistenceService service = getConfigurationPersistenceService();
     if (service == null) {
       return resultModel;
     }
 
-    // otherwise, prepare the regionConfig for persistence
+    if (resultModel.isSuccessful() && regionCreateResults.stream()
+        .anyMatch(
+            res -> res.getStatusMessage() != null && res.getStatusMessage().contains("Skipping"))) {
+      return resultModel;
+    }
+
+    // persist the RegionConfig object if the function is successful on all members
     if (resultModel.isSuccessful()) {
       verifyDistributedRegionMbean(cache, regionPath);
-      RegionConfig config = (new RegionConfigFactory()).generate(functionArgs);
+
       // the following is a temporary solution before lucene make the change to create region first
       // before creating the lucene index.
       // GEODE-3924
@@ -468,14 +439,13 @@ public class CreateRegionCommand extends SingleGfshCommand {
       List<CacheElement> extensions = regionConfigFromServer.getCustomRegionElements();
       config.getCustomRegionElements().addAll(extensions);
 
-      resultModel.setConfigObject(new CreateRegionResultConfig(config,
-          functionArgs.getRegionPath()));
+      resultModel.setConfigObject(new CreateRegionResult(config, regionPath));
     }
 
     return resultModel;
   }
 
-  private class CreateRegionResultConfig {
+  private class CreateRegionResult {
     RegionConfig getRegionConfig() {
       return regionConfig;
     }
@@ -487,7 +457,7 @@ public class CreateRegionCommand extends SingleGfshCommand {
     private final RegionConfig regionConfig;
     private final String fullRegionPath;
 
-    public CreateRegionResultConfig(RegionConfig regionConfig, String fullRegionPath) {
+    public CreateRegionResult(RegionConfig regionConfig, String fullRegionPath) {
       this.regionConfig = regionConfig;
       this.fullRegionPath = fullRegionPath;
     }
@@ -499,7 +469,7 @@ public class CreateRegionCommand extends SingleGfshCommand {
       return false;
     }
 
-    CreateRegionResultConfig regionResultConfigObject = (CreateRegionResultConfig) configObject;
+    CreateRegionResult regionResultConfigObject = (CreateRegionResult) configObject;
     RegionConfig regionConfig = regionResultConfigObject.getRegionConfig();
     String regionPath = regionResultConfigObject.getFullRegionPath();
 
@@ -605,6 +575,53 @@ public class CreateRegionCommand extends SingleGfshCommand {
     return attributes;
   }
 
+  private void failIfRegionAlreadyExists(String regionPath, RegionShortcut regionShortcut,
+      String[] groups) throws EntityExistsException {
+    /*
+     * Name collision check, applied only to regions created with a regionShortcut.
+     * Regions can be categorized as proxy (replicate/partition), replicate/partition, and local.
+     * For brevity, we call the existing region E and the region to be created C.
+     */
+    DistributedRegionMXBean regionBean =
+        getManagementService().getDistributedRegionMXBean(regionPath);
+    if (regionBean == null || regionShortcut == null) {
+      return;
+    }
+
+    String existingDataPolicy = regionBean.getRegionType();
+    // Either C is local, or E is local, or E and C are both non-proxy regions. This makes
+    // sure local, replicate or partition regions have unique names across the entire cluster.
+    if (regionShortcut.isLocal() || existingDataPolicy.equals("NORMAL") || !regionShortcut.isProxy()
+        && (regionBean.getMemberCount() > regionBean.getEmptyNodes())) {
+      throw new EntityExistsException(
+          String.format("Region %s already exists on the cluster.", regionPath));
+    }
+
+    // After this point, at least one of E and C is a proxy region.
+
+    // First, make sure E and C have compatible data policies.
+    if (regionShortcut.isPartition() && !existingDataPolicy.contains("PARTITION")) {
+      LogService.getLogger().info("Create region command: got EntityExists exception");
+      throw new EntityExistsException("The existing region is not a partitioned region");
+    }
+
+    if (regionShortcut.isReplicate() && !existingDataPolicy.equals("EMPTY")
+        && !existingDataPolicy.contains("REPLICATE") && !existingDataPolicy.contains("PRELOADED")) {
+      throw new EntityExistsException("The existing region is not a replicate region");
+    }
+
+    // Then, make sure E and C are on different members.
+    Set<String> membersWithThisRegion =
+        Arrays.stream(regionBean.getMembers()).collect(Collectors.toSet());
+    Set<String> membersWithinGroup = findMembers(groups, null).stream()
+        .map(DistributedMember::getName).collect(Collectors.toSet());
+    if (!Collections.disjoint(membersWithinGroup, membersWithThisRegion)) {
+      throw new EntityExistsException(
+          String.format("Region %s already exists on these members: %s.", regionPath,
+              StringUtils.join(membersWithThisRegion, ",")));
+    }
+  }
+
   private boolean isClusterWideSameConfig(InternalCache cache, String regionPath) {
     ManagementService managementService = getManagementService();
 
@@ -638,7 +655,7 @@ public class CreateRegionCommand extends SingleGfshCommand {
     return true;
   }
 
-  boolean regionExists(InternalCache cache, String regionPath) {
+  boolean regionExists(String regionPath) {
     if (regionPath == null || Region.SEPARATOR.equals(regionPath)) {
       return false;
     }
@@ -650,7 +667,7 @@ public class CreateRegionCommand extends SingleGfshCommand {
     return Arrays.stream(allRegionPaths).anyMatch(regionPath::equals);
   }
 
-  private boolean diskStoreExists(InternalCache cache, String diskStoreName) {
+  private boolean diskStoreExists(String diskStoreName) {
     ManagementService managementService = getManagementService();
     DistributedSystemMXBean dsMXBean = managementService.getDistributedSystemMXBean();
     Map<String, String[]> diskstore = dsMXBean.listMemberDiskstore();
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/ExpirationArgs.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/ExpirationArgs.java
new file mode 100644
index 0000000..97835aa
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/ExpirationArgs.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.domain;
+
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+
+public class ExpirationArgs {
+  private final Integer time;
+  private final ExpirationAction action;
+
+  public ExpirationArgs(Integer time, ExpirationAction action) {
+    this.time = time;
+    this.action = action;
+  }
+
+  public Integer getTime() {
+    return time;
+  }
+
+  public ExpirationAction getAction() {
+    return action;
+  }
+
+  public ExpirationAttributes getExpirationAttributes() {
+    return getExpirationAttributes(null);
+  }
+
+  public ExpirationAttributes getExpirationAttributes(ExpirationAttributes existing) {
+    // defaults, overridden below by 'existing' and then by this object's non-null fields
+    int timeToUse = 0;
+    ExpirationAction actionToUse = ExpirationAction.INVALIDATE;
+
+    if (existing != null) {
+      timeToUse = existing.getTimeout();
+      actionToUse = existing.getAction();
+    }
+    if (time != null) {
+      timeToUse = time;
+    }
+
+    if (action != null) {
+      actionToUse = action;
+    }
+    return new ExpirationAttributes(timeToUse, actionToUse);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/PartitionArgs.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/PartitionArgs.java
new file mode 100644
index 0000000..30b4644
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/PartitionArgs.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.domain;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+
+public class PartitionArgs {
+  public String prColocatedWith;
+  public Integer prLocalMaxMemory;
+  public Long prRecoveryDelay;
+  public Integer prRedundantCopies;
+  public Long prStartupRecoveryDelay;
+  public Long prTotalMaxMemory;
+  public Integer prTotalNumBuckets;
+  public String partitionResolver;
+
+  public PartitionArgs(String prColocatedWith, Integer prLocalMaxMemory, Long prRecoveryDelay,
+      Integer prRedundantCopies, Long prStartupRecoveryDelay, Long prTotalMaxMemory,
+      Integer prTotalNumBuckets, String partitionResolver) {
+    this.prColocatedWith = prColocatedWith;
+    this.prLocalMaxMemory = prLocalMaxMemory;
+    this.prRecoveryDelay = prRecoveryDelay;
+    this.prRedundantCopies = prRedundantCopies;
+    this.prStartupRecoveryDelay = prStartupRecoveryDelay;
+    this.prTotalMaxMemory = prTotalMaxMemory;
+    this.prTotalNumBuckets = prTotalNumBuckets;
+    this.partitionResolver = partitionResolver;
+  }
+
+  public boolean isEmpty() {
+    return prColocatedWith == null &&
+        prLocalMaxMemory == null &&
+        prRecoveryDelay == null &&
+        prRedundantCopies == null &&
+        prStartupRecoveryDelay == null &&
+        prTotalMaxMemory == null &&
+        prTotalNumBuckets == null &&
+        partitionResolver == null;
+  }
+
+  /*
+   * Returns the CLI option names of every partition attribute that is non-null on this object.
+   * This method is duplicated in RegionFunctionArgs.PartitionArgs, but the latter will be
+   * removed once AlterRegionCommand no longer uses RegionFunctionArgs (which will then be unused).
+   * GEODE-5971
+   */
+  public Set<String> getUserSpecifiedPartitionAttributes() {
+    Set<String> userSpecifiedPartitionAttributes = new HashSet<>();
+
+    if (this.prColocatedWith != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__COLOCATEDWITH);
+    }
+    if (this.prLocalMaxMemory != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__LOCALMAXMEMORY);
+    }
+    if (this.prRecoveryDelay != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__RECOVERYDELAY);
+    }
+    if (this.prRedundantCopies != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__REDUNDANTCOPIES);
+    }
+    if (this.prStartupRecoveryDelay != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__STARTUPRECOVERYDDELAY);
+    }
+    if (this.prTotalMaxMemory != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__TOTALMAXMEMORY);
+    }
+    if (this.prTotalNumBuckets != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__TOTALNUMBUCKETS);
+    }
+    if (this.partitionResolver != null) {
+      userSpecifiedPartitionAttributes.add(CliStrings.CREATE_REGION__PARTITION_RESOLVER);
+    }
+
+    return userSpecifiedPartitionAttributes;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactory.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactory.java
index efcf313..f7f7aaf 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactory.java
@@ -15,197 +15,298 @@
 package org.apache.geode.management.internal.cli.domain;
 
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CustomExpiry;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.configuration.ClassNameType;
 import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.EnumActionDestroyOverflow;
 import org.apache.geode.cache.configuration.RegionAttributesScope;
 import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.configuration.RegionConfig;
-import org.apache.geode.management.internal.cli.functions.RegionFunctionArgs;
 
 public class RegionConfigFactory {
-  public RegionConfig generate(RegionFunctionArgs args) {
+  public RegionConfig generate(
+      String regionPath,
+      String keyConstraint,
+      String valueConstraint,
+      Boolean statisticsEnabled,
+      Integer entryExpirationIdleTime,
+      ExpirationAction entryExpirationIdleAction,
+      Integer entryExpirationTTL,
+      ExpirationAction entryExpirationTTLAction,
+      ClassName<CustomExpiry> entryIdleTimeCustomExpiry,
+      ClassName<CustomExpiry> entryTTLCustomExpiry,
+      Integer regionExpirationIdleTime,
+      ExpirationAction regionExpirationIdleAction,
+      Integer regionExpirationTTL,
+      ExpirationAction regionExpirationTTLAction,
+      String evictionAction,
+      Integer evictionMaxMemory,
+      Integer evictionEntryCount,
+      String evictionObjectSizer,
+      String diskStore,
+      Boolean diskSynchronous,
+      Boolean enableAsyncConflation,
+      Boolean enableSubscriptionConflation,
+      Set<ClassName<CacheListener>> cacheListeners,
+      ClassName<CacheLoader> cacheLoader,
+      ClassName<CacheWriter> cacheWriter,
+      Set<String> asyncEventQueueIds,
+      Set<String> gatewaySenderIds,
+      Boolean concurrencyChecksEnabled,
+      Boolean cloningEnabled,
+      Boolean mcastEnabled,
+      Integer concurrencyLevel,
+      PartitionArgs partitionArgs,
+      String compressor,
+      Boolean offHeap,
+      RegionAttributes<?, ?> regionAttributes) {
+
     RegionConfig regionConfig = new RegionConfig();
-    regionConfig.setName(getLeafRegion(args.getRegionPath()));
+    regionConfig.setName(getLeafRegion(regionPath));
     RegionAttributesType regionAttributesType = new RegionAttributesType();
     regionConfig.setRegionAttributes(regionAttributesType);
 
-    RegionAttributes<?, ?> regionAttributes = args.getRegionAttributes();
-
-    if (args.getKeyConstraint() != null) {
-      regionAttributesType.setKeyConstraint(args.getKeyConstraint());
+    if (keyConstraint != null) {
+      regionAttributesType.setKeyConstraint(keyConstraint);
     }
 
-    if (args.getValueConstraint() != null) {
-      regionAttributesType.setValueConstraint(args.getValueConstraint());
+    if (valueConstraint != null) {
+      regionAttributesType.setValueConstraint(valueConstraint);
     }
 
-    if (args.getStatisticsEnabled() != null) {
-      regionAttributesType.setStatisticsEnabled(args.getStatisticsEnabled());
+    if (statisticsEnabled != null) {
+      regionAttributesType.setStatisticsEnabled(statisticsEnabled);
     } else if (regionAttributes != null) {
       regionAttributesType.setStatisticsEnabled(regionAttributes.getStatisticsEnabled());
     }
 
     // first get the expiration attributes from the command options
-    regionAttributesType.setEntryIdleTime(getExpirationAttributes(args.getEntryExpirationIdleTime(),
-        args.getEntryIdleTimeCustomExpiry()));
-    regionAttributesType.setEntryTimeToLive(
-        getExpirationAttributes(args.getEntryExpirationTTL(), args.getEntryTTLCustomExpiry()));
+    regionAttributesType.setEntryIdleTime(
+        getExpirationAttributes(entryExpirationIdleTime, entryExpirationIdleAction,
+            entryIdleTimeCustomExpiry));
     regionAttributesType
-        .setRegionIdleTime(getExpirationAttributes(args.getRegionExpirationIdleTime(), null));
+        .setEntryTimeToLive(getExpirationAttributes(entryExpirationTTL, entryExpirationTTLAction,
+            entryTTLCustomExpiry));
+    regionAttributesType.setRegionIdleTime(
+        getExpirationAttributes(regionExpirationIdleTime, regionExpirationIdleAction,
+            null));
     regionAttributesType
-        .setRegionTimeToLive(getExpirationAttributes(args.getRegionExpirationTTL(), null));
+        .setRegionTimeToLive(getExpirationAttributes(regionExpirationTTL, regionExpirationTTLAction,
+            null));
 
-    // if regionAttributes has these attributes, then use that
+    // if regionAttributes has these attributes, then use them instead
     if (regionAttributes != null) {
-      if (regionAttributesType.getEntryIdleTime() == null) {
+      ExpirationAttributes entryIdleTimeout = regionAttributes.getEntryIdleTimeout();
+      if (entryIdleTimeout != null && !entryIdleTimeout.isDefault()
+          && regionAttributesType.getEntryIdleTime() == null) {
         regionAttributesType.setEntryIdleTime(getExpirationAttributes(
-            regionAttributes.getEntryIdleTimeout(), regionAttributes.getCustomEntryIdleTimeout()));
+            entryIdleTimeout.getTimeout(), entryIdleTimeout.getAction(),
+            getClassName(regionAttributes.getCustomEntryIdleTimeout())));
       }
-      if (regionAttributesType.getEntryTimeToLive() == null) {
+
+      ExpirationAttributes entryTimeToLive = regionAttributes.getEntryTimeToLive();
+      if (entryTimeToLive != null && !entryTimeToLive.isDefault()
+          && regionAttributesType.getEntryTimeToLive() == null) {
         regionAttributesType.setEntryTimeToLive(getExpirationAttributes(
-            regionAttributes.getEntryTimeToLive(), regionAttributes.getCustomEntryTimeToLive()));
+            entryTimeToLive.getTimeout(), entryTimeToLive.getAction(),
+            getClassName(regionAttributes.getCustomEntryTimeToLive())));
       }
 
-      if (regionAttributesType.getRegionIdleTime() == null) {
+      ExpirationAttributes regionIdleTimeout = regionAttributes.getRegionIdleTimeout();
+      if (regionIdleTimeout != null && !regionIdleTimeout.isDefault()
+          && regionAttributesType.getRegionIdleTime() == null) {
         regionAttributesType.setRegionIdleTime(
-            getExpirationAttributes(regionAttributes.getRegionIdleTimeout(), null));
+            getExpirationAttributes(regionIdleTimeout.getTimeout(),
+                regionIdleTimeout.getAction(), null));
       }
 
-      if (regionAttributesType.getRegionTimeToLive() == null) {
+      ExpirationAttributes regionTimeToLive = regionAttributes.getRegionTimeToLive();
+      if (regionTimeToLive != null && !regionTimeToLive.isDefault()
+          && regionAttributesType.getRegionTimeToLive() == null) {
         regionAttributesType.setRegionTimeToLive(
-            getExpirationAttributes(regionAttributes.getRegionTimeToLive(), null));
+            getExpirationAttributes(regionTimeToLive.getTimeout(),
+                regionTimeToLive.getAction(), null));
       }
     }
 
-
-    if (args.getDiskStore() != null) {
-      regionAttributesType.setDiskStoreName(args.getDiskStore());
+    if (diskStore != null) {
+      regionAttributesType.setDiskStoreName(diskStore);
     } else if (regionAttributes != null) {
       regionAttributesType.setDiskStoreName(regionAttributes.getDiskStoreName());
     }
 
-    if (args.getDiskSynchronous() != null) {
-      regionAttributesType.setDiskSynchronous(args.getDiskSynchronous());
+    if (diskSynchronous != null) {
+      regionAttributesType.setDiskSynchronous(diskSynchronous);
     } else if (regionAttributes != null) {
       regionAttributesType.setDiskSynchronous(regionAttributes.isDiskSynchronous());
     }
 
-    if (args.getEnableAsyncConflation() != null) {
-      regionAttributesType.setEnableAsyncConflation(args.getEnableAsyncConflation());
+    if (enableAsyncConflation != null) {
+      regionAttributesType.setEnableAsyncConflation(enableAsyncConflation);
     } else if (regionAttributes != null) {
       regionAttributesType.setEnableAsyncConflation(regionAttributes.getEnableAsyncConflation());
     }
 
-    if (args.getEnableSubscriptionConflation() != null) {
-      regionAttributesType.setEnableSubscriptionConflation(args.getEnableSubscriptionConflation());
+    if (enableSubscriptionConflation != null) {
+      regionAttributesType.setEnableSubscriptionConflation(enableSubscriptionConflation);
     } else if (regionAttributes != null) {
       regionAttributesType
           .setEnableSubscriptionConflation(regionAttributes.getEnableSubscriptionConflation());
     }
 
-    if (args.getConcurrencyChecksEnabled() != null) {
-      regionAttributesType.setConcurrencyChecksEnabled(args.getConcurrencyChecksEnabled());
+    if (concurrencyChecksEnabled != null) {
+      regionAttributesType.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
     } else if (regionAttributes != null) {
       regionAttributesType
           .setConcurrencyChecksEnabled(regionAttributes.getConcurrencyChecksEnabled());
     }
 
-    if (args.getCloningEnabled() != null) {
-      regionAttributesType.setCloningEnabled(args.getCloningEnabled());
+    if (cloningEnabled != null) {
+      regionAttributesType.setCloningEnabled(cloningEnabled);
     } else if (regionAttributes != null) {
       regionAttributesType.setCloningEnabled(regionAttributes.getCloningEnabled());
     }
 
-    if (args.getOffHeap() != null) {
-      regionAttributesType.setOffHeap(args.getOffHeap());
+    if (offHeap != null) {
+      regionAttributesType.setOffHeap(offHeap);
     } else if (regionAttributes != null) {
       regionAttributesType.setOffHeap(regionAttributes.getOffHeap());
     }
 
-    if (args.getMcastEnabled() != null) {
-      regionAttributesType.setMulticastEnabled(args.getMcastEnabled());
+    if (mcastEnabled != null) {
+      regionAttributesType.setMulticastEnabled(mcastEnabled);
     } else if (regionAttributes != null) {
       regionAttributesType.setMulticastEnabled(regionAttributes.getMulticastEnabled());
     }
 
-    if (args.getPartitionArgs() != null) {
+    if (partitionArgs != null && !partitionArgs.isEmpty()) {
       RegionAttributesType.PartitionAttributes partitionAttributes =
           new RegionAttributesType.PartitionAttributes();
-      RegionFunctionArgs.PartitionArgs partitionArgs = args.getPartitionArgs();
-      partitionAttributes.setColocatedWith(partitionArgs.getPrColocatedWith());
-      partitionAttributes.setLocalMaxMemory(Objects.toString(partitionArgs.getPrLocalMaxMemory()));
-      partitionAttributes.setRecoveryDelay(Objects.toString(partitionArgs.getPrRecoveryDelay()));
-      partitionAttributes
-          .setRedundantCopies(Objects.toString(partitionArgs.getPrRedundantCopies()));
-      partitionAttributes
-          .setStartupRecoveryDelay(Objects.toString(partitionArgs.getPrStartupRecoveryDelay()));
-      partitionAttributes.setTotalMaxMemory(Objects.toString(partitionArgs.getPrTotalMaxMemory()));
+      regionAttributesType.setPartitionAttributes(partitionAttributes);
+
+      partitionAttributes.setColocatedWith(partitionArgs.prColocatedWith);
+      partitionAttributes.setLocalMaxMemory(int2string(partitionArgs.prLocalMaxMemory));
+      partitionAttributes.setRecoveryDelay(long2string(partitionArgs.prRecoveryDelay));
+      partitionAttributes.setRedundantCopies(int2string(partitionArgs.prRedundantCopies));
       partitionAttributes
-          .setTotalNumBuckets(Objects.toString(partitionArgs.getPrTotalNumBuckets()));
+          .setStartupRecoveryDelay(long2string(partitionArgs.prStartupRecoveryDelay));
+      partitionAttributes.setTotalMaxMemory(long2string(partitionArgs.prTotalMaxMemory));
+      partitionAttributes.setTotalNumBuckets(int2string(partitionArgs.prTotalNumBuckets));
 
-      if (partitionArgs.getPartitionResolver() != null) {
+      if (partitionArgs.partitionResolver != null) {
         DeclarableType partitionResolverType = new DeclarableType();
-        partitionResolverType.setClassName(partitionArgs.getPartitionResolver());
+        partitionResolverType.setClassName(partitionArgs.partitionResolver);
         partitionAttributes.setPartitionResolver(partitionResolverType);
       }
+    }
 
+    if (regionAttributes != null && regionAttributes.getPartitionAttributes() != null) {
+      RegionAttributesType.PartitionAttributes partitionAttributes = Optional.ofNullable(
+          regionAttributesType.getPartitionAttributes())
+          .orElse(new RegionAttributesType.PartitionAttributes());
       regionAttributesType.setPartitionAttributes(partitionAttributes);
-    } else if (regionAttributes != null && regionAttributes.getPartitionAttributes() != null) {
-      regionAttributesType.setPartitionAttributes(
-          regionAttributes.getPartitionAttributes().convertToConfigPartitionAttributes());
+
+      RegionAttributesType.PartitionAttributes implicitPartitionAttributes =
+          regionAttributes.getPartitionAttributes().convertToConfigPartitionAttributes();
+
+      String implicitColocatedWith = implicitPartitionAttributes.getColocatedWith();
+      if (partitionAttributes.getColocatedWith() == null && implicitColocatedWith != null) {
+        partitionAttributes.setColocatedWith(implicitColocatedWith);
+      }
+
+      String implicitLocalMaxMemory = implicitPartitionAttributes.getLocalMaxMemory();
+      if (partitionAttributes.getLocalMaxMemory() == null && implicitLocalMaxMemory != null) {
+        partitionAttributes.setLocalMaxMemory(implicitLocalMaxMemory);
+      }
+
+      String implicitRecoveryDelay = implicitPartitionAttributes.getRecoveryDelay();
+      if (partitionAttributes.getRecoveryDelay() == null && implicitRecoveryDelay != null) {
+        partitionAttributes.setRecoveryDelay(implicitRecoveryDelay);
+      }
+
+      String implicitRedundantCopies = implicitPartitionAttributes.getRedundantCopies();
+      if (partitionAttributes.getRedundantCopies() == null && implicitRedundantCopies != null) {
+        partitionAttributes.setRedundantCopies(implicitRedundantCopies);
+      }
+
+      String implicitStartupRecoveryDelay = implicitPartitionAttributes.getStartupRecoveryDelay();
+      if (partitionAttributes.getStartupRecoveryDelay() == null
+          && implicitStartupRecoveryDelay != null) {
+        partitionAttributes.setStartupRecoveryDelay(implicitStartupRecoveryDelay);
+      }
+
+      String implicitTotalMaxMemory = implicitPartitionAttributes.getTotalMaxMemory();
+      if (partitionAttributes.getTotalMaxMemory() == null && implicitTotalMaxMemory != null) {
+        partitionAttributes.setTotalMaxMemory(implicitTotalMaxMemory);
+      }
+
+      String implicitTotalNumBuckets = implicitPartitionAttributes.getTotalNumBuckets();
+      if (partitionAttributes.getTotalNumBuckets() == null && implicitTotalNumBuckets != null) {
+        partitionAttributes.setTotalNumBuckets(implicitTotalNumBuckets);
+      }
+
+      DeclarableType implicitPartitionResolver = implicitPartitionAttributes.getPartitionResolver();
+      if (partitionAttributes.getPartitionResolver() == null && implicitPartitionResolver != null) {
+        partitionAttributes.setPartitionResolver(implicitPartitionResolver);
+      }
     }
 
-    if (args.getGatewaySenderIds() != null && !args.getGatewaySenderIds().isEmpty()) {
-      regionAttributesType.setGatewaySenderIds(String.join(",", args.getGatewaySenderIds()));
+    if (gatewaySenderIds != null && !gatewaySenderIds.isEmpty()) {
+      regionAttributesType.setGatewaySenderIds(String.join(",", gatewaySenderIds));
     }
 
-    if (args.getEvictionAttributes() != null) {
-      regionAttributesType
-          .setEvictionAttributes(args.getEvictionAttributes().convertToConfigEvictionAttributes());
-    } else if (regionAttributes != null &&
-        regionAttributes.getEvictionAttributes() != null &&
-        !regionAttributes.getEvictionAttributes().isEmpty()) {
-      regionAttributesType.setEvictionAttributes(
-          regionAttributes.getEvictionAttributes().convertToConfigEvictionAttributes());
+    if (evictionAction != null) {
+      RegionAttributesType.EvictionAttributes evictionAttributes =
+          generateEvictionAttributes(evictionAction, evictionMaxMemory, evictionEntryCount,
+              evictionObjectSizer);
+      regionAttributesType.setEvictionAttributes(evictionAttributes);
+    } else if (regionAttributes != null && regionAttributes.getEvictionAttributes() != null
+        && !regionAttributes.getEvictionAttributes().isNoEviction()) {
+      regionAttributesType.setEvictionAttributes(regionAttributes.getEvictionAttributes()
+          .convertToConfigEvictionAttributes());
     }
 
-    if (args.getAsyncEventQueueIds() != null && !args.getAsyncEventQueueIds().isEmpty()) {
-      regionAttributesType.setAsyncEventQueueIds(String.join(",", args.getAsyncEventQueueIds()));
+    if (asyncEventQueueIds != null && !asyncEventQueueIds.isEmpty()) {
+      regionAttributesType.setAsyncEventQueueIds(String.join(",", asyncEventQueueIds));
     }
 
-    if (args.getCacheListeners() != null && !args.getCacheListeners().isEmpty()) {
-      regionAttributesType.getCacheListeners().addAll(args.getCacheListeners().stream().map(l -> {
+    if (cacheListeners != null && !cacheListeners.isEmpty()) {
+      regionAttributesType.getCacheListeners().addAll(cacheListeners.stream().map(l -> {
         DeclarableType declarableType = new DeclarableType();
         declarableType.setClassName(l.getClassName());
         return declarableType;
       }).collect(Collectors.toList()));
     }
 
-    if (args.getCacheLoader() != null) {
+    if (cacheLoader != null) {
       DeclarableType declarableType = new DeclarableType();
-      declarableType.setClassName(args.getCacheLoader().getClassName());
+      declarableType.setClassName(cacheLoader.getClassName());
       regionAttributesType.setCacheLoader(declarableType);
     }
 
-    if (args.getCacheWriter() != null) {
+    if (cacheWriter != null) {
       DeclarableType declarableType = new DeclarableType();
-      declarableType.setClassName(args.getCacheWriter().getClassName());
+      declarableType.setClassName(cacheWriter.getClassName());
       regionAttributesType.setCacheWriter(declarableType);
     }
 
-    if (args.getCompressor() != null) {
-      regionAttributesType.setCompressor(new ClassNameType(args.getCompressor()));
+    if (compressor != null) {
+      regionAttributesType.setCompressor(new ClassNameType(compressor));
       regionAttributesType.setCloningEnabled(true);
     }
 
-    if (args.getConcurrencyLevel() != null) {
-      regionAttributesType.setConcurrencyLevel(args.getConcurrencyLevel().toString());
+    if (concurrencyLevel != null) {
+      regionAttributesType.setConcurrencyLevel(concurrencyLevel.toString());
     } else if (regionAttributes != null) {
       regionAttributesType
           .setConcurrencyLevel(Integer.toString(regionAttributes.getConcurrencyLevel()));
@@ -224,34 +325,35 @@ public class RegionConfigFactory {
     return regionConfig;
   }
 
-  public static RegionAttributesType.ExpirationAttributesType getExpirationAttributes(
-      ExpirationAttributes entryIdleTimeout, CustomExpiry<?, ?> customEntryIdleTimeout) {
-
-    if ((entryIdleTimeout == null || entryIdleTimeout.isDefault())
-        && customEntryIdleTimeout == null) {
-      return null;
-    }
-
-    if (entryIdleTimeout == null || entryIdleTimeout.isDefault()) {
-      return getExpirationAttributes(null, null,
-          new ClassName<>(customEntryIdleTimeout.getClass().getName()));
-    } else if (customEntryIdleTimeout == null) {
-      return getExpirationAttributes(entryIdleTimeout.getTimeout(), entryIdleTimeout.getAction(),
-          null);
+  private RegionAttributesType.EvictionAttributes generateEvictionAttributes(String evictionAction,
+      Integer maxMemory, Integer maxEntryCount,
+      String objectSizer) {
+    RegionAttributesType.EvictionAttributes configAttributes =
+        new RegionAttributesType.EvictionAttributes();
+    EnumActionDestroyOverflow action = EnumActionDestroyOverflow.fromValue(evictionAction);
+
+    if (maxMemory == null && maxEntryCount == null) {
+      RegionAttributesType.EvictionAttributes.LruHeapPercentage heapPercentage =
+          new RegionAttributesType.EvictionAttributes.LruHeapPercentage();
+      heapPercentage.setAction(action);
+      heapPercentage.setClassName(objectSizer);
+      configAttributes.setLruHeapPercentage(heapPercentage);
+    } else if (maxMemory != null) {
+      RegionAttributesType.EvictionAttributes.LruMemorySize memorySize =
+          new RegionAttributesType.EvictionAttributes.LruMemorySize();
+      memorySize.setAction(action);
+      memorySize.setClassName(objectSizer);
+      memorySize.setMaximum(maxMemory.toString());
+      configAttributes.setLruMemorySize(memorySize);
     } else {
-      return getExpirationAttributes(entryIdleTimeout.getTimeout(), entryIdleTimeout.getAction(),
-          new ClassName<>(customEntryIdleTimeout.getClass().getName()));
+      RegionAttributesType.EvictionAttributes.LruEntryCount entryCount =
+          new RegionAttributesType.EvictionAttributes.LruEntryCount();
+      entryCount.setAction(action);
+      entryCount.setMaximum(maxEntryCount.toString());
+      configAttributes.setLruEntryCount(entryCount);
     }
-  }
 
-  public static RegionAttributesType.ExpirationAttributesType getExpirationAttributes(
-      RegionFunctionArgs.ExpirationAttrs expirationAttrs, ClassName<CustomExpiry> customExpiry) {
-    if (expirationAttrs == null) {
-      return getExpirationAttributes(null, null, customExpiry);
-    } else {
-      return getExpirationAttributes(expirationAttrs.getTime(), expirationAttrs.getAction(),
-          customExpiry);
-    }
+    return configAttributes;
   }
 
   public static RegionAttributesType.ExpirationAttributesType getExpirationAttributes(
@@ -259,6 +361,7 @@ public class RegionConfigFactory {
     if (timeout == null && action == null && expiry == null) {
       return null;
     }
+
     RegionAttributesType.ExpirationAttributesType attributesType =
         new RegionAttributesType.ExpirationAttributesType();
 
@@ -266,16 +369,31 @@ public class RegionConfigFactory {
     if (action == null) {
       action = ExpirationAction.INVALIDATE;
     }
+
     attributesType.setAction(action.toXmlString());
 
     if (expiry != null) {
-      attributesType
-          .setCustomExpiry(new DeclarableType(expiry.getClassName(), expiry.getInitProperties()));
+      attributesType.setCustomExpiry(new DeclarableType(expiry.getClassName()));
     }
 
     return attributesType;
   }
 
+  private static ClassName<CustomExpiry> getClassName(CustomExpiry expiry) {
+    if (expiry == null) {
+      return null;
+    }
+
+    return new ClassName<>(expiry.getClass().getName());
+  }
+
+  private static String int2string(Integer x) {
+    return Optional.ofNullable(x).map(v -> v.toString()).orElse(null);
+  }
+
+  private static String long2string(Long x) {
+    return Optional.ofNullable(x).map(v -> v.toString()).orElse(null);
+  }
 
   private String getLeafRegion(String fullPath) {
     String regionPath = fullPath;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/exceptions/EntityExistsException.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/exceptions/EntityExistsException.java
index dd44715..fa2ec46 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/exceptions/EntityExistsException.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/exceptions/EntityExistsException.java
@@ -1,28 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 package org.apache.geode.management.internal.cli.exceptions;
 
-public class EntityExistsException extends EntityNotFoundException {
+import org.apache.geode.GemFireException;
+
+public class EntityExistsException extends GemFireException {
+
+  public EntityExistsException() {}
+
   public EntityExistsException(String message) {
     super(message);
   }
 
-  public EntityExistsException(String message, boolean statusOK) {
-    super(message, statusOK);
+  public EntityExistsException(Throwable cause) {
+    super(cause);
+  }
+
+  public EntityExistsException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateRegionFunctionArgs.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateRegionFunctionArgs.java
new file mode 100644
index 0000000..ccc89d0
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateRegionFunctionArgs.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.Serializable;
+
+import org.apache.geode.cache.configuration.RegionConfig;
+
+public class CreateRegionFunctionArgs implements Serializable {
+  private final String regionPath;
+  private final RegionConfig config;
+  private final boolean ifNotExists;
+
+  public CreateRegionFunctionArgs(String path, RegionConfig config, boolean ifNotExists) {
+    this.regionPath = path;
+    this.config = config;
+    this.ifNotExists = ifNotExists;
+  }
+
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public RegionConfig getConfig() {
+    return config;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
index d1942fb..ecaf2a8 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
@@ -26,9 +26,9 @@ import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.cache.wan.GatewaySenderFactory;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
-import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 
 public class GatewaySenderCreateFunction implements InternalFunction {
@@ -144,18 +144,20 @@ public class GatewaySenderCreateFunction implements InternalFunction {
     if (gatewayEventFilters != null) {
       for (String gatewayEventFilter : gatewayEventFilters) {
         Class gatewayEventFilterKlass =
-            forName(gatewayEventFilter, CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER);
-        gateway.addGatewayEventFilter((GatewayEventFilter) newInstance(gatewayEventFilterKlass,
-            CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER));
+            CliUtil.forName(gatewayEventFilter,
+                CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER);
+        gateway.addGatewayEventFilter(
+            (GatewayEventFilter) CliUtil.newInstance(gatewayEventFilterKlass,
+                CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER));
       }
     }
 
     List<String> gatewayTransportFilters = gatewaySenderCreateArgs.getGatewayTransportFilter();
     if (gatewayTransportFilters != null) {
       for (String gatewayTransportFilter : gatewayTransportFilters) {
-        Class gatewayTransportFilterKlass = forName(gatewayTransportFilter,
+        Class gatewayTransportFilterKlass = CliUtil.forName(gatewayTransportFilter,
             CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER);
-        gateway.addGatewayTransportFilter((GatewayTransportFilter) newInstance(
+        gateway.addGatewayTransportFilter((GatewayTransportFilter) CliUtil.newInstance(
             gatewayTransportFilterKlass, CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER));
       }
     }
@@ -163,45 +165,6 @@ public class GatewaySenderCreateFunction implements InternalFunction {
         gatewaySenderCreateArgs.getRemoteDistributedSystemId());
   }
 
-  @SuppressWarnings("unchecked")
-  private static Class forName(String classToLoadName, String neededFor) {
-    Class loadedClass = null;
-    try {
-      // Set Constraints
-      ClassPathLoader classPathLoader = ClassPathLoader.getLatest();
-      if (classToLoadName != null && !classToLoadName.isEmpty()) {
-        loadedClass = classPathLoader.forName(classToLoadName);
-      }
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(
-          CliStrings.format(CliStrings.CREATE_REGION__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1,
-              classToLoadName, neededFor),
-          e);
-    } catch (ClassCastException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__CLASS_SPECIFIED_FOR_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE,
-          classToLoadName, neededFor), e);
-    }
-
-    return loadedClass;
-  }
-
-  private static Object newInstance(Class klass, String neededFor) {
-    Object instance = null;
-    try {
-      instance = klass.newInstance();
-    } catch (InstantiationException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_GATEWAYSENDER__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1,
-          klass, neededFor), e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1, klass,
-          neededFor), e);
-    }
-    return instance;
-  }
-
   @Override
   public String getId() {
     return ID;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionCreateFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionCreateFunction.java
index 26ed5a4..1401d50 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionCreateFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionCreateFunction.java
@@ -14,12 +14,9 @@
  */
 package org.apache.geode.management.internal.cli.functions;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
-import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.logging.log4j.Logger;
 
@@ -27,28 +24,30 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CustomExpiry;
 import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.Declarable;
-import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.PartitionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.ResultSender;
-import org.apache.geode.cache.util.ObjectSizer;
 import org.apache.geode.compression.Compressor;
 import org.apache.geode.internal.ClassPathLoader;
+import org.apache.geode.internal.cache.EvictionAttributesImpl;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.cache.xmlcache.CacheXml;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.management.internal.cli.commands.RegionCommandsUtils;
-import org.apache.geode.management.internal.cli.domain.ClassName;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.cli.util.RegionPath;
 import org.apache.geode.management.internal.configuration.domain.XmlEntity;
@@ -79,7 +78,7 @@ public class RegionCreateFunction implements InternalFunction {
     Cache cache = ((InternalCache) context.getCache()).getCacheForProcessingClientRequests();
     String memberNameOrId = context.getMemberName();
 
-    RegionFunctionArgs regionCreateArgs = (RegionFunctionArgs) context.getArguments();
+    CreateRegionFunctionArgs regionCreateArgs = (CreateRegionFunctionArgs) context.getArguments();
 
     if (regionCreateArgs.isIfNotExists()) {
       Region<Object, Object> region = cache.getRegion(regionCreateArgs.getRegionPath());
@@ -94,12 +93,12 @@ public class RegionCreateFunction implements InternalFunction {
     }
 
     try {
-      Region<?, ?> createdRegion = createRegion(cache, regionCreateArgs);
+      Region<?, ?> createdRegion =
+          createRegion(cache, regionCreateArgs.getConfig(), regionCreateArgs.getRegionPath());
       XmlEntity xmlEntity = getXmlEntityForRegion(createdRegion);
-      resultSender
-          .lastResult(new CliFunctionResult(memberNameOrId, xmlEntity.getXmlDefinition(),
-              CliStrings.format(CliStrings.CREATE_REGION__MSG__REGION_0_CREATED_ON_1,
-                  createdRegion.getFullPath(), memberNameOrId)));
+      resultSender.lastResult(new CliFunctionResult(memberNameOrId, xmlEntity.getXmlDefinition(),
+          CliStrings.format(CliStrings.CREATE_REGION__MSG__REGION_0_CREATED_ON_1,
+              createdRegion.getFullPath(), memberNameOrId)));
     } catch (IllegalStateException e) {
       String exceptionMsg = e.getMessage();
       String localizedString =
@@ -149,174 +148,88 @@ public class RegionCreateFunction implements InternalFunction {
     return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR);
   }
 
-  private <K, V> Region<?, ?> createRegion(Cache cache, RegionFunctionArgs regionCreateArgs) {
-    Region<K, V> createdRegion = null;
+  private <K, V> Region<?, ?> createRegion(Cache cache, RegionConfig config, String regionPath)
+      throws RuntimeException {
+    RegionAttributesType regionAttributes = config.getRegionAttributes();
+    Region<K, V> createdRegion;
+    RegionFactory<K, V> factory = cache.createRegionFactory();
 
-    // create the region factory using the arguments
-    RegionAttributes<K, V> regionAttributes = regionCreateArgs.getRegionAttributes();
-    boolean isPartitioned = regionAttributes.getPartitionAttributes() != null;
-    RegionFactory<K, V> factory = cache.createRegionFactory(regionAttributes);
+    validateAndSetCustomClasses(regionAttributes, factory);
 
-    if (isPartitioned) {
-      PartitionAttributes<K, V> partitionAttributes =
-          extractPartitionAttributes(cache, regionAttributes, regionCreateArgs);
-
-      DataPolicy originalDataPolicy = regionAttributes.getDataPolicy();
-      factory.setPartitionAttributes(partitionAttributes);
-      // We have to do this because AttributesFactory.setPartitionAttributes()
-      // checks RegionAttributes.hasDataPolicy() which is set only when the data
-      // policy is set explicitly
-      factory.setDataPolicy(originalDataPolicy);
-    }
-
-    // Set Constraints
-    final String keyConstraint = regionCreateArgs.getKeyConstraint();
-    final String valueConstraint = regionCreateArgs.getValueConstraint();
-    if (keyConstraint != null && !keyConstraint.isEmpty()) {
-      Class<K> keyConstraintClass =
-          CliUtil.forName(keyConstraint, CliStrings.CREATE_REGION__KEYCONSTRAINT);
-      factory.setKeyConstraint(keyConstraintClass);
-    }
-
-    if (valueConstraint != null && !valueConstraint.isEmpty()) {
-      Class<V> valueConstraintClass =
-          CliUtil.forName(valueConstraint, CliStrings.CREATE_REGION__VALUECONSTRAINT);
-      factory.setValueConstraint(valueConstraintClass);
+    if (regionAttributes.getPartitionAttributes() != null) {
+      factory.setPartitionAttributes(
+          PartitionAttributesImpl.fromConfig(regionAttributes.getPartitionAttributes()));
     }
 
-    // Expiration attributes
-    final RegionFunctionArgs.ExpirationAttrs entryExpirationIdleTime =
-        regionCreateArgs.getEntryExpirationIdleTime();
-    if (entryExpirationIdleTime != null && entryExpirationIdleTime.isTimeOrActionSet()) {
-      factory.setEntryIdleTimeout(entryExpirationIdleTime.getExpirationAttributes());
-    }
+    factory
+        .setDataPolicy(DataPolicy.fromString(regionAttributes.getDataPolicy().value().toUpperCase()
+            .replace("-", "_")));
 
-    if (regionCreateArgs.getEntryIdleTimeCustomExpiry() != null) {
-      factory.setCustomEntryIdleTimeout(
-          regionCreateArgs.getEntryIdleTimeCustomExpiry().newInstance(cache));
+    if (regionAttributes.getScope() != null) {
+      factory.setScope(Scope.fromString(regionAttributes.getScope().value().toUpperCase()
+          .replace("-", "_")));
     }
 
-    if (regionCreateArgs.getEntryTTLCustomExpiry() != null) {
-      factory
-          .setCustomEntryTimeToLive(regionCreateArgs.getEntryTTLCustomExpiry().newInstance(cache));
-    }
+    validateAndSetExpirationAttributes(regionAttributes, factory);
 
-    final RegionFunctionArgs.ExpirationAttrs entryExpirationTTL =
-        regionCreateArgs.getEntryExpirationTTL();
-    if (entryExpirationTTL != null && entryExpirationTTL.isTimeOrActionSet()) {
-      factory.setEntryTimeToLive(entryExpirationTTL.getExpirationAttributes());
-    }
-    final RegionFunctionArgs.ExpirationAttrs regionExpirationIdleTime =
-        regionCreateArgs.getRegionExpirationIdleTime();
-    if (regionExpirationIdleTime != null && regionExpirationIdleTime.isTimeOrActionSet()) {
-      factory.setRegionIdleTimeout(regionExpirationIdleTime.getExpirationAttributes());
-    }
-    final RegionFunctionArgs.ExpirationAttrs regionExpirationTTL =
-        regionCreateArgs.getRegionExpirationTTL();
-    if (regionExpirationTTL != null && regionExpirationTTL.isTimeOrActionSet()) {
-      factory.setRegionTimeToLive(regionExpirationTTL.getExpirationAttributes());
-    }
-
-    EvictionAttributes evictionAttributes = Optional
-        .ofNullable(regionCreateArgs.getEvictionAttributes())
-        .map(a -> a.convertToEvictionAttributes()).orElse(null);
-    if (evictionAttributes != null) {
-      ObjectSizer sizer = evictionAttributes.getObjectSizer();
-      if (sizer != null && !(sizer instanceof Declarable)) {
+    if (regionAttributes.getEvictionAttributes() != null) {
+      try {
+        factory.setEvictionAttributes(
+            EvictionAttributesImpl.fromConfig(regionAttributes.getEvictionAttributes()));
+      } catch (Exception e) {
         throw new IllegalArgumentException(
             CliStrings.CREATE_REGION__MSG__OBJECT_SIZER_MUST_BE_OBJECTSIZER_AND_DECLARABLE);
       }
-      factory.setEvictionAttributes(evictionAttributes);
     }
 
-    // Associate a Disk Store
-    final String diskStore = regionCreateArgs.getDiskStore();
-    if (diskStore != null && !diskStore.isEmpty()) {
-      factory.setDiskStoreName(diskStore);
+    if (regionAttributes.getDiskStoreName() != null) {
+      factory.setDiskStoreName(regionAttributes.getDiskStoreName());
     }
 
-    if (regionCreateArgs.getDiskSynchronous() != null) {
-      factory.setDiskSynchronous(regionCreateArgs.getDiskSynchronous());
+    if (regionAttributes.isDiskSynchronous() != null) {
+      factory.setDiskSynchronous(regionAttributes.isDiskSynchronous());
     }
 
-    if (regionCreateArgs.getOffHeap() != null) {
-      factory.setOffHeap(regionCreateArgs.getOffHeap());
+    if (regionAttributes.isOffHeap() != null) {
+      factory.setOffHeap(regionAttributes.isOffHeap());
     }
 
-    if (regionCreateArgs.getStatisticsEnabled() != null) {
-      factory.setStatisticsEnabled(regionCreateArgs.getStatisticsEnabled());
+    if (regionAttributes.isStatisticsEnabled() != null) {
+      factory.setStatisticsEnabled(regionAttributes.isStatisticsEnabled());
     }
 
-    if (regionCreateArgs.getEnableAsyncConflation() != null) {
-      factory.setEnableAsyncConflation(regionCreateArgs.getEnableAsyncConflation());
+    if (regionAttributes.isEnableAsyncConflation() != null) {
+      factory.setEnableAsyncConflation(regionAttributes.isEnableAsyncConflation());
     }
 
-    if (regionCreateArgs.getEnableSubscriptionConflation() != null) {
-      factory.setEnableSubscriptionConflation(regionCreateArgs.getEnableSubscriptionConflation());
+    if (regionAttributes.isEnableSubscriptionConflation() != null) {
+      factory.setEnableSubscriptionConflation(regionAttributes.isEnableSubscriptionConflation());
     }
 
-    // Gateway Sender Ids
-    final Set<String> gatewaySenderIds = regionCreateArgs.getGatewaySenderIds();
-    if (gatewaySenderIds != null && !gatewaySenderIds.isEmpty()) {
-      for (String gatewaySenderId : gatewaySenderIds) {
-        factory.addGatewaySenderId(gatewaySenderId);
-      }
+    if (regionAttributes.getGatewaySenderIds() != null) {
+      Arrays.stream(regionAttributes.getGatewaySenderIds().split(","))
+          .forEach(gsi -> factory.addGatewaySenderId(gsi));
     }
 
-    // Async Queue Ids
-    final Set<String> asyncEventQueueIds = regionCreateArgs.getAsyncEventQueueIds();
-    if (asyncEventQueueIds != null && !asyncEventQueueIds.isEmpty()) {
-      for (String asyncEventQueueId : asyncEventQueueIds) {
-        factory.addAsyncEventQueueId(asyncEventQueueId);
-      }
+    if (regionAttributes.getAsyncEventQueueIds() != null) {
+      Arrays.stream(regionAttributes.getAsyncEventQueueIds().split(","))
+          .forEach(gsi -> factory.addAsyncEventQueueId(gsi));
     }
 
-    if (regionCreateArgs.getConcurrencyChecksEnabled() != null) {
-      factory.setConcurrencyChecksEnabled(regionCreateArgs.getConcurrencyChecksEnabled());
-    }
+    factory.setConcurrencyChecksEnabled(regionAttributes.isConcurrencyChecksEnabled());
 
-    if (regionCreateArgs.getConcurrencyLevel() != null) {
-      factory.setConcurrencyLevel(regionCreateArgs.getConcurrencyLevel());
+    if (regionAttributes.getConcurrencyLevel() != null) {
+      factory.setConcurrencyLevel(Integer.valueOf(regionAttributes.getConcurrencyLevel()));
     }
 
-    if (regionCreateArgs.getCloningEnabled() != null) {
-      factory.setCloningEnabled(regionCreateArgs.getCloningEnabled());
+    if (regionAttributes.isCloningEnabled() != null) {
+      factory.setCloningEnabled(regionAttributes.isCloningEnabled());
     }
 
-    if (regionCreateArgs.getMcastEnabled() != null) {
-      factory.setMulticastEnabled(regionCreateArgs.getMcastEnabled());
+    if (regionAttributes.isMulticastEnabled() != null) {
+      factory.setMulticastEnabled(regionAttributes.isMulticastEnabled());
     }
 
-    // Set plugins
-    final Set<ClassName<CacheListener>> cacheListeners = regionCreateArgs.getCacheListeners();
-    if (cacheListeners != null && !cacheListeners.isEmpty()) {
-      List<CacheListener<K, V>> newListeners = new ArrayList<>();
-      for (ClassName<CacheListener> cacheListener : cacheListeners) {
-        newListeners.add(cacheListener.newInstance(cache));
-      }
-      factory.initCacheListeners(newListeners.toArray(new CacheListener[0]));
-    }
-
-    // Compression provider
-    if (regionCreateArgs.getCompressor() != null) {
-      Class<Compressor> compressorKlass =
-          CliUtil.forName(regionCreateArgs.getCompressor(), CliStrings.CREATE_REGION__COMPRESSOR);
-      factory.setCompressor(
-          CliUtil.newInstance(compressorKlass, CliStrings.CREATE_REGION__COMPRESSOR));
-    }
-
-    final ClassName<CacheLoader> cacheLoader = regionCreateArgs.getCacheLoader();
-    if (cacheLoader != null) {
-      factory.setCacheLoader(cacheLoader.newInstance(cache));
-    }
-
-    final ClassName<CacheWriter> cacheWriter = regionCreateArgs.getCacheWriter();
-    if (cacheWriter != null) {
-      factory.setCacheWriter(cacheWriter.newInstance(cache));
-    }
-
-    // If a region path indicates a sub-region,
-    final String regionPath = regionCreateArgs.getRegionPath();
     RegionPath regionPathData = new RegionPath(regionPath);
     String regionName = regionPathData.getName();
     String parentRegionPath = regionPathData.getParent();
@@ -330,99 +243,143 @@ public class RegionCreateFunction implements InternalFunction {
     return createdRegion;
   }
 
-  @SuppressWarnings("unchecked")
-  private static <K, V> PartitionAttributes<K, V> extractPartitionAttributes(Cache cache,
-      RegionAttributes<K, V> regionAttributes, RegionFunctionArgs regionCreateArgs) {
-
-    PartitionAttributesFactory<K, V> prAttrFactory;
-
-    PartitionAttributes<K, V> partitionAttributes = regionAttributes.getPartitionAttributes();
-    if (partitionAttributes != null) {
-      prAttrFactory = new PartitionAttributesFactory<>(partitionAttributes);
-    } else {
-      prAttrFactory = new PartitionAttributesFactory<>();
+  private <K, V> void validateAndSetExpirationAttributes(RegionAttributesType regionAttributes,
+      RegionFactory<K, V> factory) {
+    if (regionAttributes.getEntryIdleTime() != null) {
+      RegionAttributesType.ExpirationAttributesType eitl = regionAttributes.getEntryIdleTime();
+      factory.setEntryIdleTimeout(
+          new ExpirationAttributes(Integer.valueOf(eitl.getTimeout()),
+              ExpirationAction.fromString(eitl.getAction().toUpperCase()
+                  .replace("-", "_"))));
+
+      try {
+        if (eitl.getCustomExpiry() != null) {
+          factory.setCustomEntryIdleTimeout((CustomExpiry) ClassPathLoader.getLatest()
+              .forName(eitl.getCustomExpiry().getClassName())
+              .newInstance());
+        }
+      } catch (Exception e) {
+      }
     }
 
-    if (regionCreateArgs.hasPartitionAttributes()) {
-      RegionFunctionArgs.PartitionArgs partitionArgs = regionCreateArgs.getPartitionArgs();
-      String colocatedWith = partitionArgs.getPrColocatedWith();
-      if (colocatedWith != null) {
-        Region<Object, Object> colocatedWithRegion = cache.getRegion(colocatedWith);
-        if (colocatedWithRegion == null) {
-          throw new IllegalArgumentException(CliStrings.format(
-              CliStrings.CREATE_REGION__MSG__COLOCATEDWITH_REGION_0_DOES_NOT_EXIST, colocatedWith));
-        }
-        if (!colocatedWithRegion.getAttributes().getDataPolicy().withPartitioning()) {
-          throw new IllegalArgumentException(CliStrings.format(
-              CliStrings.CREATE_REGION__MSG__COLOCATEDWITH_REGION_0_IS_NOT_PARTITIONEDREGION,
-              colocatedWith));
+    if (regionAttributes.getEntryTimeToLive() != null) {
+      RegionAttributesType.ExpirationAttributesType ettl = regionAttributes.getEntryTimeToLive();
+      factory.setEntryTimeToLive(
+          new ExpirationAttributes(Integer.valueOf(ettl.getTimeout()),
+              ExpirationAction.fromString(ettl.getAction().toUpperCase()
+                  .replace("-", "_"))));
+
+      try {
+        if (ettl.getCustomExpiry() != null) {
+          factory.setCustomEntryTimeToLive((CustomExpiry) ClassPathLoader.getLatest()
+              .forName(ettl.getCustomExpiry().getClassName())
+              .newInstance());
         }
-        prAttrFactory.setColocatedWith(colocatedWith);
-      }
-      if (partitionArgs.getPrLocalMaxMemory() != null) {
-        prAttrFactory.setLocalMaxMemory(partitionArgs.getPrLocalMaxMemory());
-      }
-      if (partitionArgs.getPrTotalMaxMemory() != null) {
-        prAttrFactory.setTotalMaxMemory(partitionArgs.getPrTotalMaxMemory());
-      }
-      if (partitionArgs.getPrTotalNumBuckets() != null) {
-        prAttrFactory.setTotalNumBuckets(partitionArgs.getPrTotalNumBuckets());
-      }
-      if (partitionArgs.getPrRedundantCopies() != null) {
-        prAttrFactory.setRedundantCopies(partitionArgs.getPrRedundantCopies());
-      }
-      if (partitionArgs.getPrRecoveryDelay() != null) {
-        prAttrFactory.setRecoveryDelay(partitionArgs.getPrRecoveryDelay());
-      }
-      if (partitionArgs.getPrStartupRecoveryDelay() != null) {
-        prAttrFactory.setStartupRecoveryDelay(partitionArgs.getPrStartupRecoveryDelay());
+      } catch (Exception e) {
       }
+    }
 
-      if (regionCreateArgs.getPartitionArgs().getPartitionResolver() != null) {
-        Class<PartitionResolver> partitionResolverClass =
-            forName(regionCreateArgs.getPartitionArgs().getPartitionResolver(),
-                CliStrings.CREATE_REGION__PARTITION_RESOLVER);
-        prAttrFactory
-            .setPartitionResolver((PartitionResolver<K, V>) newInstance(partitionResolverClass,
-                CliStrings.CREATE_REGION__PARTITION_RESOLVER));
-      }
+    if (regionAttributes.getRegionIdleTime() != null) {
+      RegionAttributesType.ExpirationAttributesType ritl = regionAttributes.getRegionIdleTime();
+      factory.setRegionIdleTimeout(
+          new ExpirationAttributes(Integer.valueOf(ritl.getTimeout()),
+              ExpirationAction.fromString(ritl.getAction().toUpperCase()
+                  .replace("-", "_"))));
+    }
+
+    if (regionAttributes.getRegionTimeToLive() != null) {
+      RegionAttributesType.ExpirationAttributesType rttl = regionAttributes.getRegionTimeToLive();
+      factory.setRegionTimeToLive(
+          new ExpirationAttributes(Integer.valueOf(rttl.getTimeout()),
+              ExpirationAction.fromString(rttl.getAction().toUpperCase()
+                  .replace("-", "_"))));
     }
-    return prAttrFactory.create();
   }
 
+  private <K, V> void validateAndSetCustomClasses(RegionAttributesType regionAttributes,
+      RegionFactory<K, V> factory) {
+    if (regionAttributes.getEntryIdleTime() != null
+        && regionAttributes.getEntryIdleTime().getCustomExpiry() != null) {
+      String customExpiry = regionAttributes.getEntryIdleTime().getCustomExpiry().getClassName();
+      String neededFor = CliStrings.ENTRY_IDLE_TIME_CUSTOM_EXPIRY;
+      Class<CustomExpiry> customExpiryClass = CliUtil.forName(customExpiry, neededFor);
+      CliUtil.newInstance(customExpiryClass, neededFor);
+    }
+
+    if (regionAttributes.getEntryTimeToLive() != null
+        && regionAttributes.getEntryTimeToLive().getCustomExpiry() != null) {
+      String customExpiry = regionAttributes.getEntryTimeToLive().getCustomExpiry().getClassName();
+      String neededFor = CliStrings.ENTRY_TTL_CUSTOM_EXPIRY;
+      Class<CustomExpiry> customExpiryClass = CliUtil.forName(customExpiry, neededFor);
+      CliUtil.newInstance(customExpiryClass, neededFor);
+    }
+
+    if (regionAttributes.getPartitionAttributes() != null
+        && regionAttributes.getPartitionAttributes().getPartitionResolver() != null) {
+      String partitionResolver =
+          regionAttributes.getPartitionAttributes().getPartitionResolver().getClassName();
+      String neededFor = CliStrings.CREATE_REGION__PARTITION_RESOLVER;
+      Class<PartitionResolver> partitionResolverClass =
+          CliUtil.forName(partitionResolver, neededFor);
+      CliUtil.newInstance(partitionResolverClass, neededFor);
+    }
+
+    if (regionAttributes.getCacheLoader() != null) {
+      String cacheLoader =
+          regionAttributes.getCacheLoader().getClassName();
+      String neededFor = CliStrings.CREATE_REGION__CACHELOADER;
+      Class<CacheLoader> cacheLoaderClass =
+          CliUtil.forName(cacheLoader, neededFor);
+      CacheLoader loader = CliUtil.newInstance(cacheLoaderClass, neededFor);
+      factory.setCacheLoader(loader);
+    }
+
+    if (regionAttributes.getCacheWriter() != null) {
+      String cacheWriter =
+          regionAttributes.getCacheWriter().getClassName();
+      String neededFor = CliStrings.CREATE_REGION__CACHEWRITER;
+      Class<CacheWriter> cacheWriterClass =
+          CliUtil.forName(cacheWriter, neededFor);
+      CacheWriter writer = CliUtil.newInstance(cacheWriterClass, neededFor);
+      factory.setCacheWriter(writer);
+    }
+
+    if (regionAttributes.getCacheListeners() != null) {
+      List<DeclarableType> configListeners = regionAttributes.getCacheListeners();
+      CacheListener[] listeners = new CacheListener[configListeners.size()];
+      String neededFor = CliStrings.CREATE_REGION__CACHELISTENER;
+      for (int i = 0; i < configListeners.size(); i++) {
+        String listener = configListeners.get(i).getClassName();
+        Class<CacheListener> cacheListenerClass = CliUtil.forName(listener, neededFor);
+        listeners[i] = CliUtil.newInstance(cacheListenerClass, neededFor);
+      }
+      factory.initCacheListeners(listeners);
+    }
 
-  private static Class<PartitionResolver> forName(String className, String neededFor) {
-    if (StringUtils.isBlank(className)) {
-      throw new IllegalArgumentException(CliStrings
-          .format(CliStrings.CREATE_REGION__MSG__INVALID_PARTITION_RESOLVER, className, neededFor));
+    final String keyConstraint = (String) regionAttributes.getKeyConstraint();
+    final String valueConstraint = regionAttributes.getValueConstraint();
+    if (keyConstraint != null && !keyConstraint.isEmpty()) {
+      Class<K> keyConstraintClass =
+          CliUtil.forName(keyConstraint, CliStrings.CREATE_REGION__KEYCONSTRAINT);
+      factory.setKeyConstraint(keyConstraintClass);
     }
-    try {
-      return (Class<PartitionResolver>) ClassPathLoader.getLatest().forName(className);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION_PARTITION_RESOLVER__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1,
-          className, neededFor), e);
-    } catch (ClassCastException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__PARTITION_RESOLVER__CLASS_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE,
-          className, neededFor), e);
+
+    if (valueConstraint != null && !valueConstraint.isEmpty()) {
+      Class<V> valueConstraintClass =
+          CliUtil.forName(valueConstraint, CliStrings.CREATE_REGION__VALUECONSTRAINT);
+      factory.setValueConstraint(valueConstraintClass);
     }
-  }
 
-  private static PartitionResolver newInstance(Class<PartitionResolver> klass, String neededFor) {
-    try {
-      return klass.newInstance();
-    } catch (InstantiationException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__PARTITION_RESOLVER__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1,
-          klass, neededFor), e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(CliStrings.format(
-          CliStrings.CREATE_REGION__MSG__PARTITION_RESOLVER__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1,
-          klass, neededFor), e);
+    if (regionAttributes.getCompressor() != null) {
+      Class<Compressor> compressorKlass =
+          CliUtil.forName(regionAttributes.getCompressor().getClassName(),
+              CliStrings.CREATE_REGION__COMPRESSOR);
+      factory.setCompressor(
+          CliUtil.newInstance(compressorKlass, CliStrings.CREATE_REGION__COMPRESSOR));
     }
   }
 
+
   @Override
   public String getId() {
     return ID;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionFunctionArgs.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionFunctionArgs.java
index 21d975d..2547c04 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionFunctionArgs.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionFunctionArgs.java
@@ -33,8 +33,6 @@ import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.configuration.EnumActionDestroyOverflow;
-import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.util.ObjectSizer;
 import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.management.internal.cli.domain.ClassName;
@@ -630,38 +628,8 @@ public class RegionFunctionArgs implements Serializable {
         return EvictionAttributes.createLRUEntryAttributes(maxEntryCount, action);
       }
     }
-
-    public RegionAttributesType.EvictionAttributes convertToConfigEvictionAttributes() {
-      RegionAttributesType.EvictionAttributes configAttributes =
-          new RegionAttributesType.EvictionAttributes();
-      EnumActionDestroyOverflow action = EnumActionDestroyOverflow.fromValue(evictionAction);
-
-      if (maxMemory == null && maxEntryCount == null) {
-        RegionAttributesType.EvictionAttributes.LruHeapPercentage heapPercentage =
-            new RegionAttributesType.EvictionAttributes.LruHeapPercentage();
-        heapPercentage.setAction(action);
-        heapPercentage.setClassName(objectSizer);
-        configAttributes.setLruHeapPercentage(heapPercentage);
-      } else if (maxMemory != null) {
-        RegionAttributesType.EvictionAttributes.LruMemorySize memorySize =
-            new RegionAttributesType.EvictionAttributes.LruMemorySize();
-        memorySize.setAction(action);
-        memorySize.setClassName(objectSizer);
-        memorySize.setMaximum(maxMemory.toString());
-        configAttributes.setLruMemorySize(memorySize);
-      } else {
-        RegionAttributesType.EvictionAttributes.LruEntryCount entryCount =
-            new RegionAttributesType.EvictionAttributes.LruEntryCount();
-        entryCount.setAction(action);
-        entryCount.setMaximum(maxEntryCount.toString());
-        configAttributes.setLruEntryCount(entryCount);
-      }
-
-      return configAttributes;
-    }
   }
 
-
   public static class PartitionArgs implements Serializable {
     private static final long serialVersionUID = 5907052187323280919L;
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index bd96248..3b062e3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -374,15 +374,6 @@ public class CliStrings {
   public static final String ALTER_REGION__EVICTIONMAX = "eviction-max";
   public static final String ALTER_REGION__EVICTIONMAX__HELP =
       "Maximum value for the Eviction Attributes which the Eviction Algorithm uses to determine when to perform its Eviction Action. The unit of the maximum value is determined by the Eviction Algorithm.";
-  public static final String ALTER_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHELISTENER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHELISTENER
-          + ". \"{0}\" is not valid.";
-  public static final String ALTER_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHEWRITER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHEWRITER
-          + ". \"{0}\" is not valid.";
-  public static final String ALTER_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHELOADER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHELOADER
-          + ". \"{0}\" is not valid.";
   public static final String ALTER_REGION__MSG__REGION_0_ALTERED_ON_1 =
       "Region \"{0}\" altered on \"{1}\"";
   public static final String ALTER_REGION__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1 =
@@ -951,7 +942,7 @@ public class CliStrings {
   public static final String CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH =
       "Specify a valid " + CliStrings.CREATE_REGION__REGION;
   public static final String CREATE_REGION__MSG__PARENT_REGION_FOR_0_DOES_NOT_EXIST =
-      "Parent region for \"{0}\" doesn't exist. ";
+      "Parent region for \"{0}\" does not exist. ";
   public static final String CREATE_REGION__MSG__GROUPS_0_ARE_INVALID =
       "Group(s) \"{0}\" are invalid.";
   public static final String CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_USE_ATTR_FROM =
@@ -962,15 +953,6 @@ public class CliStrings {
   public static final String CREATE_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_VALUECONSTRAINT_0_IS_INVALID =
       "Specify a valid class name for " + CliStrings.CREATE_REGION__VALUECONSTRAINT
           + ". \"{0}\" is not valid.";
-  public static final String CREATE_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHELISTENER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHELISTENER
-          + ". \"{0}\" is not valid.";
-  public static final String CREATE_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHEWRITER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHEWRITER
-          + ". \"{0}\" is not valid.";
-  public static final String CREATE_REGION__MSG__SPECIFY_VALID_CLASSNAME_FOR_CACHELOADER_0_IS_INVALID =
-      "Specify a valid class name for " + CliStrings.CREATE_REGION__CACHELOADER
-          + ". \"{0}\" is not valid.";
   public static final String CREATE_REGION__MSG__NO_GATEWAYSENDERS_IN_THE_SYSTEM =
       "There are no GatewaySenders defined currently in the system.";
   public static final String CREATE_REGION__MSG__SPECIFY_VALID_GATEWAYSENDER_ID_UNKNOWN_0 =
@@ -997,14 +979,6 @@ public class CliStrings {
           + "\" is required.";
   public static final String CREATE_REGION__MSG__SPECIFY_VALID_REGION_PATH_FOR_0_REGIONPATH_1_NOT_FOUND =
       "Specify a valid region path for {0}. Region {1} not found.";
-  public static final String CREATE_REGION__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not find class \"{0}\" specified for \"{1}\".";
-  public static final String CREATE_REGION__MSG__CLASS_SPECIFIED_FOR_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE =
-      "Class \"{0}\" specified for \"{1}\" is not of an expected type.";
-  public static final String CREATE_REGION__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not instantiate class \"{0}\" specified for \"{1}\".";
-  public static final String CREATE_REGION__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not access class \"{0}\" specified for \"{1}\".";
   public static final String CREATE_REGION__MSG__EXPIRATION_ACTION_0_IS_NOT_VALID =
       "Expiration action \"{0}\" is not valid.";
   public static final String CREATE_REGION__MSG__ERROR_ON_MEMBER_0 = "Error on member: {0}. ";
@@ -1051,18 +1025,6 @@ public class CliStrings {
   public static final String CREATE_REGION__MSG__PARTITION_RESOLVER_ONLY_FOR_REGION_TYPE_PARTITION =
       "partition resolver property is only applicable to PARTITION region type";
 
-  public static final String CREATE_REGION_PARTITION_RESOLVER__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not find class \"{0}\" specified for \"{1}\".";
-
-  public static final String CREATE_REGION__MSG__PARTITION_RESOLVER__CLASS_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE =
-      "Class \"{0}\" specified for \"{1}\" is not of an expected type.";
-
-  public static final String CREATE_REGION__MSG__PARTITION_RESOLVER__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not instantiate class \"{0}\" specified for \"{1}\".";
-
-  public static final String CREATE_REGION__MSG__PARTITION_RESOLVER__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not access class \"{0}\" specified for \"{1}\".";
-
   public static final String CREATE_REGION__MSG__INVALID_PARTITION_RESOLVER =
       "{0} is an invalid Partition Resolver.";
 
@@ -2251,10 +2213,6 @@ public class CliStrings {
       "The fully qualified class name of GatewayTransportFilter to be added to the GatewaySender. ";
   public static final String CREATE_GATEWAYSENDER__MSG__GATEWAYSENDER_0_CREATED_ON_1 =
       "GatewaySender \"{0}\" created on \"{1}\"";
-  public static final String CREATE_GATEWAYSENDER__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not instantiate class \"{0}\" specified for \"{1}\".";
-  public static final String CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
-      "Could not access class \"{0}\" specified for \"{1}\".";
   public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
       "Gateway Sender cannot be created until all members are the current version";
 
@@ -3187,6 +3145,16 @@ public class CliStrings {
   public static final String START_SERVER__REDIRECT_OUTPUT__HELP =
       "Causes the member to redirect standard out and standard error to its own log file.";
 
+  // General CLI Error messages
+  public static final String ERROR__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1 =
+      "Could not find class \"{0}\" specified for \"{1}\".";
+  public static final String ERROR__MSG__CLASS_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE =
+      "Class \"{0}\" specified for \"{1}\" is not of an expected type.";
+  public static final String ERROR__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1 =
+      "Could not instantiate class \"{0}\" specified for \"{1}\".";
+  public static final String ERROR__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
+      "Could not access class \"{0}\" specified for \"{1}\".";
+
   /**
    * Creates a MessageFormat with the given pattern and uses it to format the given argument.
    *
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/util/RegionPath.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/util/RegionPath.java
index 3bd548b..1d3c5eb 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/util/RegionPath.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/util/RegionPath.java
@@ -38,7 +38,7 @@ public class RegionPath {
     regionPath = pathName;
     String[] regions = pathName.split(Region.SEPARATOR);
 
-    LinkedList<String> regionsNames = new LinkedList<String>();
+    LinkedList<String> regionsNames = new LinkedList<>();
     for (String region : regions) {
       if (!region.isEmpty()) {
         regionsNames.add(region);
@@ -69,6 +69,15 @@ public class RegionPath {
     return regionParentPath;
   }
 
+  /**
+   * Reports whether this path has no real parent path.
+   *
+   * @return {@code true} when the parent path is {@code null} or is just the region separator
+   */
+  public boolean isRoot() {
+    return regionParentPath == null || Region.SEPARATOR.equals(regionParentPath);
+  }
+
   public String[] getRegionsOnParentPath() {
     String[] regionsOnPath = getParent().split(Region.SEPARATOR);
 
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 118ec2f..ab0c4d1 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -123,6 +123,7 @@ org/apache/geode/cache/client/internal/pooling/ConnectionDestroyedException,true
 org/apache/geode/cache/configuration/CacheConfig$AsyncEventQueue,false,asyncEventListener:org/apache/geode/cache/configuration/DeclarableType,batchSize:java/lang/String,batchTimeInterval:java/lang/String,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/String,enableBatchConflation:java/lang/Boolean,forwardExpirationDestroy:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayEventSubstitutionFilter:org/apache/geode/cache/configuration/De [...]
 org/apache/geode/cache/configuration/ClassNameType,false,className:java/lang/String
 org/apache/geode/cache/configuration/DeclarableType,false,parameters:java/util/List
+org/apache/geode/cache/configuration/DiskDirsType,false,diskDirs:java/util/List
 org/apache/geode/cache/configuration/DiskStoreType,false,allowForceCompaction:java/lang/Boolean,autoCompact:java/lang/Boolean,compactionThreshold:java/lang/String,diskDirs:org/apache/geode/cache/configuration/DiskDirsType,diskUsageCriticalPercentage:java/lang/String,diskUsageWarningPercentage:java/lang/String,maxOplogSize:java/lang/String,name:java/lang/String,queueSize:java/lang/String,timeInterval:java/lang/String,writeBufferSize:java/lang/String
 org/apache/geode/cache/configuration/EnumActionDestroyOverflow,false,value:java/lang/String
 org/apache/geode/cache/configuration/EnumReadableWritable,false,value:java/lang/String
@@ -134,9 +135,18 @@ org/apache/geode/cache/configuration/RegionAttributesDataPolicy,false,value:java
 org/apache/geode/cache/configuration/RegionAttributesIndexUpdateType,false,value:java/lang/String
 org/apache/geode/cache/configuration/RegionAttributesMirrorType,false,value:java/lang/String
 org/apache/geode/cache/configuration/RegionAttributesScope,false,value:java/lang/String
+org/apache/geode/cache/configuration/RegionAttributesType,false,asyncEventQueueIds:java/lang/String,cacheListeners:java/util/List,cacheLoader:org/apache/geode/cache/configuration/DeclarableType,cacheWriter:org/apache/geode/cache/configuration/DeclarableType,cloningEnabled:java/lang/Boolean,compressor:org/apache/geode/cache/configuration/ClassNameType,concurrencyChecksEnabled:java/lang/Boolean,concurrencyLevel:java/lang/String,dataPolicy:org/apache/geode/cache/configuration/RegionAttribut [...]
+org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes,false,lruEntryCount:org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruEntryCount,lruHeapPercentage:org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruHeapPercentage,lruMemorySize:org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruMemorySize
+org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruEntryCount,false,action:org/apache/geode/cache/configuration/EnumActionDestroyOverflow,maximum:java/lang/String
 org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruHeapPercentage,false,action:org/apache/geode/cache/configuration/EnumActionDestroyOverflow
 org/apache/geode/cache/configuration/RegionAttributesType$EvictionAttributes$LruMemorySize,false,maximum:java/lang/String
+org/apache/geode/cache/configuration/RegionAttributesType$ExpirationAttributesDetail,false,action:java/lang/String,customExpiry:org/apache/geode/cache/configuration/DeclarableType,timeout:java/lang/String
+org/apache/geode/cache/configuration/RegionAttributesType$ExpirationAttributesType,false,expirationAttributes:org/apache/geode/cache/configuration/RegionAttributesType$ExpirationAttributesDetail
+org/apache/geode/cache/configuration/RegionAttributesType$MembershipAttributes,false,lossAction:java/lang/String,requiredRoles:java/util/List,resumptionAction:java/lang/String
+org/apache/geode/cache/configuration/RegionAttributesType$PartitionAttributes,false,colocatedWith:java/lang/String,fixedPartitionAttributes:java/util/List,localMaxMemory:java/lang/String,partitionListeners:java/util/List,partitionResolver:org/apache/geode/cache/configuration/DeclarableType,recoveryDelay:java/lang/String,redundantCopies:java/lang/String,startupRecoveryDelay:java/lang/String,totalMaxMemory:java/lang/String,totalNumBuckets:java/lang/String
+org/apache/geode/cache/configuration/RegionAttributesType$SubscriptionAttributes,false,interestPolicy:java/lang/String
 org/apache/geode/cache/configuration/RegionConfig,false,entries:java/util/List,indexes:java/util/List,name:java/lang/String,refid:java/lang/String,regionAttributes:org/apache/geode/cache/configuration/RegionAttributesType,regionElements:java/util/List,regions:java/util/List
+org/apache/geode/cache/configuration/RegionConfig$Entry,false,key:org/apache/geode/cache/configuration/ObjectType,value:org/apache/geode/cache/configuration/ObjectType
 org/apache/geode/cache/configuration/RegionConfig$Index,false,expression:java/lang/String,fromClause:java/lang/String,imports:java/lang/String,keyIndex:java/lang/Boolean,name:java/lang/String,type:java/lang/String
 org/apache/geode/cache/execute/EmptyRegionFunctionException,true,1
 org/apache/geode/cache/execute/FunctionAdapter,true,-4891043890440825485
@@ -534,6 +544,7 @@ org/apache/geode/management/internal/cli/functions/CreateDefinedIndexesFunction,
 org/apache/geode/management/internal/cli/functions/CreateDiskStoreFunction,true,1
 org/apache/geode/management/internal/cli/functions/CreateIndexFunction,true,1
 org/apache/geode/management/internal/cli/functions/CreateJndiBindingFunction,false
+org/apache/geode/management/internal/cli/functions/CreateRegionFunctionArgs,false,config:org/apache/geode/cache/configuration/RegionConfig,ifNotExists:boolean,regionPath:java/lang/String
 org/apache/geode/management/internal/cli/functions/DataCommandFunction,true,1,optimizeForWrite:boolean
 org/apache/geode/management/internal/cli/functions/DeployFunction,true,1
 org/apache/geode/management/internal/cli/functions/DescribeDiskStoreFunction,false
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandTest.java
index 83ea16b..1b334b4 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandTest.java
@@ -32,7 +32,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
@@ -42,8 +41,7 @@ import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.cli.Result;
 import org.apache.geode.management.internal.cli.GfshParseResult;
 import org.apache.geode.management.internal.cli.domain.ClassName;
-import org.apache.geode.management.internal.cli.functions.RegionFunctionArgs;
-import org.apache.geode.management.internal.cli.functions.RegionFunctionArgs.ExpirationAttrs;
+import org.apache.geode.management.internal.cli.functions.CreateRegionFunctionArgs;
 import org.apache.geode.management.internal.cli.result.CommandResult;
 import org.apache.geode.test.junit.rules.GfshParserRule;
 
@@ -72,7 +70,7 @@ public class CreateRegionCommandTest {
 
   @Test
   public void testRegionExistsReturnsCorrectValue() throws Exception {
-    assertThat(command.regionExists(cache, null)).isFalse();
+    assertThat(command.regionExists(null)).isFalse();
   }
 
   @Test
@@ -138,8 +136,11 @@ public class CreateRegionCommandTest {
   @Test
   public void templateRegionAttributesNotAvailable() throws Exception {
     doReturn(null).when(command).getRegionAttributes(eq(cache), any());
+    DistributedSystemMXBean dsMBean = mock(DistributedSystemMXBean.class);
+    doReturn(dsMBean).when(command).getDSMBean();
+    doReturn(new String[] {}).when(dsMBean).listGatewaySenders();
     doReturn(Collections.emptySet()).when(command).findMembers(any(), any());
-    doReturn(true).when(command).regionExists(eq(cache), any());
+    doReturn(true).when(command).regionExists(any());
 
     CommandResult result = parser.executeCommandWithInstance(command,
         "create region --name=region --template-region=regionA");
@@ -155,49 +156,19 @@ public class CreateRegionCommandTest {
     when(resultCollector.getResult()).thenReturn(Collections.emptyList());
     DistributedSystemMXBean dsMBean = mock(DistributedSystemMXBean.class);
     doReturn(dsMBean).when(command).getDSMBean();
+    doReturn(new String[] {}).when(dsMBean).listGatewaySenders();
     doReturn(Collections.singleton(mock(DistributedMember.class))).when(command).findMembers(any(),
         any());
     doReturn(true).when(command).verifyDistributedRegionMbean(any(), any());
     when(service.getDistributedRegionMXBean(any())).thenReturn(null);
 
     parser.executeCommandWithInstance(command, "create region --name=A --type=REPLICATE");
-    ArgumentCaptor<RegionFunctionArgs> argsCaptor =
-        ArgumentCaptor.forClass(RegionFunctionArgs.class);
+    ArgumentCaptor<CreateRegionFunctionArgs> argsCaptor =
+        ArgumentCaptor.forClass(CreateRegionFunctionArgs.class);
     verify(command).executeFunction(any(), argsCaptor.capture(), any(Set.class));
-    RegionFunctionArgs args = argsCaptor.getValue();
-
-    assertThat(args.getRegionPath()).isEqualTo("/A");
-    assertThat(args.getRegionShortcut()).isEqualTo(RegionShortcut.REPLICATE);
-    assertThat(args.getTemplateRegion()).isNull();
-    assertThat(args.isIfNotExists()).isFalse();
-    assertThat(args.getKeyConstraint()).isNull();
-    assertThat(args.getValueConstraint()).isNull();
-    assertThat(args.getStatisticsEnabled()).isNull();
-
-    ExpirationAttrs empty = new ExpirationAttrs(null, null);
-    assertThat(args.getEntryExpirationIdleTime()).isNull();
-    assertThat(args.getEntryExpirationTTL()).isNull();
-    assertThat(args.getRegionExpirationIdleTime()).isNull();
-    assertThat(args.getRegionExpirationTTL()).isNull();
-
-    assertThat(args.getDiskStore()).isNull();
-    assertThat(args.getDiskSynchronous()).isNull();
-    assertThat(args.getEnableAsyncConflation()).isNull();
-    assertThat(args.getEnableSubscriptionConflation()).isNull();
-    assertThat(args.getCacheListeners()).isEmpty();
-    assertThat(args.getCacheLoader()).isNull();
-    assertThat(args.getCacheWriter()).isNull();
-    assertThat(args.getAsyncEventQueueIds()).isEmpty();
-    assertThat(args.getGatewaySenderIds()).isEmpty();
-    assertThat(args.getConcurrencyChecksEnabled()).isNull();
-    assertThat(args.getCloningEnabled()).isNull();
-    assertThat(args.getMcastEnabled()).isNull();
-    assertThat(args.getConcurrencyLevel()).isNull();
-    assertThat(args.getPartitionArgs()).isNull();
-    assertThat(args.getEvictionMax()).isNull();
-    assertThat(args.getCompressor()).isNull();
-    assertThat(args.getOffHeap()).isNull();
-    assertThat(args.getRegionAttributes()).isNull();
+    CreateRegionFunctionArgs args = argsCaptor.getValue();
+
+    assertThat(args.getConfig().getRegionAttributes()).isNotNull();
   }
 
   @Test
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactoryTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactoryTest.java
index 4f4b45c..6d257a7 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/domain/RegionConfigFactoryTest.java
@@ -16,57 +16,148 @@ package org.apache.geode.management.internal.cli.domain;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CustomExpiry;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.configuration.DeclarableType;
 import org.apache.geode.cache.configuration.EnumActionDestroyOverflow;
 import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.configuration.RegionConfig;
-import org.apache.geode.management.internal.cli.functions.RegionFunctionArgs;
 
 public class RegionConfigFactoryTest {
 
   RegionConfigFactory subject;
-  RegionFunctionArgs args;
+  RegionConfig config;
+
+  String regionPath;
+  String keyConstraint;
+  String valueConstraint;
+  Boolean statisticsEnabled;
+  Integer entryExpirationIdleTime;
+  ExpirationAction entryExpirationIdleAction;
+  Integer entryExpirationTTL;
+  ExpirationAction entryExpirationTTLAction;
+  ClassName<CustomExpiry> entryIdleTimeCustomExpiry;
+  ClassName<CustomExpiry> entryTTLCustomExpiry;
+
+  Integer regionExpirationIdleTime;
+  ExpirationAction regionExpirationIdleAction;
+  Integer regionExpirationTTL;
+  ExpirationAction regionExpirationTTLAction;
+
+  String evictionAction;
+  Integer evictionMaxMemory;
+  Integer evictionEntryCount;
+  String evictionObjectSizer;
+
+  String diskStore;
+  Boolean diskSynchronous;
+  Boolean enableAsyncConflation;
+  Boolean enableSubscriptionConflation;
+  Set<ClassName<CacheListener>> cacheListeners;
+  ClassName<CacheLoader> cacheLoader;
+  ClassName<CacheWriter> cacheWriter;
+  Set<String> asyncEventQueueIds;
+  Set<String> gatewaySenderIds;
+  Boolean concurrencyChecksEnabled;
+  Boolean cloningEnabled;
+  Boolean mcastEnabled;
+  Integer concurrencyLevel;
+  PartitionArgs partitionArgs;
+  String compressor;
+  Boolean offHeap;
+  RegionAttributes<?, ?> regionAttributes;
 
   @Before
   public void setup() {
     subject = new RegionConfigFactory();
-    args = new RegionFunctionArgs();
-    args.setRegionPath("region-name");
+    regionPath = "region-name";
+    keyConstraint = null;
+    valueConstraint = null;
+    statisticsEnabled = null;
+    entryExpirationIdleTime = null;
+    entryExpirationIdleAction = null;
+    entryExpirationTTL = null;
+    entryExpirationTTLAction = null;
+    entryIdleTimeCustomExpiry = null;
+    entryTTLCustomExpiry = null;
+    regionExpirationIdleTime = null;
+    regionExpirationIdleAction = null;
+    regionExpirationTTL = null;
+    regionExpirationTTLAction = null;
+    evictionAction = null;
+    evictionMaxMemory = null;
+    evictionEntryCount = null;
+    evictionObjectSizer = null;
+    diskStore = null;
+    diskSynchronous = null;
+    enableAsyncConflation = null;
+    enableSubscriptionConflation = null;
+    cacheListeners = null;
+    cacheLoader = null;
+    cacheWriter = null;
+    asyncEventQueueIds = null;
+    gatewaySenderIds = null;
+    concurrencyChecksEnabled = null;
+    cloningEnabled = null;
+    mcastEnabled = null;
+    concurrencyLevel = null;
+    partitionArgs = null;
+    compressor = null;
+    offHeap = null;
+    regionAttributes = null;
+  }
+
+  /** Runs the factory against the current field values and stores the result in config. */
+  private void generate() {
+    config = subject.generate(
+        regionPath, keyConstraint, valueConstraint, statisticsEnabled,
+        entryExpirationIdleTime, entryExpirationIdleAction,
+        entryExpirationTTL, entryExpirationTTLAction,
+        entryIdleTimeCustomExpiry, entryTTLCustomExpiry,
+        regionExpirationIdleTime, regionExpirationIdleAction,
+        regionExpirationTTL, regionExpirationTTLAction,
+        evictionAction, evictionMaxMemory, evictionEntryCount, evictionObjectSizer,
+        diskStore, diskSynchronous, enableAsyncConflation, enableSubscriptionConflation,
+        cacheListeners, cacheLoader, cacheWriter,
+        asyncEventQueueIds, gatewaySenderIds,
+        concurrencyChecksEnabled, cloningEnabled, mcastEnabled, concurrencyLevel,
+        partitionArgs, compressor, offHeap, regionAttributes);
+  }
 
   @Test
   public void generatesConfigForRegion() {
-    RegionConfig config = subject.generate(args);
+    generate();
     assertThat(config.getName()).isEqualTo("region-name");
   }
 
   @Test
   public void generatesConfigForSubRegion() {
-    args.setRegionPath("region-name/subregion");
+    regionPath = "region-name/subregion";
 
-    RegionConfig config = subject.generate(args);
+    generate();
     assertThat(config.getName()).isEqualTo("subregion");
   }
 
   @Test
-  public void generatesNotNullWithNoAttributes() {
-    RegionConfig config = subject.generate(args);
-    assertThat(config.getRegionAttributes()).isNotNull();
-  }
-
-  @Test
   public void generatesWithConstraintAttributes() {
-    args.setKeyConstraint("key-const");
-    args.setValueConstraint("value-const");
+    keyConstraint = "key-const";
+    valueConstraint = "value-const";
 
-    RegionConfig config = subject.generate(args);
+    generate();
     assertThat(config.getRegionAttributes().getKeyConstraint()).isEqualTo("key-const");
     assertThat(config.getRegionAttributes().getValueConstraint())
         .isEqualTo("value-const");
@@ -74,13 +165,20 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithExpirationIdleTimeAttributes() {
-    args.setRegionExpirationTTL(10, ExpirationAction.DESTROY);
-    args.setRegionExpirationIdleTime(3, ExpirationAction.INVALIDATE);
-    args.setEntryExpirationTTL(1, ExpirationAction.LOCAL_DESTROY);
-    args.setEntryExpirationIdleTime(12, ExpirationAction.LOCAL_DESTROY);
-    args.setEntryIdleTimeCustomExpiry(new ClassName<>("java.lang.String"));
+    regionExpirationTTL = 10;
+    regionExpirationTTLAction = ExpirationAction.DESTROY;
 
-    RegionConfig config = subject.generate(args);
+    regionExpirationIdleTime = 3;
+    regionExpirationIdleAction = ExpirationAction.INVALIDATE;
+
+    entryExpirationTTL = 1;
+    entryExpirationTTLAction = ExpirationAction.LOCAL_DESTROY;
+
+    entryExpirationIdleTime = 12;
+    entryExpirationIdleAction = ExpirationAction.LOCAL_DESTROY;
+    entryIdleTimeCustomExpiry = new ClassName<>("java.lang.String");
+
+    generate();
     RegionAttributesType.ExpirationAttributesType regionTimeToLive =
         config.getRegionAttributes().getRegionTimeToLive();
     assertThat(regionTimeToLive.getTimeout()).isEqualTo("10");
@@ -102,21 +200,21 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithDiskAttributes() {
-    args.setDiskStore("disk-store");
-    args.setDiskSynchronous(false);
+    diskStore = "disk-store";
+    diskSynchronous = false;
 
-    RegionConfig config = subject.generate(args);
+    generate();
     assertThat(config.getRegionAttributes().getDiskStoreName()).isEqualTo("disk-store");
     assertThat(config.getRegionAttributes().isDiskSynchronous()).isEqualTo(false);
   }
 
   @Test
   public void generatesWithPrAttributes() {
-    args.setPartitionArgs("colo-with", 100,
+    partitionArgs = new PartitionArgs("colo-with", 100,
         100L, 100, 100L,
         100L, 100, "java.lang.String");
 
-    RegionConfig config = subject.generate(args);
+    generate();
     RegionAttributesType.PartitionAttributes partitionAttributes =
         config.getRegionAttributes().getPartitionAttributes();
     assertThat(partitionAttributes).isNotNull();
@@ -134,14 +232,14 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithMiscBooleanFlags() {
-    args.setStatisticsEnabled(false);
-    args.setEnableAsyncConflation(false);
-    args.setConcurrencyChecksEnabled(true);
-    args.setEnableSubscriptionConflation(true);
-    args.setMcastEnabled(false);
-    args.setCloningEnabled(false);
-    args.setOffHeap(true);
-    RegionConfig config = subject.generate(args);
+    statisticsEnabled = false;
+    enableAsyncConflation = false;
+    concurrencyChecksEnabled = true;
+    enableSubscriptionConflation = true;
+    mcastEnabled = false;
+    cloningEnabled = false;
+    offHeap = true;
+    generate();
 
     assertThat(config.getRegionAttributes().isStatisticsEnabled()).isEqualTo(false);
     assertThat(config.getRegionAttributes().isEnableSubscriptionConflation())
@@ -158,8 +256,9 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithGatewayFlags() {
-    args.setGatewaySenderIds(new String[] {"some-id", "some-other-id"});
-    RegionConfig config = subject.generate(args);
+    gatewaySenderIds =
+        Arrays.stream(new String[] {"some-id", "some-other-id"}).collect(Collectors.toSet());
+    generate();
 
     assertThat(config.getRegionAttributes().getGatewaySenderIds())
         .contains("some-id");
@@ -169,9 +268,9 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithEvictionHeapPercentageFlags() {
-    args.setEvictionAttributes(EvictionAction.LOCAL_DESTROY.toString(), null, null,
-        "java.lang.String");
-    RegionConfig config = subject.generate(args);
+    evictionAction = EvictionAction.LOCAL_DESTROY.toString();
+    evictionObjectSizer = "java.lang.String";
+    generate();
 
     RegionAttributesType.EvictionAttributes evictionAttributes =
         config.getRegionAttributes().getEvictionAttributes();
@@ -184,9 +283,9 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithEvictionMaxMemory() {
-    args.setEvictionAttributes(EvictionAction.LOCAL_DESTROY.toString(), 100, null,
-        null);
-    RegionConfig config = subject.generate(args);
+    evictionAction = EvictionAction.LOCAL_DESTROY.toString();
+    evictionMaxMemory = 100;
+    generate();
 
     RegionAttributesType.EvictionAttributes evictionAttributes =
         config.getRegionAttributes().getEvictionAttributes();
@@ -198,9 +297,9 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithEvictionMaxEntry() {
-    args.setEvictionAttributes(EvictionAction.OVERFLOW_TO_DISK.toString(), null, 1,
-        null);
-    RegionConfig config = subject.generate(args);
+    evictionAction = EvictionAction.OVERFLOW_TO_DISK.toString();
+    evictionEntryCount = 1;
+    generate();
     RegionAttributesType.EvictionAttributes evictionAttributes =
         config.getRegionAttributes().getEvictionAttributes();
     assertThat(evictionAttributes).isNotNull();
@@ -211,8 +310,8 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithAsyncEventQueueIds() {
-    args.setAsyncEventQueueIds(new String[] {"id-1", "id-2"});
-    RegionConfig config = subject.generate(args);
+    asyncEventQueueIds = Arrays.stream(new String[] {"id-1", "id-2"}).collect(Collectors.toSet());
+    generate();
 
     assertThat(config.getRegionAttributes().getAsyncEventQueueIds())
         .contains("id-1");
@@ -222,10 +321,11 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithCacheClasses() {
-    args.setCacheListeners(new ClassName[] {new ClassName("java.lang.String")});
-    args.setCacheLoader(new ClassName("java.lang.String"));
-    args.setCacheWriter(new ClassName("java.lang.String"));
-    RegionConfig config = subject.generate(args);
+    cacheListeners = new HashSet<>();
+    cacheListeners.add(new ClassName<>("java.lang.String"));
+    cacheLoader = new ClassName("java.lang.String");
+    cacheWriter = new ClassName("java.lang.String");
+    generate();
 
     List<DeclarableType> cacheListeners = config.getRegionAttributes().getCacheListeners();
 
@@ -241,10 +341,9 @@ public class RegionConfigFactoryTest {
 
   @Test
   public void generatesWithOtherMiscSimpleFlags() {
-    args.setCompressor("java.lang.String");
-    args.setConcurrencyLevel(1);
-
-    RegionConfig config = subject.generate(args);
+    compressor = "java.lang.String";
+    concurrencyLevel = 1;
+    generate();
 
     assertThat(
         config.getRegionAttributes().getCompressor().getClassName())
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
index 57d91a3..dbd4b5c 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/CreateRegionCommandDUnitTest.java
@@ -16,8 +16,7 @@ package org.apache.geode.management.internal.cli.commands;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,17 +35,17 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 public class CreateRegionCommandDUnitTest {
   private static MemberVM locator, server1, server2;
 
-  @ClassRule
-  public static ClusterStartupRule lsRule = new ClusterStartupRule();
+  @Rule
+  public ClusterStartupRule lsRule = new ClusterStartupRule();
 
-  @ClassRule
-  public static GfshCommandRule gfsh = new GfshCommandRule();
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule();
 
   @Rule
   public TestName testName = new SerializableTestName();
 
-  @BeforeClass
-  public static void before() throws Exception {
+  @Before
+  public void before() throws Exception {
     locator = lsRule.startLocatorVM(0);
     server1 = lsRule.startServerVM(1, locator.getPort());
     server2 = lsRule.startServerVM(2, locator.getPort());
@@ -105,10 +104,32 @@ public class CreateRegionCommandDUnitTest {
     gfsh.executeAndAssertThat("list regions").statusIsSuccess().doesNotContainOutput(regionName);
   }
 
+  @Test
+  public void cannotCreateRegionIfGatewaySenderDoesNotExist() {
+    IgnoredException.addIgnoredException("could not get remote locator information");
+    String region = testName.getMethodName();
+    String existingSender = "gatewaySender";
+
+    // A sender does exist in the cluster, just not under the id the region will ask for.
+    String createSender =
+        "create gateway-sender --remote-distributed-system-id=2 --id=" + existingSender;
+    gfsh.executeAndAssertThat(createSender).statusIsSuccess();
+    locator.waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(2);
+
+    String createRegion = "create region --type=REPLICATE  --name=" + region
+        + " --gateway-sender-id=" + existingSender + "-2";
+    gfsh.executeAndAssertThat(createRegion).statusIsError()
+        .containsOutput("Specify valid gateway-sender-id");
+
+    // The failure has to happen early in initialization: the region must not have been added
+    // to the root regions, so it must be absent from the listing.
+    gfsh.executeAndAssertThat("list regions").statusIsSuccess().doesNotContainOutput(region);
+  }
+
   /**
    * Ignored this test until we refactor the FetchRegionAttributesFunction to not use
    * AttributesFactory, and instead use RegionConfig, which we will do as part of implementing
-   * GEODE-6103
+   * GEODE-6104
    */
   @Ignore
   @Test