You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/04/15 21:59:26 UTC
[geode] 01/01: GEODE-6640: add dunit test case: put into jdbc-mapping on accessor then do get
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-6640
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8f5f0f1eba4663ec02b653ef446f77ab3b1b79f9
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Apr 15 14:57:57 2019 -0700
GEODE-6640: add dunit test case: put into jdbc-mapping on accessor then do get
Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
Co-authored-by: Benjamin Ross <br...@pivotal.io>
---
.../geode/connectors/jdbc/JdbcDistributedTest.java | 203 ++++++++++++++++++---
1 file changed, 175 insertions(+), 28 deletions(-)
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index 40772bd..2253d80 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -37,6 +37,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
@@ -80,7 +81,8 @@ public abstract class JdbcDistributedTest implements Serializable {
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
- private MemberVM server;
+ private MemberVM dataserver;
+ private MemberVM accessorserver;
private MemberVM locator;
private String connectionUrl;
@@ -96,13 +98,25 @@ public abstract class JdbcDistributedTest implements Serializable {
public abstract String getConnectionUrl() throws IOException, InterruptedException;
private void createTable() throws SQLException {
- server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+ dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
Connection connection = getConnection();
Statement statement = connection.createStatement();
statement.execute("Create Table " + TABLE_NAME
+ " (id varchar(10) primary key not null, name varchar(10), age int not null)");
}
+ private void createTableForGroup(int idx, String groupName) throws SQLException {
+ dataserver = startupRule.startServerVM(idx, groupName, locator.getPort());
+ Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+ statement.execute("Create Table " + TABLE_NAME
+ + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
+ }
+
+ private void addServerForAccessor(int idx, String groupName) throws SQLException {
+ accessorserver = startupRule.startServerVM(idx, groupName, locator.getPort());
+ }
+
private void alterTable() throws SQLException {
Connection connection = getConnection();
Statement statement = connection.createStatement();
@@ -111,7 +125,7 @@ public abstract class JdbcDistributedTest implements Serializable {
}
private void createTableForAllSupportedFields() throws SQLException {
- server = startupRule.startServerVM(1,
+ dataserver = startupRule.startServerVM(1,
x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized());
Connection connection = getConnection();
DatabaseMetaData metaData = connection.getMetaData();
@@ -199,7 +213,7 @@ public abstract class JdbcDistributedTest implements Serializable {
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
createJdbcDataSource();
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("name", "Emp1").writeInt("age", 55).create();
@@ -221,7 +235,7 @@ public abstract class JdbcDistributedTest implements Serializable {
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
createJdbcDataSource();
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("name", "Emp1").writeInt("age", 55).create();
@@ -246,7 +260,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
alterTable();
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -269,7 +283,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -282,7 +296,7 @@ public abstract class JdbcDistributedTest implements Serializable {
region.invalidate(key);
});
alterTable();
- server.invoke(() -> {
+ dataserver.invoke(() -> {
String key = "id1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
assertThatThrownBy(() -> region.get(key))
@@ -305,12 +319,106 @@ public abstract class JdbcDistributedTest implements Serializable {
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
alterTable();
assertThatThrownBy(
- () -> startupRule.startServerVM(2, x -> x.withConnectionToLocator(locator.getPort())))
+ () -> startupRule.startServerVM(3, x -> x.withConnectionToLocator(locator.getPort())))
.hasCauseExactlyInstanceOf(JdbcConnectorException.class).hasStackTraceContaining(
"Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
}
@Test
+ public void startAccessorForPRThenPutAndGet() throws Exception {
+ createTableForGroup(1, "datagroup");
+ addServerForAccessor(3, "accessorgroup");
+ createPartitionRegionUsingGfshForGroup(false, "datagroup");
+ createPartitionRegionUsingGfshForGroup(true, "accessorgroup");
+ createJdbcDataSource();
+ createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+ "datagroup", null);
+ createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+ "accessorgroup", null);
+
+ dataserver.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+ String pdxkey1 = "pdxkey1";
+ Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+ region.put(pdxkey1, pdxEmployee1);
+ region.put("key1", new Employee("key1", "name1", 30));
+ region.invalidate(pdxkey1);
+ region.invalidate("key1");
+ await().untilAsserted(() -> {
+ assertThat(region.get(pdxkey1)).isNotNull();
+ assertThat(region.get("key1")).isNotNull();
+ });
+ });
+
+ accessorserver.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id2").writeString("name", "Emp2").writeInt("age", 56).create();
+
+ String pdxkey2 = "pdxkey2";
+ Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+ region.put(pdxkey2, pdxEmployee1);
+ region.put("key2", new Employee("key2", "name2", 30));
+ region.invalidate(pdxkey2);
+ region.invalidate("key2");
+ await().untilAsserted(() -> {
+ assertThat(region.get(pdxkey2)).isNotNull();
+ assertThat(region.get("key2")).isNotNull();
+ });
+ });
+ }
+
+ @Test
+ public void startAccessorForRRThenPutAndGet() throws Exception {
+ createTableForGroup(1, "datagroup");
+ addServerForAccessor(3, "accessorgroup");
+ createRegionUsingGfshForGroup(false, "datagroup");
+ createRegionUsingGfshForGroup(true, "accessorgroup");
+ createJdbcDataSource();
+ createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+ "datagroup", null);
+ createMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), false,
+ "accessorgroup", null);
+
+ dataserver.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+ String pdxkey1 = "pdxkey1";
+ Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+ region.put(pdxkey1, pdxEmployee1);
+ region.put("key1", new Employee("key1", "name1", 30));
+ region.invalidate(pdxkey1);
+ region.invalidate("key1");
+ await().untilAsserted(() -> {
+ assertThat(region.get(pdxkey1)).isNotNull();
+ assertThat(region.get("key1")).isNotNull();
+ });
+ });
+
+ accessorserver.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id2").writeString("name", "Emp2").writeInt("age", 56).create();
+
+ String pdxkey2 = "pdxkey2";
+ Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+ region.put(pdxkey2, pdxEmployee1);
+ region.put("key2", new Employee("key2", "name2", 30));
+ region.invalidate(pdxkey2);
+ region.invalidate("key2");
+ await().untilAsserted(() -> {
+ assertThat(region.get(pdxkey2)).isNotNull();
+ assertThat(region.get("key2")).isNotNull();
+ });
+ });
+ }
+
+ @Test
public void throwsExceptionWhenNoDataSourceExists() throws Exception {
createTable();
createRegionUsingGfsh();
@@ -359,8 +467,8 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void verifyDateToDate() throws Exception {
- server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
- server.invoke(() -> {
+ dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+ dataserver.invoke(() -> {
Connection connection = DriverManager.getConnection(connectionUrl);
Statement statement = connection.createStatement();
statement.execute(
@@ -373,7 +481,7 @@ public abstract class JdbcDistributedTest implements Serializable {
final String key = "emp1";
final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
final Date jdkDate = new Date(sqlDate.getTime());
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance testDateInput =
ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
.writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -394,8 +502,8 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void verifyDateToTime() throws Exception {
- server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
- server.invoke(() -> {
+ dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+ dataserver.invoke(() -> {
Connection connection = DriverManager.getConnection(connectionUrl);
Statement statement = connection.createStatement();
statement.execute(
@@ -408,7 +516,7 @@ public abstract class JdbcDistributedTest implements Serializable {
final String key = "emp1";
final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
final Date jdkDate = new Date(sqlTime.getTime());
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance testDateInput =
ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
.writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -429,8 +537,8 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void verifyDateToTimestamp() throws Exception {
- server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
- createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
+ dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+ createTableWithTimeStamp(dataserver, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
@@ -438,7 +546,7 @@ public abstract class JdbcDistributedTest implements Serializable {
final String key = "emp1";
final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
final Date jdkDate = new Date(sqlTimestamp.getTime());
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance testDateInput =
ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName())
.writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create();
@@ -473,7 +581,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -490,7 +598,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -507,7 +615,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createPartitionRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -524,7 +632,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
String key = "emp1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
Object value = region.get(key);
@@ -539,7 +647,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -567,7 +675,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true, "id,age");
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -597,7 +705,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
@@ -629,7 +737,7 @@ public abstract class JdbcDistributedTest implements Serializable {
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
String key = "id1";
Employee value = new Employee(key, "Emp1", 55);
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
@@ -701,7 +809,7 @@ public abstract class JdbcDistributedTest implements Serializable {
new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
3, 4, 5.5f, 6.0, "BigEmp", new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c');
- server.invoke(() -> {
+ dataserver.invoke(() -> {
insertDataForAllSupportedFieldsTable(key, value);
});
@@ -728,7 +836,7 @@ public abstract class JdbcDistributedTest implements Serializable {
String key = "id1";
ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
- server.invoke(() -> {
+ dataserver.invoke(() -> {
insertNullDataForAllSupportedFieldsTable(key);
});
@@ -779,6 +887,45 @@ public abstract class JdbcDistributedTest implements Serializable {
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
}
+ private void createRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+ StringBuffer createRegionCmd = new StringBuffer();
+ createRegionCmd.append("create region --name=" + REGION_NAME + " --groups=" + groupName
+ + (isAccessor
+ ? " --type=" + RegionShortcut.REPLICATE_PROXY.name()
+ : " --type=" + RegionShortcut.REPLICATE.name()));
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ }
+
+ private void createPartitionRegionUsingGfshForGroup(boolean isAccessor, String groupName) {
+ StringBuffer createRegionCmd = new StringBuffer();
+ createRegionCmd
+ .append("create region --name=" + REGION_NAME + " --groups=" + groupName
+ + (isAccessor
+ ? " --type=" + RegionShortcut.PARTITION_PROXY.name()
+ : " --type=" + RegionShortcut.PARTITION.name())
+ + " --redundant-copies=1");
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ }
+
+ private void createMappingForGroup(String regionName, String connectionName, String pdxClassName,
+ boolean synchronous, String groupName, String ids) {
+ final String commandStr = "create jdbc-mapping --region=" + regionName
+ + " --data-source=" + connectionName
+ + " --table=" + TABLE_NAME
+ + " --synchronous=" + synchronous
+ + " --pdx-name=" + pdxClassName
+ + " --groups=" + groupName
+ + ((ids != null) ? (" --id=" + ids) : "");
+ gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+ if (!synchronous) {
+ final String alterAsyncQueue =
+ "alter async-event-queue --id="
+ + MappingCommandUtils.createAsyncEventQueueName(regionName)
+ + " --batch-size=1 --batch-time-interval=0";
+ gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess();
+ }
+ }
+
private void createPartitionRegionUsingGfsh() {
StringBuffer createRegionCmd = new StringBuffer();
createRegionCmd