You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/14 18:44:54 UTC

[geode] branch develop updated: GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0dc46ea  GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)
0dc46ea is described below

commit 0dc46eab3a95195ea45e2215047a5a79a0bde2e8
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Nov 14 10:44:45 2018 -0800

    GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)
    
    create jdbc-mapping now also does the following:
    -- alters the region to have a JdbcLoader as its cache-loader
    -- if synchronous, alters the region to have a JdbcWriter as its cache-writer
    -- if asynchronous, creates an async-event-queue and alters the region to use it
    The async-event-queue is created as parallel for partitioned regions and as serial for all other regions.
    
    create jdbc-mapping now requires cluster configuration and uses it to check
    all required preconditions before the command starts changing things.
    
    Co-authored-by: Darrel Schneider <ds...@pivotal.io>
    Co-authored-by: Jianxia Chen <jc...@pivotal.io>
    Co-authored-by: Scott Jewell <sj...@pivotal.io>
---
 .../geode/connectors/jdbc/JdbcDistributedTest.java | 180 +++----
 .../cli/CreateMappingCommandDUnitTest.java         | 267 +++++++++-
 .../cli/CreateMappingCommandIntegrationTest.java   | 121 -----
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../jdbc/internal/cli/CreateMappingCommand.java    | 211 +++++++-
 .../jdbc/internal/cli/CreateMappingFunction.java   |  61 ++-
 .../jdbc/internal/cli/PreconditionException.java   |  28 ++
 .../internal/cli/CreateMappingCommandTest.java     | 537 +++++++++++++++++++++
 .../internal/cli/CreateMappingFunctionTest.java    |  98 +++-
 9 files changed, 1242 insertions(+), 262 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index c0e44c0..94370e9 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
 import org.apache.geode.pdx.internal.AutoSerializableManager;
@@ -179,8 +180,11 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, false);
-    createJdbcConnection();
+    StringBuffer createRegionCmd = new StringBuffer();
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+        + " --cache-writer=" + JdbcWriter.class.getName());
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+    createJdbcDataSource();
 
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
@@ -197,8 +201,12 @@ public abstract class JdbcDistributedTest implements Serializable {
   public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception {
     createTable();
     IgnoredException.addIgnoredException("JdbcConnectorException");
-    createRegionUsingGfsh(false, true, false);
-    createJdbcConnection();
+    StringBuffer createRegionCmd = new StringBuffer();
+    createAsyncListener("JAW");
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+        + " --async-event-queue-id=JAW");
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+    createJdbcDataSource();
 
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
@@ -212,32 +220,14 @@ public abstract class JdbcDistributedTest implements Serializable {
       await().untilAsserted(() -> {
         assertThat(asyncWriter.getFailedEvents()).isEqualTo(1);
       });
-
-    });
-  }
-
-  @Test
-  public void throwsExceptionWhenNoMappingMatches() throws Exception {
-    createTable();
-    createRegionUsingGfsh(true, false, false);
-    createJdbcConnection();
-
-    server.invoke(() -> {
-      PdxInstance pdxEmployee1 =
-          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
-              .writeString("name", "Emp1").writeInt("age", 55).create();
-      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
-      assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
-          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
-              "JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
     });
   }
 
   @Test
   public void throwsExceptionWhenNoDataSourceExists() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, false);
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
 
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
@@ -262,9 +252,9 @@ public abstract class JdbcDistributedTest implements Serializable {
           "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
               + TestDate.DATE_FIELD_NAME + " date)");
     });
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
     final Date jdkDate = new Date(sqlDate.getTime());
@@ -297,9 +287,9 @@ public abstract class JdbcDistributedTest implements Serializable {
           "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
               + TestDate.DATE_FIELD_NAME + " time)");
     });
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
     final Date jdkDate = new Date(sqlTime.getTime());
@@ -327,9 +317,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
     createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
 
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
     final Date jdkDate = new Date(sqlTimestamp.getTime());
