You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/04/15 21:59:26 UTC

[geode] 01/01: GEODE-6640: add dunit test case: put into jdbc-mapping on accessor then do get

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-6640
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8f5f0f1eba4663ec02b653ef446f77ab3b1b79f9
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Apr 15 14:57:57 2019 -0700

    GEODE-6640: add dunit test case: put into jdbc-mapping on accessor then do get
    
        Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
        Co-authored-by: Benjamin Ross <br...@pivotal.io>
---
 .../geode/connectors/jdbc/JdbcDistributedTest.java | 203 ++++++++++++++++++---
 1 file changed, 175 insertions(+), 28 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index 40772bd..2253d80 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -37,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
@@ -80,7 +81,8 @@ public abstract class JdbcDistributedTest implements Serializable {
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
 
-  private MemberVM server;
+  private MemberVM dataserver;
+  private MemberVM accessorserver;
   private MemberVM locator;
   private String connectionUrl;
 
@@ -96,13 +98,25 @@ public abstract class JdbcDistributedTest implements Serializable {
   public abstract String getConnectionUrl() throws IOException, InterruptedException;
 
   private void createTable() throws SQLException {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
     Connection connection = getConnection();
     Statement statement = connection.createStatement();
     statement.execute("Create Table " + TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
   }
 
+  private void createTableForGroup(int idx, String groupName) throws SQLException {
+    dataserver = startupRule.startServerVM(idx, groupName, locator.getPort());
+    Connection connection = getConnection();
+    Statement statement = connection.createStatement();
+    statement.execute("Create Table " + TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
+  }
+
+  private void addServerForAccessor(int idx, String groupName) throws SQLException {
+    accessorserver = startupRule.startServerVM(idx, groupName, locator.getPort());
+  }
+
   private void alterTable() throws SQLException {
     Connection connection = getConnection();
     Statement statement = connection.createStatement();
@@ -111,7 +125,7 @@ public abstract class JdbcDistributedTest implements Serializable {
   }
 
   private void createTableForAllSupportedFields() throws SQLException {
-    server = startupRule.startServerVM(1,
+    dataserver = startupRule.startServerVM(1,
         x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized());
     Connection connection = getConnection();
     DatabaseMetaData metaData = connection.getMetaData();
@@ -199,7 +213,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
     createJdbcDataSource();
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("name", "Emp1").writeInt("age", 55).create();
@@ -221,7 +235,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
     createJdbcDataSource();
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("name", "Emp1").writeInt("age", 55).create();
@@ -246,7 +260,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     alterTable();
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -269,7 +283,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -282,7 +296,7 @@ public abstract class JdbcDistributedTest implements Serializable {
       region.invalidate(key);
     });
     alterTable();
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "id1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       assertThatThrownBy(() -> region.get(key))
@@ -305,12 +319,106 @@ public abstract class JdbcDistributedTest implements Serializable {
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
     alterTable();
     assertThatThrownBy(
-        () -> startupRule.startServerVM(2, x -> x.withConnectionToLocator(locator.getPort())))
+        () -> startupRule.startServerVM(3, x -> x.withConnectionToLocator(locator.getPort())))
             .hasCauseExactlyInstanceOf(JdbcConnectorException.class).hasStackTraceContaining(
                 "Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
   }
 
   @Test
+  public void startAccessorForPRThenPutAndGet() throws Exception {
+    createTableForGroup(1, "datagroup");
+    addServerForAccessor(3, "accessorgroup");
+    createPartitionRegionUsingGfshForGroup(false, "datagroup");
+    createPartitionRegionUsingGfshForGroup(true, "accessorgroup");
+    createJdbcDataSource();
+    createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+        "datagroup", null);
+    createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+        "accessorgroup", null);
+
+    dataserver.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+      String pdxkey1 = "pdxkey1";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(pdxkey1, pdxEmployee1);
+      region.put("key1", new Employee("key1", "name1", 30));
+      region.invalidate(pdxkey1);
+      region.invalidate("key1");
+      await().untilAsserted(() -> {
+        assertThat(region.get(pdxkey1)).isNotNull();
+        assertThat(region.get("key1")).isNotNull();
+      });
+    });
+
+    accessorserver.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id2").writeString("name", "Emp2").writeInt("age", 56).create();
+
+      String pdxkey2 = "pdxkey2";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(pdxkey2, pdxEmployee1);
+      region.put("key2", new Employee("key2", "name2", 30));
+      region.invalidate(pdxkey2);
+      region.invalidate("key2");
+      await().untilAsserted(() -> {
+        assertThat(region.get(pdxkey2)).isNotNull();
+        assertThat(region.get("key2")).isNotNull();
+      });
+    });
+  }
+
+  @Test
+  public void startAccessorForRRThenPutAndGet() throws Exception {
+    createTableForGroup(1, "datagroup");
+    addServerForAccessor(3, "accessorgroup");
+    createRegionUsingGfshForGroup(false, "datagroup");
+    createRegionUsingGfshForGroup(true, "accessorgroup");
+    createJdbcDataSource();
+    createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+        "datagroup", null);
+    createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+        "accessorgroup", null);
+
+    dataserver.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+      String pdxkey1 = "pdxkey1";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(pdxkey1, pdxEmployee1);
+      region.put("key1", new Employee("key1", "name1", 30));
+      region.invalidate(pdxkey1);
+      region.invalidate("key1");
+      await().untilAsserted(() -> {
+        assertThat(region.get(pdxkey1)).isNotNull();
+        assertThat(region.get("key1")).isNotNull();
+      });
+    });
+
+    accessorserver.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id2").writeString("name", "Emp2").writeInt("age", 56).create();
+
+      String pdxkey2 = "pdxkey2";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(pdxkey2, pdxEmployee1);
+      region.put("key2", new Employee("key2", "name2", 30));
+      region.invalidate(pdxkey2);
+      region.invalidate("key2");
+      await().untilAsserted(() -> {
+        assertThat(region.get(pdxkey2)).isNotNull();
+        assertThat(region.get("key2")).isNotNull();
+      });
+    });
+  }
+
+  @Test
   public void throwsExceptionWhenNoDataSourceExists() throws Exception {
     createTable();
     createRegionUsingGfsh();
@@ -359,8 +467,8 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToDate() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    server.invoke(() -> {
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver.invoke(() -> {
       Connection connection = DriverManager.getConnection(connectionUrl);
       Statement statement = connection.createStatement();
       statement.execute(
@@ -373,7 +481,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     final String key = "emp1";
     final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
     final Date jdkDate = new Date(sqlDate.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -394,8 +502,8 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToTime() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    server.invoke(() -> {
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    dataserver.invoke(() -> {
       Connection connection = DriverManager.getConnection(connectionUrl);
       Statement statement = connection.createStatement();
       statement.execute(
@@ -408,7 +516,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     final String key = "emp1";
     final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
     final Date jdkDate = new Date(sqlTime.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -429,8 +537,8 @@ public abstract class JdbcDistributedTest implements Serializable {
 
   @Test
   public void verifyDateToTimestamp() throws Exception {
-    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
-    createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
+    dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    createTableWithTimeStamp(dataserver, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
 
     createRegionUsingGfsh();
     createJdbcDataSource();
@@ -438,7 +546,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     final String key = "emp1";
     final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
     final Date jdkDate = new Date(sqlTimestamp.getTime());
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance testDateInput =
           ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
               .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -473,7 +581,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -490,7 +598,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -507,7 +615,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createPartitionRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -524,7 +632,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "emp1";
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       Object value = region.get(key);
@@ -539,7 +647,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -567,7 +675,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true, "id,age");
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -597,7 +705,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       PdxInstance pdxEmployee1 =
           ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
               .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -629,7 +737,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     createRegionUsingGfsh();
     createJdbcDataSource();
     createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       String key = "id1";
       Employee value = new Employee(key, "Emp1", 55);
       Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
@@ -701,7 +809,7 @@ public abstract class JdbcDistributedTest implements Serializable {
         new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
             3, 4, 5.5f, 6.0, "BigEmp", new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c');
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       insertDataForAllSupportedFieldsTable(key, value);
     });
 
@@ -728,7 +836,7 @@ public abstract class JdbcDistributedTest implements Serializable {
     String key = "id1";
     ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
 
-    server.invoke(() -> {
+    dataserver.invoke(() -> {
       insertNullDataForAllSupportedFieldsTable(key);
     });
 
@@ -779,6 +887,45 @@ public abstract class JdbcDistributedTest implements Serializable {
     gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
   }
 
+  private void createRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+    StringBuffer createRegionCmd = new StringBuffer();
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --groups=" + groupName
+        + (isAccessor
+            ? " --type=" + RegionShortcut.REPLICATE_PROXY.name()
+            : " --type=" + RegionShortcut.REPLICATE.name()));
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+  }
+
+  private void createPartitionRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+    StringBuffer createRegionCmd = new StringBuffer();
+    createRegionCmd
+        .append("create region --name=" + REGION_NAME + " --groups=" + groupName
+            + (isAccessor
+                ? " --type=" + RegionShortcut.PARTITION_PROXY.name()
+                : " --type=" + RegionShortcut.PARTITION.name())
+            + " --redundant-copies=1");
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+  }
+
+  private void createMappingForGroup(String regionName, String connectionName, String pdxClassName,
+      boolean synchronous, String groupName, String ids) {
+    final String commandStr = "create jdbc-mapping --region=" + regionName
+        + " --data-source=" + connectionName
+        + " --table=" + TABLE_NAME
+        + " --synchronous=" + synchronous
+        + " --pdx-name=" + pdxClassName
+        + " --groups=" + groupName
+        + ((ids != null) ? (" --id=" + ids) : "");
+    gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+    if (!synchronous) {
+      final String alterAsyncQueue =
+          "alter async-event-queue --id="
+              + MappingCommandUtils.createAsyncEventQueueName(regionName)
+              + " --batch-size=1 --batch-time-interval=0";
+      gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess();
+    }
+  }
+
   private void createPartitionRegionUsingGfsh() {
     StringBuffer createRegionCmd = new StringBuffer();
     createRegionCmd