You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/04/17 19:32:51 UTC

[geode] branch develop updated: GEODE-6640: add integration tests to do put, invalidate, then get from async JDBC mapping (#3468)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 96bd60e  GEODE-6640: add integration tests to do put, invalidate, then get from async JDBC mapping (#3468)
96bd60e is described below

commit 96bd60eef2e9275fddb1f10cdb206ade9276bca9
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Wed Apr 17 12:32:39 2019 -0700

    GEODE-6640: add integration tests to do put, invalidate, then get from async JDBC mapping (#3468)
    
    * GEODE-6640: add dunit test case: put into jdbc-mapping on accessor then do get
    
        Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
        Co-authored-by: Benjamin Ross <br...@pivotal.io>
---
 .../geode/connectors/jdbc/JdbcDistributedTest.java | 241 ++++++++++++++++-----
 1 file changed, 189 insertions(+), 52 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index 40772bd..1b0e5c4 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -28,6 +28,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
@@ -37,6 +38,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
@@ -80,7 +82,7 @@ public abstract class JdbcDistributedTest implements Serializable {
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
 
-  private MemberVM server;
+  private MemberVM dataserver;
   private MemberVM locator;
   private String connectionUrl;
 
@@ -96,13 +98,27 @@ public abstract class JdbcDistributedTest implements Serializable {
   public abstract String getConnectionUrl() throws IOException, InterruptedException;
 
   private void createTable() throws SQLException {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
     Connection connection = getConnection();
     Statement statement = connection.createStatement();
     statement.execute("Create Table " + TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
   }
 
+  /**
+   * Starts a server VM via {@code startupRule}, passing {@code groupName} and the locator port,
+   * and then creates the employee table (columns {@code id}, {@code name}, {@code age}).
+   *
+   * @param idx index handed to {@code startupRule.startServerVM}
+   * @param groupName group name handed to {@code startupRule.startServerVM}
+   * @return the server that was started
+   * @throws SQLException if the connection cannot be obtained or the DDL statement fails
+   */
+  private MemberVM createTableForGroup(int idx, String groupName) throws SQLException {
+    MemberVM server = startupRule.startServerVM(idx, groupName, locator.getPort());
+    // NOTE(review): the connection is intentionally left open, as in createTable(); whether
+    // getConnection() hands out a shared connection is not visible here -- confirm before closing.
+    Connection connection = getConnection();
+    // The statement is owned by this method, so release it as soon as the DDL has run.
+    try (Statement statement = connection.createStatement()) {
+      statement.execute("Create Table " + TABLE_NAME
+          + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
+    }
+    return server;
+  }
+
+  /**
+   * Starts one more server VM through {@code startupRule}, passing {@code groupName} and the
+   * locator port. Unlike {@link #createTableForGroup(int, String)} it issues no SQL.
+   *
+   * @param idx index handed to {@code startupRule.startServerVM}
+   * @param groupName group name handed to {@code startupRule.startServerVM}
+   * @return the server that was started
+   */
+  private MemberVM addServerForGroup(int idx, String groupName) throws SQLException {
+    final int locatorPort = locator.getPort();
+    return startupRule.startServerVM(idx, groupName, locatorPort);
+  }
+
   private void alterTable() throws SQLException {
     Connection connection = getConnection();
     Statement statement = connection.createStatement();
@@ -111,7 +127,7 @@ public abstract class JdbcDistributedTest implements Serializable {
   }
 
   private void createTableForAllSupportedFields() throws SQLException {
-    server = startupRule.startServerVM(1,
+    dataserver = startupRule.startServerVM(1,
         x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized());
     Connection connection = getConnection();
     DatabaseMetaData metaData = connection.getMetaData();
@@ -199,7 +215,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
     createJdbcDataSource();
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("name", "Emp1").writeInt("age", 55).create();
@@ -221,7 +237,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
     createJdbcDataSource();
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("name", "Emp1").writeInt("age", 55).create();
@@ -242,11 +258,11 @@ public abstract class JdbcDistributedTest implements Serializable {
     IgnoredException.addIgnoredException(
         "Error detected when comparing mapping for region \"employees\" with table definition:");
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     alterTable();
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -266,10 +282,10 @@ public abstract class JdbcDistributedTest implements Serializable {
     IgnoredException.addIgnoredException(
         "Error detected when comparing mapping for region \"employees\" with table definition:");
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -282,7 +298,7 @@ public abstract class JdbcDistributedTest implements Serializable {
       region.invalidate(key);
     });
     alterTable();
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "id1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       assertThatThrownBy(() -> region.get(key))
@@ -300,20 +316,102 @@ public abstract class JdbcDistributedTest implements Serializable {
     IgnoredException.addIgnoredException(
         "Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     alterTable();
     assertThatThrownBy(
-        () -> startupRule.startServerVM(2, x -> x.withConnectionToLocator(locator.getPort())))
+        () -> startupRule.startServerVM(3, x -> x.withConnectionToLocator(locator.getPort())))
             .hasCauseExactlyInstanceOf(JdbcConnectorException.class).hasStackTraceContaining(
                 "Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
   }
 
+  /**
+   * Runs a put / invalidate / get round trip on each of the four members and then, on the two
+   * data servers only, waits until the mapping's async event queue reports size zero.
+   *
+   * On every member a PDX-built employee and an {@code Employee} object are put into the region
+   * and invalidated; the following gets must eventually be non-null and carry the original names.
+   */
+  private void validateBothServersAndAccessors(MemberVM server1, MemberVM server2,
+      MemberVM accessor1, MemberVM accessor2) {
+    List<MemberVM> allMembers = Arrays.asList(server1, server2, accessor1, accessor2);
+    for (MemberVM member : allMembers) {
+      member.invoke(() -> {
+        final String pdxKey = "pdxkey1";
+        final String objectKey = "key1";
+
+        PdxInstance pdxEmployee =
+            ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+                .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+        Region<Object, Object> employees = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+
+        employees.put(pdxKey, pdxEmployee);
+        employees.put(objectKey, new Employee("key1", "name1", 30));
+
+        // Invalidate both entries before reading them back.
+        employees.invalidate(pdxKey);
+        employees.invalidate(objectKey);
+        await().untilAsserted(() -> {
+          assertThat(employees.get(pdxKey)).isNotNull();
+          assertThat(employees.get(objectKey)).isNotNull();
+        });
+
+        // Both values are expected to come back as Employee instances.
+        Employee pdxReadBack = (Employee) employees.get(pdxKey);
+        Employee objectReadBack = (Employee) employees.get(objectKey);
+        assertThat(pdxReadBack.getName()).isEqualTo("Emp1");
+        assertThat(objectReadBack.getName()).isEqualTo("name1");
+      });
+    }
+
+    // Only the data servers are checked for the async event queue.
+    for (MemberVM dataServer : Arrays.asList(server1, server2)) {
+      dataServer.invoke(() -> {
+        AsyncEventQueue mappingQueue = ClusterStartupRule.getCache()
+            .getAsyncEventQueue(MappingCommandUtils.createAsyncEventQueueName(REGION_NAME));
+        assertThat(mappingQueue).isNotNull();
+        await().untilAsserted(() -> assertThat(mappingQueue.size()).isEqualTo(0));
+      });
+    }
+  }
+
+  /**
+   * Two servers in "datagroup" get the region as PARTITION and two servers in "accessorgroup"
+   * get it as PARTITION_PROXY; an async JDBC mapping is created for each group and the
+   * put / invalidate / get validation is then run against all four members.
+   */
+  @Test
+  public void startAccessorForPRThenPutAndGet() throws Exception {
+    final String dataGroup = "datagroup";
+    final String accessorGroup = "accessorgroup";
+    final String pdxName = Employee.class.getName();
+
+    MemberVM dataServer1 = createTableForGroup(4, dataGroup);
+    MemberVM dataServer2 = addServerForGroup(5, dataGroup);
+    MemberVM accessorServer1 = addServerForGroup(6, accessorGroup);
+    MemberVM accessorServer2 = addServerForGroup(7, accessorGroup);
+
+    createJdbcDataSource();
+    // Region and mapping for the data group first, then the proxy definition for the accessors.
+    createPartitionedRegionUsingGfshForGroup(false, dataGroup);
+    createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, pdxName, dataGroup);
+    createPartitionedRegionUsingGfshForGroup(true, accessorGroup);
+    createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, pdxName, accessorGroup);
+
+    validateBothServersAndAccessors(dataServer1, dataServer2, accessorServer1, accessorServer2);
+
+    // Stop the four VMs this test started (indexes 4 through 7).
+    int vmIndex = 4;
+    while (vmIndex <= 7) {
+      startupRule.stop(vmIndex);
+      vmIndex++;
+    }
+  }
+
+  /**
+   * Two servers in "datagroup" get the region as REPLICATE and two servers in "accessorgroup"
+   * get it as REPLICATE_PROXY; an async JDBC mapping is created for each group and the
+   * put / invalidate / get validation is then run against all four members.
+   */
+  @Test
+  public void startAccessorForRRThenPutAndGet() throws Exception {
+    final String dataGroup = "datagroup";
+    final String accessorGroup = "accessorgroup";
+    final String pdxName = Employee.class.getName();
+
+    MemberVM dataServer1 = createTableForGroup(4, dataGroup);
+    MemberVM dataServer2 = addServerForGroup(5, dataGroup);
+    MemberVM accessorServer1 = addServerForGroup(6, accessorGroup);
+    MemberVM accessorServer2 = addServerForGroup(7, accessorGroup);
+
+    createJdbcDataSource();
+    // Region and mapping for the data group first, then the proxy definition for the accessors.
+    createReplicatedRegionUsingGfshForGroup(false, dataGroup);
+    createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, pdxName, dataGroup);
+    createReplicatedRegionUsingGfshForGroup(true, accessorGroup);
+    createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, pdxName, accessorGroup);
+
+    validateBothServersAndAccessors(dataServer1, dataServer2, accessorServer1, accessorServer2);
+
+    // Stop the four VMs this test started (indexes 4 through 7).
+    int vmIndex = 4;
+    while (vmIndex <= 7) {
+      startupRule.stop(vmIndex);
+      vmIndex++;
+    }
+  }
+
   @Test
   public void throwsExceptionWhenNoDataSourceExists() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     IgnoredException.addIgnoredException(JdbcConnectorException.class);
     final String commandStr = "create jdbc-mapping --region=" + REGION_NAME
         + " --data-source=" + DATA_SOURCE_NAME
@@ -328,7 +426,7 @@ public abstract class JdbcDistributedTest implements Serializable {
   public void serverStartupSucceedsForPartitionedRegionAfterMappingIsCreated()
       throws Exception {
     createTable();
-    createPartitionRegionUsingGfsh();
+    createPartitionedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
     MemberVM server3 =
@@ -359,21 +457,21 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToDate() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    server.invoke(() -> {
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver.invoke(() -> {
       Connection connection = DriverManager.getConnection(connectionUrl);
       Statement statement = connection.createStatement();
       statement.execute(
           "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
               + TestDate.DATE_FIELD_NAME + " date not null)");
     });
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
     final Date jdkDate = new Date(sqlDate.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -394,21 +492,21 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToTime() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    server.invoke(() -> {
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver.invoke(() -> {
       Connection connection = DriverManager.getConnection(connectionUrl);
       Statement statement = connection.createStatement();
       statement.execute(
           "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
               + TestDate.DATE_FIELD_NAME + " time not null)");
     });
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
     final Date jdkDate = new Date(sqlTime.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -429,16 +527,16 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToTimestamp() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    createTableWithTimeStamp(dataserver, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
 
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
     final String key = "emp1";
     final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
     final Date jdkDate = new Date(sqlTimestamp.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -470,10 +568,10 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void putWritesToDB() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -487,44 +585,48 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void putAsyncWritesToDB() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
 
       String key = "emp1";
       ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1);
-      assertTableHasEmployeeData(1, pdxEmployee1, key);
+      await().untilAsserted(() -> {
+        assertTableHasEmployeeData(1, pdxEmployee1, key);
+      });
     });
   }
 
   @Test
   public void putAsyncWithPartitionWritesToDB() throws Exception {
     createTable();
-    createPartitionRegionUsingGfsh();
+    createPartitionedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
 
       String key = "emp1";
       ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1);
-      assertTableHasEmployeeData(1, pdxEmployee1, key);
+      await().untilAsserted(() -> {
+        assertTableHasEmployeeData(1, pdxEmployee1, key);
+      });
     });
   }
 
   @Test
   public void getReadsFromEmptyDB() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "emp1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       Object value = region.get(key);
@@ -536,10 +638,10 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDB() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -564,10 +666,10 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDBWithCompositeKey() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true, "id,age");
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -594,10 +696,10 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDBWithAsyncWriter() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -626,10 +728,10 @@ public abstract class JdbcDistributedTest implements Serializable {
   @Test
   public void getReadsFromDBWithPdxClassName() throws Exception {
     createTable();
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "id1";
       Employee value = new Employee(key, "Emp1", 55);
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
@@ -648,7 +750,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     ClientVM client = getClientVM();
     createClientRegion(client);
 
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     client.invoke(() -> {
@@ -672,7 +774,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     ClientVM client = getClientVM();
     createClientRegion(client);
 
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     client.invoke(() -> {
@@ -693,7 +795,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createTableForAllSupportedFields();
     ClientVM client = getClientVM();
     createClientRegion(client);
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     String key = "id1";
@@ -701,7 +803,7 @@ public abstract class JdbcDistributedTest implements Serializable {
         new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
             3, 4, 5.5f, 6.0, "BigEmp", new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c');
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       insertDataForAllSupportedFieldsTable(key, value);
     });
 
@@ -722,13 +824,13 @@ public abstract class JdbcDistributedTest implements Serializable {
     createTableForAllSupportedFields();
     ClientVM client = getClientVM();
     createClientRegion(client);
-    createRegionUsingGfsh();
+    createReplicatedRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
     String key = "id1";
     ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       insertNullDataForAllSupportedFieldsTable(key);
     });
 
@@ -773,13 +875,48 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
   }
 
-  private void createRegionUsingGfsh() {
+  private void createReplicatedRegionUsingGfsh() {
     StringBuffer createRegionCmd = new StringBuffer();
     createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE");
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
   }
 
-  private void createPartitionRegionUsingGfsh() {
+  /**
+   * Runs a gfsh "create region" command for {@code REGION_NAME} with {@code --groups} set to
+   * {@code groupName} and asserts that it succeeds. The command always carries
+   * {@code --if-not-exists=true}.
+   *
+   * @param isAccessor when true the region type is REPLICATE_PROXY, otherwise REPLICATE
+   * @param groupName value of the {@code --groups} option
+   */
+  private void createReplicatedRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+    RegionShortcut shortcut =
+        isAccessor ? RegionShortcut.REPLICATE_PROXY : RegionShortcut.REPLICATE;
+    String command = "create region --name=" + REGION_NAME
+        + " --groups=" + groupName
+        + " --if-not-exists=true"
+        + " --type=" + shortcut.name();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+  }
+
+  /**
+   * Runs a gfsh "create region" command for {@code REGION_NAME} with {@code --groups} set to
+   * {@code groupName} and asserts that it succeeds. The command always carries
+   * {@code --if-not-exists=true} and {@code --redundant-copies=1}.
+   *
+   * @param isAccessor when true the region type is PARTITION_PROXY, otherwise PARTITION
+   * @param groupName value of the {@code --groups} option
+   */
+  private void createPartitionedRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+    RegionShortcut shortcut =
+        isAccessor ? RegionShortcut.PARTITION_PROXY : RegionShortcut.PARTITION;
+    String command = "create region --name=" + REGION_NAME
+        + " --groups=" + groupName
+        + " --if-not-exists=true"
+        + " --type=" + shortcut.name()
+        + " --redundant-copies=1";
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+  }
+
+  /**
+   * Runs a gfsh "create jdbc-mapping" command with {@code --synchronous=false} and
+   * {@code --if-not-exists=true} for the given group, and asserts that it succeeds.
+   *
+   * @param regionName value of the {@code --region} option
+   * @param connectionName value of the {@code --data-source} option
+   * @param pdxClassName value of the {@code --pdx-name} option
+   * @param groupName value of the {@code --groups} option
+   */
+  private void createAsyncMappingForGroup(String regionName, String connectionName,
+      String pdxClassName,
+      String groupName) {
+    // One token per option; joining with a single space yields the gfsh command line.
+    final String mappingCommand = String.join(" ",
+        "create jdbc-mapping",
+        "--region=" + regionName,
+        "--data-source=" + connectionName,
+        "--table=" + TABLE_NAME,
+        "--synchronous=false",
+        "--if-not-exists=true",
+        "--pdx-name=" + pdxClassName,
+        "--groups=" + groupName);
+    gfsh.executeAndAssertThat(mappingCommand).statusIsSuccess();
+  }
+
+  private void createPartitionedRegionUsingGfsh() {
     StringBuffer createRegionCmd = new StringBuffer();
     createRegionCmd
         .append("create region --name=" + REGION_NAME + " --type=PARTITION --redundant-copies=1");