@@ -365,9 +355,9 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void putWritesToDB() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, false);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -382,9 +372,26 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void putAsyncWritesToDB() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, false);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+      String key = "emp1";
+      ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1);
+      assertTableHasEmployeeData(1, pdxEmployee1, key);
+    });
+  }
+
+  @Test
+  public void putAsyncWithPartitionWritesToDB() throws Exception {
+    createTable();
+    createPartitionRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -399,9 +406,9 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromEmptyDB() throws Exception {
     createTable();
-    createRegionUsingGfsh(false, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     server.invoke(() -> {
       String key = "emp1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
@@ -414,9 +421,9 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDB() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -442,9 +449,9 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDBWithAsyncWriter() throws Exception {
     createTable();
-    createRegionUsingGfsh(false, true, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME);
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
     server.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -452,7 +459,8 @@ public abstract class JdbcDistributedTest implements Serializable {
       String key = "id1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache()
-          .getAsyncEventQueue("JAW").getAsyncEventListener();
+          .getAsyncEventQueue(CreateMappingCommand.createAsyncEventQueueName(REGION_NAME))
+          .getAsyncEventListener();
 
       region.put(key, pdxEmployee1);
       await().untilAsserted(() -> {
@@ -473,9 +481,9 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDBWithPdxClassName() throws Exception {
     createTable();
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
     server.invoke(() -> {
       String key = "id1";
       Employee value = new Employee(key, "Emp1", 55);
@@ -495,9 +503,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     ClientVM client = getClientVM();
     createClientRegion(client);
 
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     client.invoke(() -> {
       String key = "id1";
       ClassWithSupportedPdxFields value =
@@ -519,9 +527,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     ClientVM client = getClientVM();
     createClientRegion(client);
 
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     client.invoke(() -> {
       String key = "id1";
       ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
@@ -540,9 +548,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     createTableForAllSupportedFields();
     ClientVM client = getClientVM();
     createClientRegion(client);
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     String key = "id1";
     ClassWithSupportedPdxFields value =
         new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
@@ -569,9 +577,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     createTableForAllSupportedFields();
     ClientVM client = getClientVM();
     createClientRegion(client);
-    createRegionUsingGfsh(true, false, true);
-    createJdbcConnection();
-    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     String key = "id1";
     ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
 
@@ -607,9 +615,9 @@ public abstract class JdbcDistributedTest implements Serializable {
     });
   }
 
-  private void createJdbcConnection() {
+  private void createJdbcDataSource() {
     final String commandStr =
-        "create jndi-binding --type=POOLED --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
+        "create data-source --pooled --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
   }
 
@@ -620,32 +628,36 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
   }
 
-  private void createRegionUsingGfsh(boolean withCacheWriter, boolean withAsyncWriter,
-      boolean withLoader) {
+  private void createRegionUsingGfsh() {
     StringBuffer createRegionCmd = new StringBuffer();
-    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE ");
-    if (withCacheWriter) {
-      createRegionCmd.append(" --cache-writer=" + JdbcWriter.class.getName());
-    }
-    if (withLoader) {
-      createRegionCmd.append(" --cache-loader=" + JdbcLoader.class.getName());
-    }
-    if (withAsyncWriter) {
-      createAsyncListener("JAW");
-      createRegionCmd.append(" --async-event-queue-id=JAW");
-    }
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE");
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+  }
 
+  private void createPartitionRegionUsingGfsh() {
+    StringBuffer createRegionCmd = new StringBuffer();
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=PARTITION");
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
   }
 
-  private void createMapping(String regionName, String connectionName) {
-    createMapping(regionName, connectionName, Employee.class.getName());
+  private void createMapping(String regionName, String connectionName, boolean synchronous) {
+    createMapping(regionName, connectionName, Employee.class.getName(), synchronous);
   }
 
-  private void createMapping(String regionName, String connectionName, String pdxClassName) {
-    final String commandStr = "create jdbc-mapping --region=" + regionName + " --data-source="
-        + connectionName + (pdxClassName != null ? " --pdx-name=" + pdxClassName : "");
+  private void createMapping(String regionName, String connectionName, String pdxClassName,
+      boolean synchronous) {
+    final String commandStr = "create jdbc-mapping --region=" + regionName
+        + " --data-source=" + connectionName
+        + " --synchronous=" + synchronous
+        + " --pdx-name=" + pdxClassName;
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+    if (!synchronous) {
+      final String alterAsyncQueue =
+          "alter async-event-queue --id="
+              + CreateMappingCommand.createAsyncEventQueueName(regionName)
+              + " --batch-size=1 --batch-time-interval=0";
+      gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess();
+    }
   }
 
   private void assertTableHasEmployeeData(int size, PdxInstance employee, String key)
diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
index 8d86ec9..d9f8e94 100644
--- a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
@@ -18,14 +18,25 @@ import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__DATA_SOURCE_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__PDX_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__REGION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SYNCHRONOUS_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.distributed.internal.InternalLocator;
@@ -61,12 +72,96 @@ public class CreateMappingCommandDUnitTest {
 
     gfsh.connectAndVerify(locator);
 
-    gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=REPLICATE")
+  }
+
+  private void setupReplicate() {
+    setupReplicate(false);
+  }
+
+  private void setupReplicate(boolean addLoader) {
+    gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=REPLICATE"
+        + (addLoader ? " --cache-loader=" + JdbcLoader.class.getName() : ""))
         .statusIsSuccess();
   }
 
+  private void setupPartition() {
+    gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=PARTITION")
+        .statusIsSuccess();
+  }
+
+  private void setupAsyncEventQueue() {
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --id="
+            + CreateMappingCommand.createAsyncEventQueueName(REGION_NAME)
+            + " --listener=" + JdbcAsyncWriter.class.getName())
+        .statusIsSuccess();
+  }
+
+  private static RegionMapping getRegionMappingFromClusterConfig() {
+    CacheConfig cacheConfig =
+        InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+    RegionConfig regionConfig = cacheConfig.getRegions().stream()
+        .filter(region -> region.getName().equals(REGION_NAME)).findFirst().orElse(null);
+    return (RegionMapping) regionConfig.getCustomRegionElements().stream()
+        .filter(element -> element instanceof RegionMapping).findFirst().orElse(null);
+  }
+
+  private static RegionMapping getRegionMappingFromService() {
+    return ClusterStartupRule.getCache().getService(JdbcConnectorService.class)
+        .getMappingForRegion(REGION_NAME);
+  }
+
+  private static void validateAsyncEventQueueCreatedInClusterConfig(boolean isParallel) {
+    CacheConfig cacheConfig =
+        InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+    List<CacheConfig.AsyncEventQueue> queueList = cacheConfig.getAsyncEventQueues();
+    CacheConfig.AsyncEventQueue queue = queueList.get(0);
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+    assertThat(queue.getId()).isEqualTo(queueName);
+    assertThat(queue.getAsyncEventListener().getClassName())
+        .isEqualTo(JdbcAsyncWriter.class.getName());
+    assertThat(queue.isParallel()).isEqualTo(isParallel);
+  }
+
+  private static void validateRegionAlteredInClusterConfig(boolean synchronous) {
+    CacheConfig cacheConfig =
+        InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+    RegionConfig regionConfig = cacheConfig.getRegions().stream()
+        .filter(region -> region.getName().equals(REGION_NAME)).findFirst().orElse(null);
+    RegionAttributesType attributes = regionConfig.getRegionAttributes().get(0);
+    assertThat(attributes.getCacheLoader().getClassName()).isEqualTo(JdbcLoader.class.getName());
+    if (synchronous) {
+      assertThat(attributes.getCacheWriter().getClassName()).isEqualTo(JdbcWriter.class.getName());
+    } else {
+      String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+      assertThat(attributes.getAsyncEventQueueIds()).isEqualTo(queueName);
+    }
+  }
+
+  private static void validateAsyncEventQueueCreatedOnServer(boolean isParallel) {
+    InternalCache cache = ClusterStartupRule.getCache();
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+    AsyncEventQueue queue = cache.getAsyncEventQueue(queueName);
+    assertThat(queue).isNotNull();
+    assertThat(queue.getAsyncEventListener()).isInstanceOf(JdbcAsyncWriter.class);
+    assertThat(queue.isParallel()).isEqualTo(isParallel);
+  }
+
+  private static void validateRegionAlteredOnServer(boolean synchronous) {
+    InternalCache cache = ClusterStartupRule.getCache();
+    Region<?, ?> region = cache.getRegion(REGION_NAME);
+    assertThat(region.getAttributes().getCacheLoader()).isInstanceOf(JdbcLoader.class);
+    if (synchronous) {
+      assertThat(region.getAttributes().getCacheWriter()).isInstanceOf(JdbcWriter.class);
+    } else {
+      String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+      assertThat(region.getAttributes().getAsyncEventQueueIds()).contains(queueName);
+    }
+  }
+
   @Test
-  public void createsMappingWithAllOptions() {
+  public void createMappingUpdatesServiceAndClusterConfig() {
+    setupReplicate();
     CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
     csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
     csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -75,24 +170,57 @@ public class CreateMappingCommandDUnitTest {
 
     gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
 
+    server.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService();
+      assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(mapping.getTableName()).isEqualTo("myTable");
+      assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredOnServer(false);
+      validateAsyncEventQueueCreatedOnServer(false);
+    });
+
     locator.invoke(() -> {
-      String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
-          .getConfiguration("cluster").getCacheXmlContent();
-      assertThat(xml).isNotNull().contains("jdbc:mapping");
+      RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+      assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+      assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredInClusterConfig(false);
+      validateAsyncEventQueueCreatedInClusterConfig(false);
     });
+  }
+
+  @Test
+  public void createSynchronousMappingUpdatesServiceAndClusterConfig() {
+    setupReplicate();
+    CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+    csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+    csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+    csb.addOption(CREATE_MAPPING__TABLE_NAME, "myTable");
+    csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+    csb.addOption(CREATE_MAPPING__SYNCHRONOUS_NAME, "true");
+
+    gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
 
     server.invoke(() -> {
-      InternalCache cache = ClusterStartupRule.getCache();
-      RegionMapping mapping =
-          cache.getService(JdbcConnectorService.class).getMappingForRegion(REGION_NAME);
+      RegionMapping mapping = getRegionMappingFromService();
       assertThat(mapping.getDataSourceName()).isEqualTo("connection");
       assertThat(mapping.getTableName()).isEqualTo("myTable");
       assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredOnServer(true);
+    });
+
+    locator.invoke(() -> {
+      RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+      assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+      assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredInClusterConfig(true);
     });
   }
 
   @Test
-  public void createsRegionMappingUpdatesClusterConfig() {
+  public void createMappingWithPartitionUpdatesServiceAndClusterConfig() {
+    setupPartition();
     CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
     csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
     csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -101,15 +229,57 @@ public class CreateMappingCommandDUnitTest {
 
     gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
 
+    server.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService();
+      assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(mapping.getTableName()).isEqualTo("myTable");
+      assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredOnServer(false);
+      validateAsyncEventQueueCreatedOnServer(true);
+    });
+
+    locator.invoke(() -> {
+      RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+      assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+      assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredInClusterConfig(false);
+      validateAsyncEventQueueCreatedInClusterConfig(true);
+    });
+  }
+
+  @Test
+  public void createMappingWithNoTable() {
+    setupReplicate();
+    CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+    csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+    csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+    csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+
+    gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+    server.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService();
+      assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(mapping.getTableName()).isNull();
+      assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredOnServer(false);
+      validateAsyncEventQueueCreatedOnServer(false);
+    });
+
     locator.invoke(() -> {
-      String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
-          .getConfiguration("cluster").getCacheXmlContent();
-      assertThat(xml).isNotNull().contains("jdbc:mapping");
+      RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+      assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(regionMapping.getTableName()).isNull();
+      assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+      validateRegionAlteredInClusterConfig(false);
+      validateAsyncEventQueueCreatedInClusterConfig(false);
     });
   }
 
   @Test
   public void createExistingRegionMappingFails() {
+    setupReplicate();
     CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
     csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
     csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -119,27 +289,78 @@ public class CreateMappingCommandDUnitTest {
 
     csb = new CommandStringBuilder(CREATE_MAPPING);
     csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
-    csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
-    csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
-    csb.addOption(CREATE_MAPPING__TABLE_NAME, "bogus");
-    gfsh.executeAndAssertThat(csb.toString()).statusIsError();
+    csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "bogusConnection");
+    csb.addOption(CREATE_MAPPING__PDX_NAME, "bogusPdxClass");
+    csb.addOption(CREATE_MAPPING__TABLE_NAME, "bogusTable");
+    gfsh.executeAndAssertThat(csb.toString()).statusIsError()
+        .containsOutput("A jdbc-mapping for " + REGION_NAME + " already exists");
+
+    server.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService();
+      assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(mapping.getTableName()).isEqualTo("myTable");
+      assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+    });
 
     locator.invoke(() -> {
-      String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
-          .getConfiguration("cluster").getCacheXmlContent();
-      assertThat(xml).isNotNull().contains("jdbc:mapping").contains("myTable")
-          .contains("myPdxClass")
-          .doesNotContain("bogus");
+      RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+      assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+      assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+      assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
     });
   }
 
   @Test
   public void createMappingWithoutPdxNameFails() {
+    setupReplicate();
     CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
     csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
     csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
-    // NOTE: --table is optional so it should not be in the ouput but it is. See GEODE-3468.
+
+    // NOTE: --table and --synchronous are optional so they should not be in the output but they
+    // are. See GEODE-3468.
     gfsh.executeAndAssertThat(csb.toString()).statusIsError()
-        .containsOutput("You should specify option (--table, --pdx-name) for this command");
+        .containsOutput(
+            "You should specify option (--table, --pdx-name, --synchronous) for this command");
   }
+
+  /**
+   * Creating a mapping for a region that is not defined must be rejected.
+   */
+  @Test
+  public void createMappingWithNonExistentRegionFails() {
+    setupReplicate();
+    String expectedError = "A region named bogusRegion must already exist";
+    CommandStringBuilder command = new CommandStringBuilder(CREATE_MAPPING);
+    command.addOption(CREATE_MAPPING__REGION_NAME, "bogusRegion");
+    command.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+    command.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+    String commandLine = command.toString();
+
+    gfsh.executeAndAssertThat(commandLine).statusIsError().containsOutput(expectedError);
+  }
+
+  /**
+   * Creating a mapping for a region that already has a cache-loader must be rejected.
+   */
+  @Test
+  public void createMappingWithRegionThatHasALoaderFails() {
+    setupReplicate(true);
+    String expectedError = "The existing region " + REGION_NAME
+        + " must not already have a cache-loader, but it has " + JdbcLoader.class.getName();
+    CommandStringBuilder command = new CommandStringBuilder(CREATE_MAPPING);
+    command.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+    command.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+    command.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+    String commandLine = command.toString();
+
+    gfsh.executeAndAssertThat(commandLine).statusIsError().containsOutput(expectedError);
+  }
+
+  /**
+   * Creating an asynchronous mapping must be rejected when the async-event-queue
+   * it would create already exists.
+   */
+  @Test
+  public void createMappingWithExistingQueueFails() {
+    setupReplicate();
+    setupAsyncEventQueue();
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+    String expectedError = "An async-event-queue named " + queueName + " must not already exist.";
+    CommandStringBuilder command = new CommandStringBuilder(CREATE_MAPPING);
+    command.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+    command.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+    command.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+    String commandLine = command.toString();
+
+    gfsh.executeAndAssertThat(commandLine).statusIsError().containsOutput(expectedError);
+  }
+
 }
diff --git a/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java b/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
deleted file mode 100644
index f3d33ee..0000000
--- a/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal.cli;
-
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
-import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
-import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.result.model.ResultModel;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.junit.categories.JDBCConnectorTest;
-
-@Category({JDBCConnectorTest.class})
-public class CreateMappingCommandIntegrationTest {
-
-  private InternalCache cache;
-  private CreateMappingCommand createRegionMappingCommand;
-
-  private String regionName;
-  private String dataSourceName;
-  private String tableName;
-  private String pdxClass;
-
-  @Before
-  public void setup() {
-    regionName = "regionName";
-    dataSourceName = "connection";
-    tableName = "testTable";
-    pdxClass = "myPdxClass";
-
-    cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0")
-        .set(ENABLE_CLUSTER_CONFIGURATION, "true").create();
-    cache.createRegionFactory(RegionShortcut.LOCAL).create(regionName);
-
-    createRegionMappingCommand = new CreateMappingCommand();
-    createRegionMappingCommand.setCache(cache);
-  }
-
-  @After
-  public void tearDown() {
-    cache.close();
-  }
-
-  @Test
-  public void createsRegionMappingInService() {
-    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass);
-
-    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
-    JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
-    RegionMapping regionMapping = service.getMappingForRegion(regionName);
-
-    assertThat(regionMapping).isNotNull();
-    assertThat(regionMapping.getRegionName()).isEqualTo(regionName);
-    assertThat(regionMapping.getDataSourceName()).isEqualTo(dataSourceName);
-    assertThat(regionMapping.getTableName()).isEqualTo(tableName);
-    assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
-  }
-
-  @Test
-  public void createsRegionMappingOnceOnly() {
-    JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
-    ResultModel result;
-    result =
-        createRegionMappingCommand.createMapping(regionName, dataSourceName, tableName, pdxClass);
-    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
-    IgnoredException ignoredException =
-        IgnoredException.addIgnoredException(RegionMappingExistsException.class.getName());
-
-    try {
-      result =
-          createRegionMappingCommand.createMapping(regionName, dataSourceName, tableName, pdxClass);
-    } finally {
-      ignoredException.remove();
-    }
-
-    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
-  }
-
-  @Test
-  public void createsRegionMappingWithMinimumParams() {
-    ResultModel result =
-        createRegionMappingCommand.createMapping(regionName, dataSourceName, null, pdxClass);
-
-    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
-    JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
-    RegionMapping regionMapping = service.getMappingForRegion(regionName);
-
-    assertThat(regionMapping).isNotNull();
-    assertThat(regionMapping.getRegionName()).isEqualTo(regionName);
-    assertThat(regionMapping.getDataSourceName()).isEqualTo(dataSourceName);
-    assertThat(regionMapping.getTableName()).isNull();
-    assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
-  }
-}
diff --git a/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index c55b76e..49b44f7 100644
--- a/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,2 +1,3 @@
+org/apache/geode/connectors/jdbc/internal/cli/PreconditionException
 org/apache/geode/connectors/jdbc/internal/xml/ElementType
 org/apache/geode/connectors/jdbc/internal/xml/ElementType$1
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index 1de1062..deca66d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.connectors.jdbc.internal.cli;
 
+
+
 import java.util.List;
 import java.util.Set;
 
@@ -22,7 +24,16 @@ import org.springframework.shell.core.annotation.CliOption;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesDataPolicy;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.distributed.ConfigurationPersistenceService;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.management.cli.CliMetaData;
 import org.apache.geode.management.cli.SingleGfshCommand;
@@ -48,6 +59,13 @@ public class CreateMappingCommand extends SingleGfshCommand {
       "Name of database table for values to be written to.";
   static final String CREATE_MAPPING__DATA_SOURCE_NAME = "data-source";
   static final String CREATE_MAPPING__DATA_SOURCE_NAME__HELP = "Name of JDBC data source to use.";
+  static final String CREATE_MAPPING__SYNCHRONOUS_NAME = "synchronous";
+  static final String CREATE_MAPPING__SYNCHRONOUS_NAME__HELP =
+      "By default, writes will be asynchronous. If true, writes will be synchronous.";
+
+  /**
+   * Builds the id of the async-event-queue that belongs to the JDBC mapping of a region.
+   *
+   * @param regionPath the name or path of the region
+   * @return "JDBC#" followed by the region path with every '/' replaced by '_'
+   */
+  public static String createAsyncEventQueueName(String regionPath) {
+    String sanitizedPath = regionPath.replace('/', '_');
+    return "JDBC#" + sanitizedPath;
+  }
 
   @CliCommand(value = CREATE_MAPPING, help = CREATE_MAPPING__HELP)
   @CliMetaData(relatedTopic = CliStrings.DEFAULT_TOPIC_GEODE)
@@ -61,41 +79,192 @@ public class CreateMappingCommand extends SingleGfshCommand {
       @CliOption(key = CREATE_MAPPING__TABLE_NAME,
           help = CREATE_MAPPING__TABLE_NAME__HELP) String table,
       @CliOption(key = CREATE_MAPPING__PDX_NAME, mandatory = true,
-          help = CREATE_MAPPING__PDX_NAME__HELP) String pdxName) {
+          help = CREATE_MAPPING__PDX_NAME__HELP) String pdxName,
+      @CliOption(key = CREATE_MAPPING__SYNCHRONOUS_NAME,
+          help = CREATE_MAPPING__SYNCHRONOUS_NAME__HELP,
+          specifiedDefaultValue = "true", unspecifiedDefaultValue = "false") boolean synchronous) {
     // input
     Set<DistributedMember> targetMembers = getMembers(null, null);
-    RegionMapping mapping = new RegionMapping(regionName,
-        pdxName, table, dataSourceName);
+    RegionMapping mapping = new RegionMapping(regionName, pdxName, table, dataSourceName);
+
+    try {
+      ConfigurationPersistenceService configurationPersistenceService =
+          checkForClusterConfiguration();
+      CacheConfig cacheConfig = configurationPersistenceService.getCacheConfig(null);
+      RegionConfig regionConfig = checkForRegion(regionName, cacheConfig);
+      checkForExistingMapping(regionName, regionConfig);
+      checkForCacheLoader(regionName, regionConfig);
+      checkForCacheWriter(regionName, synchronous, regionConfig);
+      checkForAsyncQueue(regionName, synchronous, cacheConfig);
+    } catch (PreconditionException ex) {
+      return ResultModel.createError(ex.getMessage());
+    }
 
     // action
+    Object[] arguments = new Object[] {mapping, synchronous};
     List<CliFunctionResult> results =
-        executeAndGetFunctionResult(new CreateMappingFunction(), mapping, targetMembers);
+        executeAndGetFunctionResult(new CreateMappingFunction(), arguments, targetMembers);
 
     ResultModel result =
         ResultModel.createMemberStatusResult(results, EXPERIMENTAL, null, false, true);
-    result.setConfigObject(mapping);
+    result.setConfigObject(arguments);
     return result;
   }
 
+  /**
+   * Looks up the cluster configuration service and fails when there is none.
+   *
+   * @return the non-null result of getConfigurationPersistenceService()
+   * @throws PreconditionException if getConfigurationPersistenceService() returns null
+   */
+  private ConfigurationPersistenceService checkForClusterConfiguration()
+      throws PreconditionException {
+    ConfigurationPersistenceService service = getConfigurationPersistenceService();
+    if (service != null) {
+      return service;
+    }
+    throw new PreconditionException("Cluster Configuration must be enabled.");
+  }
+
+  /**
+   * Finds the configuration of the named region and fails when it is not defined.
+   *
+   * @param regionName the name of the region that is about to be mapped
+   * @param cacheConfig the cache configuration to search
+   * @return the configuration of the region
+   * @throws PreconditionException if cacheConfig has no region with that name
+   */
+  private RegionConfig checkForRegion(String regionName, CacheConfig cacheConfig)
+      throws PreconditionException {
+    RegionConfig existingRegion = findRegionConfig(cacheConfig, regionName);
+    if (existingRegion != null) {
+      return existingRegion;
+    }
+    throw new PreconditionException("A region named " + regionName + " must already exist.");
+  }
+
+  /**
+   * Fails when the region configuration already contains a RegionMapping element.
+   *
+   * @param regionName the name of the region, used in the error message
+   * @param regionConfig the configuration of that region
+   * @throws PreconditionException if one of the custom region elements is a RegionMapping
+   */
+  private void checkForExistingMapping(String regionName, RegionConfig regionConfig)
+      throws PreconditionException {
+    for (Object element : regionConfig.getCustomRegionElements()) {
+      if (element instanceof RegionMapping) {
+        throw new PreconditionException("A jdbc-mapping for " + regionName + " already exists.");
+      }
+    }
+  }
+
+  /**
+   * Fails when any attributes entry of the region already declares a cache-loader.
+   *
+   * @param regionName the name of the region, used in the error message
+   * @param regionConfig the configuration of that region
+   * @throws PreconditionException naming the class of the first cache-loader found
+   */
+  private void checkForCacheLoader(String regionName, RegionConfig regionConfig)
+      throws PreconditionException {
+    for (RegionAttributesType attributes : regionConfig.getRegionAttributes()) {
+      DeclarableType existingLoader = attributes.getCacheLoader();
+      if (existingLoader != null) {
+        throw new PreconditionException("The existing region " + regionName
+            + " must not already have a cache-loader, but it has "
+            + existingLoader.getClassName());
+      }
+    }
+  }
+
+  /**
+   * For a synchronous mapping, fails when any attributes entry of the region
+   * already declares a cache-writer. Asynchronous mappings are not checked.
+   *
+   * @param regionName the name of the region, used in the error message
+   * @param synchronous true if the mapping will write synchronously
+   * @param regionConfig the configuration of that region
+   * @throws PreconditionException naming the class of the first cache-writer found
+   */
+  private void checkForCacheWriter(String regionName, boolean synchronous,
+      RegionConfig regionConfig) throws PreconditionException {
+    if (!synchronous) {
+      return;
+    }
+    for (RegionAttributesType attributes : regionConfig.getRegionAttributes()) {
+      DeclarableType existingWriter = attributes.getCacheWriter();
+      if (existingWriter != null) {
+        throw new PreconditionException("The existing region " + regionName
+            + " must not already have a cache-writer, but it has "
+            + existingWriter.getClassName());
+      }
+    }
+  }
+
+  /**
+   * For an asynchronous mapping, fails when the cache configuration already has an
+   * async-event-queue with the id this command would use. Synchronous mappings are not checked.
+   *
+   * @param regionName the name of the region the queue id is derived from
+   * @param synchronous true if the mapping will write synchronously
+   * @param cacheConfig the cache configuration whose queues are inspected
+   * @throws PreconditionException if a queue with the derived id already exists
+   */
+  private void checkForAsyncQueue(String regionName, boolean synchronous, CacheConfig cacheConfig)
+      throws PreconditionException {
+    if (synchronous) {
+      return;
+    }
+    String queueName = createAsyncEventQueueName(regionName);
+    boolean queueExists = cacheConfig.getAsyncEventQueues().stream()
+        .anyMatch(queue -> queue.getId().equals(queueName));
+    if (queueExists) {
+      throw new PreconditionException(
+          "An async-event-queue named " + queueName + " must not already exist.");
+    }
+  }
+
   @Override
   public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
-    RegionMapping newCacheElement = (RegionMapping) element;
-    RegionMapping existingCacheElement = cacheConfig.findCustomRegionElement(
-        newCacheElement.getRegionName(), newCacheElement.getId(), RegionMapping.class);
-
-    if (existingCacheElement != null) {
-      cacheConfig
-          .getRegions()
-          .stream()
-          .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
-          .forEach(
-              regionConfig -> regionConfig.getCustomRegionElements().remove(existingCacheElement));
+    // element is expected to be an Object[] holding the RegionMapping at index 0 and the
+    // Boolean synchronous flag at index 1 (presumably the array createMapping hands to
+    // setConfigObject; confirm against the command framework).
+    Object[] arguments = (Object[]) element;
+    RegionMapping regionMapping = (RegionMapping) arguments[0];
+    boolean synchronous = (Boolean) arguments[1];
+    String regionName = regionMapping.getRegionName();
+    String queueName = createAsyncEventQueueName(regionName);
+    RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
+    if (regionConfig == null) {
+      // The region is not defined in this cache configuration, so nothing is changed.
+      return;
+    }
+    RegionAttributesType attributes = getRegionAttributes(regionConfig);
+    addMappingToRegion(regionMapping, regionConfig);
+    if (!synchronous) {
+      // Only asynchronous mappings get an async-event-queue definition.
+      createAsyncQueue(cacheConfig, attributes, queueName);
+    }
+    alterRegion(queueName, attributes, synchronous);
+  }
+
+  /**
+   * Updates the region attributes for the new mapping: always sets the cache-loader,
+   * then sets the cache-writer if synchronous or adds the queue id otherwise.
+   *
+   * @param queueName the id of the async-event-queue, used only when not synchronous
+   * @param attributes the region attributes to change
+   * @param synchronous true if the mapping writes synchronously
+   */
+  private void alterRegion(String queueName, RegionAttributesType attributes, boolean synchronous) {
+    setCacheLoader(attributes);
+    if (!synchronous) {
+      addAsyncEventQueueId(queueName, attributes);
+      return;
+    }
+    setCacheWriter(attributes);
+  }
+
+  /**
+   * Appends the mapping to the custom elements of the region configuration.
+   *
+   * @param newCacheElement the mapping to add
+   * @param regionConfig the configuration of the mapped region
+   */
+  private void addMappingToRegion(RegionMapping newCacheElement, RegionConfig regionConfig) {
+    List<? super RegionMapping> customElements = regionConfig.getCustomRegionElements();
+    customElements.add(newCacheElement);
+  }
+
+  /**
+   * Searches the cache configuration for a region by name.
+   *
+   * @param cacheConfig the cache configuration to search
+   * @param regionName the name to look for
+   * @return the first region whose name equals regionName, or null if there is none
+   */
+  private RegionConfig findRegionConfig(CacheConfig cacheConfig, String regionName) {
+    for (RegionConfig candidate : cacheConfig.getRegions()) {
+      if (candidate.getName().equals(regionName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds the definition of the JDBC async-event-queue to the cache configuration.
+   * The queue is parallel when the data policy of the region is PARTITION or
+   * PERSISTENT_PARTITION and serial otherwise. Its listener is a JdbcAsyncWriter.
+   *
+   * @param cacheConfig the cache configuration that receives the new queue
+   * @param attributes the attributes of the region being mapped
+   * @param queueName the id to give the new queue
+   */
+  private void createAsyncQueue(CacheConfig cacheConfig, RegionAttributesType attributes,
+      String queueName) {
+    AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
+    asyncEventQueue.setId(queueName);
+    // The data policy may be null (for example on the empty attributes that getRegionAttributes
+    // creates), so compare constant-first instead of calling equals on a possibly-null policy.
+    // NOTE(review): a region that is partitioned only through a region shortcut would then get
+    // a serial queue here; confirm whether the shortcut also needs to be consulted.
+    RegionAttributesDataPolicy dataPolicy = attributes.getDataPolicy();
+    boolean isPartitioned = RegionAttributesDataPolicy.PARTITION.equals(dataPolicy)
+        || RegionAttributesDataPolicy.PERSISTENT_PARTITION.equals(dataPolicy);
+    asyncEventQueue.setParallel(isPartitioned);
+    DeclarableType listener = new DeclarableType();
+    listener.setClassName(JdbcAsyncWriter.class.getName());
+    asyncEventQueue.setAsyncEventListener(listener);
+    cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
+  }
+
+  /**
+   * Appends the queue id to the comma-separated async-event-queue ids of the region
+   * attributes unless the list already contains that exact id.
+   *
+   * @param queueName the id of the async-event-queue to add
+   * @param attributes the region attributes whose queue id list is updated
+   */
+  private void addAsyncEventQueueId(String queueName, RegionAttributesType attributes) {
+    String asyncEventQueueList = attributes.getAsyncEventQueueIds();
+    if (asyncEventQueueList == null) {
+      asyncEventQueueList = "";
+    }
+    // Compare whole ids: a substring test would wrongly treat "JDBC#a" as already present
+    // when the list only contains a longer id such as "JDBC#ab".
+    for (String existingId : asyncEventQueueList.split(",")) {
+      if (existingId.trim().equals(queueName)) {
+        return;
+      }
+    }
+    if (asyncEventQueueList.length() > 0) {
+      asyncEventQueueList += ',';
     }
+    asyncEventQueueList += queueName;
+    attributes.setAsyncEventQueueIds(asyncEventQueueList);
+  }
 
-    cacheConfig
-        .getRegions()
-        .stream()
-        .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
-        .forEach(regionConfig -> regionConfig.getCustomRegionElements().add(newCacheElement));
+  /**
+   * Declares JdbcLoader as the cache-loader in the given region attributes.
+   *
+   * @param attributes the region attributes to change
+   */
+  private void setCacheLoader(RegionAttributesType attributes) {
+    String loaderClassName = JdbcLoader.class.getName();
+    DeclarableType loaderDeclaration = new DeclarableType();
+    loaderDeclaration.setClassName(loaderClassName);
+    attributes.setCacheLoader(loaderDeclaration);
+  }
+
+  /**
+   * Declares JdbcWriter as the cache-writer in the given region attributes.
+   *
+   * @param attributes the region attributes to change
+   */
+  private void setCacheWriter(RegionAttributesType attributes) {
+    String writerClassName = JdbcWriter.class.getName();
+    DeclarableType writerDeclaration = new DeclarableType();
+    writerDeclaration.setClassName(writerClassName);
+    attributes.setCacheWriter(writerDeclaration);
+  }
+
+  /**
+   * Returns the first attributes entry of the region configuration, creating and
+   * registering an empty one when the region has none.
+   *
+   * @param regionConfig the configuration of the mapped region
+   * @return the existing first entry, or the newly added empty entry
+   */
+  private RegionAttributesType getRegionAttributes(RegionConfig regionConfig) {
+    List<RegionAttributesType> declaredAttributes = regionConfig.getRegionAttributes();
+    if (!declaredAttributes.isEmpty()) {
+      return declaredAttributes.get(0);
+    }
+    RegionAttributesType created = new RegionAttributesType();
+    declaredAttributes.add(created);
+    return created;
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
index ba71dfb..3d2e2ed 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
@@ -16,30 +16,49 @@ package org.apache.geode.connectors.jdbc.internal.cli;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.management.cli.CliFunction;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 
+/**
+ * The function argument is an Object[] that must always be of size two.
+ * The first element must be a RegionMapping.
+ * The second element must be a Boolean that is true if synchronous.
+ */
 @Experimental
-public class CreateMappingFunction extends CliFunction<RegionMapping> {
+public class CreateMappingFunction extends CliFunction<Object[]> {
 
   CreateMappingFunction() {
     super();
   }
 
   @Override
-  public CliFunctionResult executeFunction(FunctionContext<RegionMapping> context)
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context)
       throws Exception {
     JdbcConnectorService service = FunctionContextArgumentProvider.getJdbcConnectorService(context);
     // input
-    RegionMapping regionMapping = context.getArguments();
+    Object[] arguments = context.getArguments();
+    RegionMapping regionMapping = (RegionMapping) arguments[0];
+    boolean synchronous = (boolean) arguments[1];
 
-    verifyRegionExists(context, regionMapping);
+    String regionName = regionMapping.getRegionName();
+    Region<?, ?> region = verifyRegionExists(context.getCache(), regionName);
 
     // action
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    if (!synchronous) {
+      createAsyncEventQueue(context.getCache(), queueName,
+          region.getAttributes().getDataPolicy().withPartitioning());
+    }
+    alterRegion(region, queueName, synchronous);
     createRegionMapping(service, regionMapping);
 
     // output
@@ -49,14 +68,38 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
     return new CliFunctionResult(member, true, message);
   }
 
-  private void verifyRegionExists(FunctionContext<RegionMapping> context,
-      RegionMapping regionMapping) {
-    Cache cache = context.getCache();
-    String regionName = regionMapping.getRegionName();
-    if (cache.getRegion(regionName) == null) {
+  /**
+   * Change the existing region to have
+   * the JdbcLoader as its cache-loader
+   * and, if synchronous, the JdbcWriter as its cache-writer;
+   * otherwise the given async-event-queue as one of its queues.
+   *
+   * @param region the region to alter through its attributes mutator
+   * @param queueName the id of the async-event-queue, used only when not synchronous
+   * @param synchronous true to install the JdbcWriter instead of adding the queue
+   */
+  private void alterRegion(Region<?, ?> region, String queueName, boolean synchronous) {
+    region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+    if (synchronous) {
+      region.getAttributesMutator().setCacheWriter(new JdbcWriter());
+    } else {
+      region.getAttributesMutator().addAsyncEventQueueId(queueName);
+    }
+  }
+
+  /**
+   * Creates the async-event-queue whose listener is a new JdbcAsyncWriter.
+   * The queue is parallel when isPartitioned is true and serial otherwise.
+   *
+   * @param cache the cache that provides the queue factory
+   * @param queueName the id of the queue to create
+   * @param isPartitioned true if the mapped region is partitioned
+   */
+  private void createAsyncEventQueue(Cache cache, String queueName, boolean isPartitioned) {
+    AsyncEventQueueFactory queueFactory = cache.createAsyncEventQueueFactory();
+    queueFactory.setParallel(isPartitioned);
+    JdbcAsyncWriter listener = new JdbcAsyncWriter();
+    queueFactory.create(queueName, listener);
+  }
+
+  /**
+   * Looks up the named region in the cache.
+   *
+   * @param cache the cache to search
+   * @param regionName the name of the region that must exist
+   * @return the region found in the cache
+   * @throws IllegalStateException if the cache has no region with that name
+   */
+  private Region<?, ?> verifyRegionExists(Cache cache, String regionName) {
+    Region<?, ?> result = cache.getRegion(regionName);
+    if (result == null) {
       throw new IllegalStateException(
           "create jdbc-mapping requires that the region \"" + regionName + "\" exists.");
     }
+    return result;
   }
 
   /**
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java
new file mode 100644
index 0000000..a40ae7e
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.cli;
+
+/**
+ * Checked exception that gfsh commands throw from their precondition checks
+ * to report that one of those checks failed.
+ */
+@SuppressWarnings("serial")
+public class PreconditionException extends Exception {
+  /**
+   * @param message describes the precondition that was not met
+   */
+  public PreconditionException(final String message) {
+    super(message);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
new file mode 100644
index 0000000..1513f91
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.cli;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
+import org.apache.geode.cache.configuration.CacheElement;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesDataPolicy;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.distributed.ConfigurationPersistenceService;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+
+public class CreateMappingCommandTest {
+
+  private InternalCache cache;
+  private CreateMappingCommand createRegionMappingCommand;
+
+  private String regionName;
+  private String dataSourceName;
+  private String tableName;
+  private String pdxClass;
+  private DistributionManager distributionManager;
+  private Set<InternalDistributedMember> members;
+  private List<CliFunctionResult> results;
+  private CliFunctionResult successFunctionResult;
+  private RegionMapping mapping;
+  private final Object[] arguments = new Object[2];
+  private CacheConfig cacheConfig;
+  RegionConfig matchingRegion;
+  RegionAttributesType matchingRegionAttributes;
+
+  /**
+   * Builds the fixture used by the tests in this class: a mocked cache whose distribution manager
+   * reports a single member, a spied {@link CreateMappingCommand} whose function execution is
+   * stubbed to return the {@code results} list, and a mocked cluster-config region named
+   * {@code regionName} whose single attributes entry reports the REPLICATE data policy.
+   */
+  @Before
+  public void setup() {
+    regionName = "regionName";
+    dataSourceName = "connection";
+    tableName = "testTable";
+    pdxClass = "myPdxClass";
+    cache = mock(InternalCache.class);
+    distributionManager = mock(DistributionManager.class);
+    when(cache.getDistributionManager()).thenReturn(distributionManager);
+    members = new HashSet<>();
+    members.add(mock(InternalDistributedMember.class));
+    when(distributionManager.getNormalDistributionManagerIds()).thenReturn(members);
+    // A spy (not a plain instance) so individual command methods can be stubbed below and in tests.
+    createRegionMappingCommand = spy(CreateMappingCommand.class);
+    createRegionMappingCommand.setCache(cache);
+    results = new ArrayList<>();
+    successFunctionResult = mock(CliFunctionResult.class);
+    when(successFunctionResult.isSuccessful()).thenReturn(true);
+
+    // Function execution always hands back "results"; each test fills that list as it needs.
+    doReturn(results).when(createRegionMappingCommand).executeAndGetFunctionResult(any(), any(),
+        any());
+
+    mapping = mock(RegionMapping.class);
+    when(mapping.getRegionName()).thenReturn(regionName);
+
+    // getRegions()/getAsyncEventQueues() are left unstubbed here; tests stub them individually.
+    cacheConfig = mock(CacheConfig.class);
+
+    matchingRegion = mock(RegionConfig.class);
+    when(matchingRegion.getName()).thenReturn(regionName);
+    List<RegionAttributesType> attributesList = new ArrayList<>();
+    matchingRegionAttributes = mock(RegionAttributesType.class);
+    when(matchingRegionAttributes.getDataPolicy()).thenReturn(RegionAttributesDataPolicy.REPLICATE);
+    attributesList.add(matchingRegionAttributes);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributesList);
+
+    // Passed as the last argument of updateClusterConfig: [0] is the mapping, [1] is a boolean
+    // that the "WithSynchronous" tests flip to true (false is the default used here).
+    arguments[0] = mapping;
+    arguments[1] = false;
+  }
+
+  /**
+   * Stubs the command so that a configuration persistence service is available and its cache
+   * config lists exactly one region, {@code matchingRegion}.
+   */
+  private void setupRequiredPreconditions() {
+    List<RegionConfig> regionsInClusterConfig = new ArrayList<>();
+    regionsInClusterConfig.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regionsInClusterConfig);
+
+    ConfigurationPersistenceService persistenceService =
+        mock(ConfigurationPersistenceService.class);
+    when(persistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+    doReturn(persistenceService).when(createRegionMappingCommand)
+        .getConfigurationPersistenceService();
+  }
+
+  /**
+   * With the preconditions stubbed and one successful function result, createMapping reports OK
+   * and its config object holds the created mapping followed by the synchronous flag.
+   */
+  @Test
+  public void createsMappingReturnsStatusOKWhenFunctionResultSuccess() {
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
+    // Deliberately not called "results": that name would shadow the results field used above.
+    Object[] configObject = (Object[]) result.getConfigObject();
+    RegionMapping regionMapping = (RegionMapping) configObject[0];
+    boolean synchronous = (boolean) configObject[1];
+    assertThat(regionMapping).isNotNull();
+    assertThat(regionMapping.getRegionName()).isEqualTo(regionName);
+    assertThat(regionMapping.getDataSourceName()).isEqualTo(dataSourceName);
+    assertThat(regionMapping.getTableName()).isEqualTo(tableName);
+    assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
+    assertThat(synchronous).isFalse();
+  }
+
+  /** An empty list of function results makes createMapping report ERROR. */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenFunctionResultIsEmpty() {
+    results.clear();
+    setupRequiredPreconditions();
+
+    ResultModel commandResult = createRegionMappingCommand.createMapping(regionName,
+        dataSourceName, tableName, pdxClass, false);
+
+    assertThat(commandResult.getStatus()).isSameAs(Result.Status.ERROR);
+  }
+
+  /**
+   * When the command has no configuration persistence service (null), createMapping fails and
+   * tells the user that cluster configuration must be enabled.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigIsDisabled() {
+    doReturn(null).when(createRegionMappingCommand).getConfigurationPersistenceService();
+    results.add(successFunctionResult);
+
+    ResultModel commandResult = createRegionMappingCommand.createMapping(regionName,
+        dataSourceName, tableName, pdxClass, false);
+
+    assertThat(commandResult.getStatus()).isSameAs(Result.Status.ERROR);
+    String expectedMessage = "Cluster Configuration must be enabled.";
+    assertThat(commandResult.toString()).contains(expectedMessage);
+  }
+
+  /**
+   * When the cluster configuration lists no regions, createMapping fails and tells the user that
+   * the named region must already exist.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigDoesNotContainRegion() {
+    when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
+    ConfigurationPersistenceService persistenceService =
+        mock(ConfigurationPersistenceService.class);
+    when(persistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+    doReturn(persistenceService).when(createRegionMappingCommand)
+        .getConfigurationPersistenceService();
+    results.add(successFunctionResult);
+
+    ResultModel commandResult = createRegionMappingCommand.createMapping(regionName,
+        dataSourceName, tableName, pdxClass, false);
+
+    assertThat(commandResult.getStatus()).isSameAs(Result.Status.ERROR);
+    String expectedMessage = "A region named " + regionName + " must already exist.";
+    assertThat(commandResult.toString()).contains(expectedMessage);
+  }
+
+  /**
+   * A region whose cluster configuration already carries a {@link RegionMapping} custom element
+   * is rejected with an "already exists" error.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenRegionMappingExists() {
+    // The shared helper provides the persistence-service and region-list stubbing.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    // Replace the default attributes with one whose cache-loader declares no class name.
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    DeclarableType loaderDeclarable = mock(DeclarableType.class);
+    when(loaderDeclarable.getClassName()).thenReturn(null);
+    when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+    // The pre-existing mapping is what should trigger the failure.
+    List<CacheElement> customList = new ArrayList<>();
+    RegionMapping existingMapping = mock(RegionMapping.class);
+    customList.add(existingMapping);
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customList);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+    assertThat(result.toString()).contains("A jdbc-mapping for " + regionName + " already exists.");
+  }
+
+  /**
+   * A region that already declares a cache-loader class in cluster configuration is rejected,
+   * and the error names the existing loader class.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigRegionHasLoader() {
+    // The shared helper provides the persistence-service and region-list stubbing.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    DeclarableType loaderDeclarable = mock(DeclarableType.class);
+    when(loaderDeclarable.getClassName()).thenReturn("MyCacheLoaderClass");
+    when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+    assertThat(result.toString()).contains("The existing region " + regionName
+        + " must not already have a cache-loader, but it has MyCacheLoaderClass");
+  }
+
+  /**
+   * In synchronous mode, a region that already declares a cache-writer class in cluster
+   * configuration is rejected, and the error names the existing writer class.
+   */
+  @Test
+  public void createMappingWithSynchronousReturnsStatusERRORWhenClusterConfigRegionHasWriter() {
+    // The shared helper provides the persistence-service and region-list stubbing.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType writerAttribute = mock(RegionAttributesType.class);
+    DeclarableType writerDeclarable = mock(DeclarableType.class);
+    when(writerDeclarable.getClassName()).thenReturn("MyCacheWriterClass");
+    when(writerAttribute.getCacheWriter()).thenReturn(writerDeclarable);
+    attributes.add(writerAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, true);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+    assertThat(result.toString()).contains("The existing region " + regionName
+        + " must not already have a cache-writer, but it has MyCacheWriterClass");
+  }
+
+  /**
+   * In synchronous mode, an async-event-queue that already uses the generated queue name does not
+   * prevent the mapping from being created.
+   */
+  @Test
+  public void createMappingWithSynchronousReturnsStatusOKWhenAsycnEventQueueAlreadyExists() {
+    // The shared helper provides the persistence-service and region-list stubbing.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    when(loaderAttribute.getCacheLoader()).thenReturn(null);
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+    List<AsyncEventQueue> asyncEventQueues = new ArrayList<>();
+    AsyncEventQueue matchingQueue = mock(AsyncEventQueue.class);
+    // Static method: call it on the class, not on the spy instance.
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    when(matchingQueue.getId()).thenReturn(queueName);
+    asyncEventQueues.add(matchingQueue);
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, true);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
+  }
+
+
+  /**
+   * In asynchronous mode, an async-event-queue that already uses the generated queue name causes
+   * createMapping to fail with a message naming that queue.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenAsycnEventQueueAlreadyExists() {
+    // The shared helper provides the persistence-service and region-list stubbing.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    when(loaderAttribute.getCacheLoader()).thenReturn(null);
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+    List<AsyncEventQueue> asyncEventQueues = new ArrayList<>();
+    AsyncEventQueue matchingQueue = mock(AsyncEventQueue.class);
+    // Static method: call it on the class, not on the spy instance.
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    when(matchingQueue.getId()).thenReturn(queueName);
+    asyncEventQueues.add(matchingQueue);
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+    assertThat(result.toString())
+        .contains("An async-event-queue named " + queueName + " must not already exist.");
+  }
+
+  /** updateClusterConfig must complete normally when the cache config lists no regions. */
+  @Test
+  public void updateClusterConfigWithNoRegionsDoesNotThrowException() {
+    List<RegionConfig> noRegions = Collections.emptyList();
+    when(cacheConfig.getRegions()).thenReturn(noRegions);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+  }
+
+  /**
+   * updateClusterConfig adds the mapping, and nothing else, to the custom elements of the region
+   * whose name matches the mapping's region name.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAddsMappingToRegion() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    // One assertion covers both "size is 1" and "contains mapping", and prints the list on failure.
+    assertThat(listCacheElements).containsExactly(mapping);
+  }
+
+  /** A region with a different name must not receive the mapping as a custom element. */
+  @Test
+  public void updateClusterConfigWithOneNonMatchingRegionDoesNotAddMapping() {
+    List<CacheElement> customElements = new ArrayList<>();
+    RegionConfig otherRegion = mock(RegionConfig.class);
+    when(otherRegion.getName()).thenReturn("nonMatchingRegion");
+    when(otherRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(otherRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    assertThat(customElements).isEmpty();
+  }
+
+  /**
+   * For the default (REPLICATE, asynchronous) fixture, updateClusterConfig adds exactly one
+   * async-event-queue: it has the generated name, is not parallel, and uses
+   * {@link JdbcAsyncWriter} as its listener.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionCreatesAsyncEventQueue() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    // hasSize reports the actual list contents on failure, unlike comparing size() to 1.
+    assertThat(queueList).hasSize(1);
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    AsyncEventQueue createdQueue = queueList.get(0);
+    assertThat(createdQueue.getId()).isEqualTo(queueName);
+    assertThat(createdQueue.isParallel()).isFalse();
+    assertThat(createdQueue.getAsyncEventListener().getClassName())
+        .isEqualTo(JdbcAsyncWriter.class.getName());
+  }
+
+  /**
+   * When the matching region's data policy is PARTITION, the async-event-queue added by
+   * updateClusterConfig is parallel.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingPartitionedRegionCreatesParallelAsyncEventQueue() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    when(matchingRegionAttributes.getDataPolicy())
+        .thenReturn(RegionAttributesDataPolicy.PARTITION);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    // Guard get(0): a missing queue should fail as an assertion, not IndexOutOfBoundsException.
+    assertThat(queueList).isNotEmpty();
+    assertThat(queueList.get(0).isParallel()).isTrue();
+  }
+
+  /** updateClusterConfig sets a cache-loader on the matching region's attributes. */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionCallsSetCacheLoader() {
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    verify(matchingRegionAttributes).setCacheLoader(any());
+  }
+
+  /**
+   * When the region attributes report null async-event-queue ids, the ids are set to just the
+   * generated queue name.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndNullQueuesAddsAsyncEventQueueIdToRegion() {
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(null);
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+    String expectedIds = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedIds);
+  }
+
+  /**
+   * When the region attributes report an empty async-event-queue id string, the ids are set to
+   * just the generated queue name.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndEmptyQueuesAddsAsyncEventQueueIdToRegion() {
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("");
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+    String expectedIds = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedIds);
+  }
+
+  /**
+   * When the region attributes already list other queue ids ("q1,q2"), the generated queue name
+   * is appended after a comma.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndExistingQueuesAddsAsyncEventQueueIdToRegion() {
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+    String expectedIds = "q1,q2," + CreateMappingCommand.createAsyncEventQueueName(regionName);
+    assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedIds);
+  }
+
+  /**
+   * When the region's queue ids already include the generated queue name, the ids are left
+   * untouched.
+   */
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndQueuesContainingDuplicateDoesNotModifyAsyncEventQueueIdOnRegion() {
+    String generatedName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+    when(matchingRegionAttributes.getAsyncEventQueueIds())
+        .thenReturn("q1," + generatedName + ",q2");
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+  }
+
+  /** With the synchronous flag set, a cache-writer is set on the matching region's attributes. */
+  @Test
+  public void updateClusterConfigWithSynchronousSetsTheCacheWriterOnTheMatchingRegion() {
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    arguments[1] = true;
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    verify(matchingRegionAttributes).setCacheWriter(any());
+  }
+
+  /** With the synchronous flag set, the region's existing queue ids ("q1,q2") are not changed. */
+  @Test
+  public void updateClusterConfigWithSynchronousAndOneMatchingRegionAndExistingQueuesDoesNotAddsAsyncEventQueueIdToRegion() {
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+    arguments[1] = true;
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+  }
+
+  /** With the synchronous flag set, no async-event-queue is added to the cache config. */
+  @Test
+  public void updateClusterConfigWithSynchronousAndOneMatchingRegionDoesNotCreateAsyncEventQueue() {
+    List<AsyncEventQueue> queues = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queues);
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+    List<CacheElement> customElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+    arguments[1] = true;
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+    assertThat(queues).isEmpty();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
index acba1cd..955b57f 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
@@ -18,9 +18,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -32,7 +34,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
@@ -48,11 +54,15 @@ public class CreateMappingFunctionTest {
   private static final String REGION_NAME = "testRegion";
 
   private RegionMapping regionMapping;
-  private FunctionContext<RegionMapping> context;
+  private FunctionContext<Object[]> context;
   private DistributedMember distributedMember;
   private ResultSender<Object> resultSender;
   private JdbcConnectorService service;
   private InternalCache cache;
+  private Region region;
+  private RegionAttributes attributes;
+  private AsyncEventQueueFactory asyncEventQueueFactory;
+  private final Object[] functionInputs = new Object[2];
 
   private CreateMappingFunction function;
 
@@ -61,7 +71,7 @@ public class CreateMappingFunctionTest {
     context = mock(FunctionContext.class);
     resultSender = mock(ResultSender.class);
     cache = mock(InternalCache.class);
-    Region region = mock(Region.class);
+    region = mock(Region.class);
     DistributedSystem system = mock(DistributedSystem.class);
     distributedMember = mock(DistributedMember.class);
     service = mock(JdbcConnectorService.class);
@@ -73,12 +83,29 @@ public class CreateMappingFunctionTest {
     when(cache.getDistributedSystem()).thenReturn(system);
     when(cache.getRegion(REGION_NAME)).thenReturn(region);
     when(system.getDistributedMember()).thenReturn(distributedMember);
-    when(context.getArguments()).thenReturn(regionMapping);
+    functionInputs[0] = regionMapping;
+    setupAsynchronous();
+    when(context.getArguments()).thenReturn(functionInputs);
     when(cache.getService(eq(JdbcConnectorService.class))).thenReturn(service);
+    attributes = mock(RegionAttributes.class);
+    when(attributes.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+    when(region.getAttributes()).thenReturn(attributes);
+    when(region.getAttributesMutator()).thenReturn(mock(AttributesMutator.class));
+    asyncEventQueueFactory = mock(AsyncEventQueueFactory.class);
+    when(cache.createAsyncEventQueueFactory()).thenReturn(asyncEventQueueFactory);
 
     function = new CreateMappingFunction();
   }
 
+  /** Marks the function inputs as asynchronous by storing {@code false} in slot 1. */
+  private void setupAsynchronous() {
+    functionInputs[1] = Boolean.FALSE;
+  }
+
+  /** Marks the function inputs as synchronous by storing {@code true} in slot 1. */
+  private void setupSynchronous() {
+    functionInputs[1] = Boolean.TRUE;
+  }
+
+
   @Test
   public void isHAReturnsFalse() {
     assertThat(function.isHA()).isFalse();
@@ -130,9 +157,72 @@ public class CreateMappingFunctionTest {
 
   @Test
   public void executeCreatesMapping() throws Exception {
-    function.execute(context);
+    function.executeFunction(context);
+    // the region mapping is registered with the JDBC connector service exactly once
+    verify(service, times(1)).createRegionMapping(regionMapping);
+  }
+
+  @Test
+  public void executeAlterRegionLoader() throws Exception {
+    function.executeFunction(context);
+    // the mapping is created and a cache loader is installed on the region
+    verify(service).createRegionMapping(regionMapping);
+    AttributesMutator regionMutator = region.getAttributesMutator();
+    verify(regionMutator).setCacheLoader(any());
+  }
+
+  @Test
+  public void executeWithSynchronousAltersRegionWriter() throws Exception {
+    setupSynchronous();
+    function.executeFunction(context);
+    // synchronous mode: exactly one cache writer is installed on the region
+    AttributesMutator regionMutator = region.getAttributesMutator();
+    verify(regionMutator).setCacheWriter(any());
+  }
+
+  @Test
+  public void executeWithSynchronousNeverAltersRegionAsyncEventQueue() throws Exception {
+    setupSynchronous();
+    function.executeFunction(context);
+    // synchronous mode: no async event queue id is ever added to the region
+    AttributesMutator regionMutator = region.getAttributesMutator();
+    verify(regionMutator, never()).addAsyncEventQueueId(any());
+  }
+
+  @Test
+  public void executeWithSynchronousNeverCreatesAsyncQueue() throws Exception {
+    setupSynchronous();
+    function.executeFunction(context);
+    // synchronous mode: the mocked queue factory is never asked to create a queue
+    verify(asyncEventQueueFactory, never()).create(any(), any());
+  }
+
+  @Test
+  public void executeAlterRegionAsyncEventQueue() throws Exception {
+    String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+    function.executeFunction(context); // setUp leaves the inputs in asynchronous mode
 
     verify(service, times(1)).createRegionMapping(regionMapping);
+    AttributesMutator mutator = region.getAttributesMutator();
+    verify(mutator, times(1)).addAsyncEventQueueId(queueName);
+  }
+
+  @Test
+  public void executeCreatesSerialAsyncQueueForNonPartitionedRegion() throws Exception {
+    when(attributes.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+    function.executeFunction(context);
+    // a replicated region gets exactly one queue, configured as serial (non-parallel)
+    verify(asyncEventQueueFactory).create(any(), any());
+    verify(asyncEventQueueFactory).setParallel(false);
+  }
+
+  @Test
+  public void executeCreatesParallelAsyncQueueForPartitionedRegion() throws Exception {
+    when(attributes.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+    function.executeFunction(context);
+    // a partitioned region gets exactly one queue, configured as parallel
+    verify(asyncEventQueueFactory).create(any(), any());
+    verify(asyncEventQueueFactory).setParallel(true);
   }
 
   @Test