You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/14 18:44:54 UTC
[geode] branch develop updated: GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0dc46ea GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)
0dc46ea is described below
commit 0dc46eab3a95195ea45e2215047a5a79a0bde2e8
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Nov 14 10:44:45 2018 -0800
GEODE-6010: change create jdbc-mapping to alter region and create async-queue (#2836)
create jdbc-mapping now also does the following:
-- alters the region to have a JdbcLoader as its cache-loader
-- if synchronous, alters the region to have a JdbcWriter as its cache-writer
-- if asynchronous, creates an async-event-queue and alters the region to use that queue
The async-event-queue is created parallel for partitioned regions, and serial for all other regions.
create jdbc-mapping now requires cluster configuration and uses it to check
all required preconditions before the command starts changing things.
Co-authored-by: Darrel Schneider <ds...@pivotal.io>
Co-authored-by: Jianxia Chen <jc...@pivotal.io>
Co-authored-by: Scott Jewell <sj...@pivotal.io>
---
.../geode/connectors/jdbc/JdbcDistributedTest.java | 180 +++----
.../cli/CreateMappingCommandDUnitTest.java | 267 +++++++++-
.../cli/CreateMappingCommandIntegrationTest.java | 121 -----
.../apache/geode/codeAnalysis/excludedClasses.txt | 1 +
.../jdbc/internal/cli/CreateMappingCommand.java | 211 +++++++-
.../jdbc/internal/cli/CreateMappingFunction.java | 61 ++-
.../jdbc/internal/cli/PreconditionException.java | 28 ++
.../internal/cli/CreateMappingCommandTest.java | 537 +++++++++++++++++++++
.../internal/cli/CreateMappingFunctionTest.java | 98 +++-
9 files changed, 1242 insertions(+), 262 deletions(-)
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index c0e44c0..94370e9 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
import org.apache.geode.pdx.internal.AutoSerializableManager;
@@ -179,8 +180,11 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, false);
- createJdbcConnection();
+ StringBuffer createRegionCmd = new StringBuffer();
+ createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+ + " --cache-writer=" + JdbcWriter.class.getName());
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ createJdbcDataSource();
server.invoke(() -> {
PdxInstance pdxEmployee1 =
@@ -197,8 +201,12 @@ public abstract class JdbcDistributedTest implements Serializable {
public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception {
createTable();
IgnoredException.addIgnoredException("JdbcConnectorException");
- createRegionUsingGfsh(false, true, false);
- createJdbcConnection();
+ StringBuffer createRegionCmd = new StringBuffer();
+ createAsyncListener("JAW");
+ createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+ + " --async-event-queue-id=JAW");
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ createJdbcDataSource();
server.invoke(() -> {
PdxInstance pdxEmployee1 =
@@ -212,32 +220,14 @@ public abstract class JdbcDistributedTest implements Serializable {
await().untilAsserted(() -> {
assertThat(asyncWriter.getFailedEvents()).isEqualTo(1);
});
-
- });
- }
-
- @Test
- public void throwsExceptionWhenNoMappingMatches() throws Exception {
- createTable();
- createRegionUsingGfsh(true, false, false);
- createJdbcConnection();
-
- server.invoke(() -> {
- PdxInstance pdxEmployee1 =
- ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
- .writeString("name", "Emp1").writeInt("age", 55).create();
- Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
- assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
- .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
- "JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
});
}
@Test
public void throwsExceptionWhenNoDataSourceExists() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, false);
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
@@ -262,9 +252,9 @@ public abstract class JdbcDistributedTest implements Serializable {
"Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
+ TestDate.DATE_FIELD_NAME + " date)");
});
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
final Date jdkDate = new Date(sqlDate.getTime());
@@ -297,9 +287,9 @@ public abstract class JdbcDistributedTest implements Serializable {
"Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
+ TestDate.DATE_FIELD_NAME + " time)");
});
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
final Date jdkDate = new Date(sqlTime.getTime());
@@ -327,9 +317,9 @@ public abstract class JdbcDistributedTest implements Serializable {
server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
final Date jdkDate = new Date(sqlTimestamp.getTime());
@@ -365,9 +355,9 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void putWritesToDB() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, false);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -382,9 +372,26 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void putAsyncWritesToDB() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, false);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+ String key = "emp1";
+ ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1);
+ assertTableHasEmployeeData(1, pdxEmployee1, key);
+ });
+ }
+
+ @Test
+ public void putAsyncWithPartitionWritesToDB() throws Exception {
+ createTable();
+ createPartitionRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -399,9 +406,9 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void getReadsFromEmptyDB() throws Exception {
createTable();
- createRegionUsingGfsh(false, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
String key = "emp1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
@@ -414,9 +421,9 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void getReadsFromDB() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -442,9 +449,9 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void getReadsFromDBWithAsyncWriter() throws Exception {
createTable();
- createRegionUsingGfsh(false, true, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME);
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
@@ -452,7 +459,8 @@ public abstract class JdbcDistributedTest implements Serializable {
String key = "id1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache()
- .getAsyncEventQueue("JAW").getAsyncEventListener();
+ .getAsyncEventQueue(CreateMappingCommand.createAsyncEventQueueName(REGION_NAME))
+ .getAsyncEventListener();
region.put(key, pdxEmployee1);
await().untilAsserted(() -> {
@@ -473,9 +481,9 @@ public abstract class JdbcDistributedTest implements Serializable {
@Test
public void getReadsFromDBWithPdxClassName() throws Exception {
createTable();
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
server.invoke(() -> {
String key = "id1";
Employee value = new Employee(key, "Emp1", 55);
@@ -495,9 +503,9 @@ public abstract class JdbcDistributedTest implements Serializable {
ClientVM client = getClientVM();
createClientRegion(client);
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
client.invoke(() -> {
String key = "id1";
ClassWithSupportedPdxFields value =
@@ -519,9 +527,9 @@ public abstract class JdbcDistributedTest implements Serializable {
ClientVM client = getClientVM();
createClientRegion(client);
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
client.invoke(() -> {
String key = "id1";
ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
@@ -540,9 +548,9 @@ public abstract class JdbcDistributedTest implements Serializable {
createTableForAllSupportedFields();
ClientVM client = getClientVM();
createClientRegion(client);
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
String key = "id1";
ClassWithSupportedPdxFields value =
new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
@@ -569,9 +577,9 @@ public abstract class JdbcDistributedTest implements Serializable {
createTableForAllSupportedFields();
ClientVM client = getClientVM();
createClientRegion(client);
- createRegionUsingGfsh(true, false, true);
- createJdbcConnection();
- createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
+ createRegionUsingGfsh();
+ createJdbcDataSource();
+ createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
String key = "id1";
ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
@@ -607,9 +615,9 @@ public abstract class JdbcDistributedTest implements Serializable {
});
}
- private void createJdbcConnection() {
+ private void createJdbcDataSource() {
final String commandStr =
- "create jndi-binding --type=POOLED --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
+ "create data-source --pooled --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
}
@@ -620,32 +628,36 @@ public abstract class JdbcDistributedTest implements Serializable {
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
}
- private void createRegionUsingGfsh(boolean withCacheWriter, boolean withAsyncWriter,
- boolean withLoader) {
+ private void createRegionUsingGfsh() {
StringBuffer createRegionCmd = new StringBuffer();
- createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE ");
- if (withCacheWriter) {
- createRegionCmd.append(" --cache-writer=" + JdbcWriter.class.getName());
- }
- if (withLoader) {
- createRegionCmd.append(" --cache-loader=" + JdbcLoader.class.getName());
- }
- if (withAsyncWriter) {
- createAsyncListener("JAW");
- createRegionCmd.append(" --async-event-queue-id=JAW");
- }
+ createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE");
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ }
+ private void createPartitionRegionUsingGfsh() {
+ StringBuffer createRegionCmd = new StringBuffer();
+ createRegionCmd.append("create region --name=" + REGION_NAME + " --type=PARTITION");
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
}
- private void createMapping(String regionName, String connectionName) {
- createMapping(regionName, connectionName, Employee.class.getName());
+ private void createMapping(String regionName, String connectionName, boolean synchronous) {
+ createMapping(regionName, connectionName, Employee.class.getName(), synchronous);
}
- private void createMapping(String regionName, String connectionName, String pdxClassName) {
- final String commandStr = "create jdbc-mapping --region=" + regionName + " --data-source="
- + connectionName + (pdxClassName != null ? " --pdx-name=" + pdxClassName : "");
+ private void createMapping(String regionName, String connectionName, String pdxClassName,
+ boolean synchronous) {
+ final String commandStr = "create jdbc-mapping --region=" + regionName
+ + " --data-source=" + connectionName
+ + " --synchronous=" + synchronous
+ + " --pdx-name=" + pdxClassName;
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+ if (!synchronous) {
+ final String alterAsyncQueue =
+ "alter async-event-queue --id="
+ + CreateMappingCommand.createAsyncEventQueueName(regionName)
+ + " --batch-size=1 --batch-time-interval=0";
+ gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess();
+ }
}
private void assertTableHasEmployeeData(int size, PdxInstance employee, String key)
diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
index 8d86ec9..d9f8e94 100644
--- a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
@@ -18,14 +18,25 @@ import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand
import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__DATA_SOURCE_NAME;
import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__PDX_NAME;
import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__REGION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SYNCHRONOUS_NAME;
import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__TABLE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
+
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
import org.apache.geode.distributed.internal.InternalLocator;
@@ -61,12 +72,96 @@ public class CreateMappingCommandDUnitTest {
gfsh.connectAndVerify(locator);
- gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=REPLICATE")
+ }
+
+ private void setupReplicate() {
+ setupReplicate(false);
+ }
+
+ private void setupReplicate(boolean addLoader) {
+ gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=REPLICATE"
+ + (addLoader ? " --cache-loader=" + JdbcLoader.class.getName() : ""))
.statusIsSuccess();
}
+ private void setupPartition() {
+ gfsh.executeAndAssertThat("create region --name=" + REGION_NAME + " --type=PARTITION")
+ .statusIsSuccess();
+ }
+
+ private void setupAsyncEventQueue() {
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --id="
+ + CreateMappingCommand.createAsyncEventQueueName(REGION_NAME)
+ + " --listener=" + JdbcAsyncWriter.class.getName())
+ .statusIsSuccess();
+ }
+
+ private static RegionMapping getRegionMappingFromClusterConfig() {
+ CacheConfig cacheConfig =
+ InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+ RegionConfig regionConfig = cacheConfig.getRegions().stream()
+ .filter(region -> region.getName().equals(REGION_NAME)).findFirst().orElse(null);
+ return (RegionMapping) regionConfig.getCustomRegionElements().stream()
+ .filter(element -> element instanceof RegionMapping).findFirst().orElse(null);
+ }
+
+ private static RegionMapping getRegionMappingFromService() {
+ return ClusterStartupRule.getCache().getService(JdbcConnectorService.class)
+ .getMappingForRegion(REGION_NAME);
+ }
+
+ private static void validateAsyncEventQueueCreatedInClusterConfig(boolean isParallel) {
+ CacheConfig cacheConfig =
+ InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+ List<CacheConfig.AsyncEventQueue> queueList = cacheConfig.getAsyncEventQueues();
+ CacheConfig.AsyncEventQueue queue = queueList.get(0);
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+ assertThat(queue.getId()).isEqualTo(queueName);
+ assertThat(queue.getAsyncEventListener().getClassName())
+ .isEqualTo(JdbcAsyncWriter.class.getName());
+ assertThat(queue.isParallel()).isEqualTo(isParallel);
+ }
+
+ private static void validateRegionAlteredInClusterConfig(boolean synchronous) {
+ CacheConfig cacheConfig =
+ InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(null);
+ RegionConfig regionConfig = cacheConfig.getRegions().stream()
+ .filter(region -> region.getName().equals(REGION_NAME)).findFirst().orElse(null);
+ RegionAttributesType attributes = regionConfig.getRegionAttributes().get(0);
+ assertThat(attributes.getCacheLoader().getClassName()).isEqualTo(JdbcLoader.class.getName());
+ if (synchronous) {
+ assertThat(attributes.getCacheWriter().getClassName()).isEqualTo(JdbcWriter.class.getName());
+ } else {
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+ assertThat(attributes.getAsyncEventQueueIds()).isEqualTo(queueName);
+ }
+ }
+
+ private static void validateAsyncEventQueueCreatedOnServer(boolean isParallel) {
+ InternalCache cache = ClusterStartupRule.getCache();
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+ AsyncEventQueue queue = cache.getAsyncEventQueue(queueName);
+ assertThat(queue).isNotNull();
+ assertThat(queue.getAsyncEventListener()).isInstanceOf(JdbcAsyncWriter.class);
+ assertThat(queue.isParallel()).isEqualTo(isParallel);
+ }
+
+ private static void validateRegionAlteredOnServer(boolean synchronous) {
+ InternalCache cache = ClusterStartupRule.getCache();
+ Region<?, ?> region = cache.getRegion(REGION_NAME);
+ assertThat(region.getAttributes().getCacheLoader()).isInstanceOf(JdbcLoader.class);
+ if (synchronous) {
+ assertThat(region.getAttributes().getCacheWriter()).isInstanceOf(JdbcWriter.class);
+ } else {
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+ assertThat(region.getAttributes().getAsyncEventQueueIds()).contains(queueName);
+ }
+ }
+
@Test
- public void createsMappingWithAllOptions() {
+ public void createMappingUpdatesServiceAndClusterConfig() {
+ setupReplicate();
CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -75,24 +170,57 @@ public class CreateMappingCommandDUnitTest {
gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ server.invoke(() -> {
+ RegionMapping mapping = getRegionMappingFromService();
+ assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(mapping.getTableName()).isEqualTo("myTable");
+ assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredOnServer(false);
+ validateAsyncEventQueueCreatedOnServer(false);
+ });
+
locator.invoke(() -> {
- String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
- .getConfiguration("cluster").getCacheXmlContent();
- assertThat(xml).isNotNull().contains("jdbc:mapping");
+ RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+ assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+ assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredInClusterConfig(false);
+ validateAsyncEventQueueCreatedInClusterConfig(false);
});
+ }
+
+ @Test
+ public void createSynchronousMappingUpdatesServiceAndClusterConfig() {
+ setupReplicate();
+ CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+ csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+ csb.addOption(CREATE_MAPPING__TABLE_NAME, "myTable");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+ csb.addOption(CREATE_MAPPING__SYNCHRONOUS_NAME, "true");
+
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
server.invoke(() -> {
- InternalCache cache = ClusterStartupRule.getCache();
- RegionMapping mapping =
- cache.getService(JdbcConnectorService.class).getMappingForRegion(REGION_NAME);
+ RegionMapping mapping = getRegionMappingFromService();
assertThat(mapping.getDataSourceName()).isEqualTo("connection");
assertThat(mapping.getTableName()).isEqualTo("myTable");
assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredOnServer(true);
+ });
+
+ locator.invoke(() -> {
+ RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+ assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+ assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredInClusterConfig(true);
});
}
@Test
- public void createsRegionMappingUpdatesClusterConfig() {
+ public void createMappingWithPartitionUpdatesServiceAndClusterConfig() {
+ setupPartition();
CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -101,15 +229,57 @@ public class CreateMappingCommandDUnitTest {
gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ server.invoke(() -> {
+ RegionMapping mapping = getRegionMappingFromService();
+ assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(mapping.getTableName()).isEqualTo("myTable");
+ assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredOnServer(false);
+ validateAsyncEventQueueCreatedOnServer(true);
+ });
+
+ locator.invoke(() -> {
+ RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+ assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+ assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredInClusterConfig(false);
+ validateAsyncEventQueueCreatedInClusterConfig(true);
+ });
+ }
+
+ @Test
+ public void createMappingWithNoTable() {
+ setupReplicate();
+ CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+ csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ server.invoke(() -> {
+ RegionMapping mapping = getRegionMappingFromService();
+ assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(mapping.getTableName()).isNull();
+ assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredOnServer(false);
+ validateAsyncEventQueueCreatedOnServer(false);
+ });
+
locator.invoke(() -> {
- String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
- .getConfiguration("cluster").getCacheXmlContent();
- assertThat(xml).isNotNull().contains("jdbc:mapping");
+ RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+ assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(regionMapping.getTableName()).isNull();
+ assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
+ validateRegionAlteredInClusterConfig(false);
+ validateAsyncEventQueueCreatedInClusterConfig(false);
});
}
@Test
public void createExistingRegionMappingFails() {
+ setupReplicate();
CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
@@ -119,27 +289,78 @@ public class CreateMappingCommandDUnitTest {
csb = new CommandStringBuilder(CREATE_MAPPING);
csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
- csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
- csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
- csb.addOption(CREATE_MAPPING__TABLE_NAME, "bogus");
- gfsh.executeAndAssertThat(csb.toString()).statusIsError();
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "bogusConnection");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "bogusPdxClass");
+ csb.addOption(CREATE_MAPPING__TABLE_NAME, "bogusTable");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsError()
+ .containsOutput("A jdbc-mapping for " + REGION_NAME + " already exists");
+
+ server.invoke(() -> {
+ RegionMapping mapping = getRegionMappingFromService();
+ assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(mapping.getTableName()).isEqualTo("myTable");
+ assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
+ });
locator.invoke(() -> {
- String xml = InternalLocator.getLocator().getConfigurationPersistenceService()
- .getConfiguration("cluster").getCacheXmlContent();
- assertThat(xml).isNotNull().contains("jdbc:mapping").contains("myTable")
- .contains("myPdxClass")
- .doesNotContain("bogus");
+ RegionMapping regionMapping = getRegionMappingFromClusterConfig();
+ assertThat(regionMapping.getDataSourceName()).isEqualTo("connection");
+ assertThat(regionMapping.getTableName()).isEqualTo("myTable");
+ assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
});
}
@Test
public void createMappingWithoutPdxNameFails() {
+ setupReplicate();
CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
- // NOTE: --table is optional so it should not be in the ouput but it is. See GEODE-3468.
+
+ // NOTE: --table is optional so it should not be in the output but it is. See GEODE-3468.
gfsh.executeAndAssertThat(csb.toString()).statusIsError()
- .containsOutput("You should specify option (--table, --pdx-name) for this command");
+ .containsOutput(
+ "You should specify option (--table, --pdx-name, --synchronous) for this command");
}
+
+ @Test
+ public void createMappingWithNonExistentRegionFails() {
+ setupReplicate();
+ CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+ csb.addOption(CREATE_MAPPING__REGION_NAME, "bogusRegion");
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+
+ gfsh.executeAndAssertThat(csb.toString()).statusIsError()
+ .containsOutput("A region named bogusRegion must already exist");
+ }
+
+ @Test
+ public void createMappingWithRegionThatHasALoaderFails() {
+ setupReplicate(true);
+ CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+ csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+
+ gfsh.executeAndAssertThat(csb.toString()).statusIsError()
+ .containsOutput("The existing region " + REGION_NAME
+ + " must not already have a cache-loader, but it has " + JdbcLoader.class.getName());
+ }
+
+ @Test
+ public void createMappingWithExistingQueueFails() {
+ setupReplicate();
+ setupAsyncEventQueue();
+ CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+ csb.addOption(CREATE_MAPPING__REGION_NAME, REGION_NAME);
+ csb.addOption(CREATE_MAPPING__DATA_SOURCE_NAME, "connection");
+ csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
+
+ gfsh.executeAndAssertThat(csb.toString()).statusIsError()
+ .containsOutput("An async-event-queue named "
+ + CreateMappingCommand.createAsyncEventQueueName(REGION_NAME)
+ + " must not already exist.");
+ }
+
}
diff --git a/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java b/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
deleted file mode 100644
index f3d33ee..0000000
--- a/geode-connectors/src/integrationTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal.cli;
-
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
-import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
-import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.result.model.ResultModel;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.junit.categories.JDBCConnectorTest;
-
-@Category({JDBCConnectorTest.class})
-public class CreateMappingCommandIntegrationTest {
-
- private InternalCache cache;
- private CreateMappingCommand createRegionMappingCommand;
-
- private String regionName;
- private String dataSourceName;
- private String tableName;
- private String pdxClass;
-
- @Before
- public void setup() {
- regionName = "regionName";
- dataSourceName = "connection";
- tableName = "testTable";
- pdxClass = "myPdxClass";
-
- cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0")
- .set(ENABLE_CLUSTER_CONFIGURATION, "true").create();
- cache.createRegionFactory(RegionShortcut.LOCAL).create(regionName);
-
- createRegionMappingCommand = new CreateMappingCommand();
- createRegionMappingCommand.setCache(cache);
- }
-
- @After
- public void tearDown() {
- cache.close();
- }
-
- @Test
- public void createsRegionMappingInService() {
- ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
- tableName, pdxClass);
-
- assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
- JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
- RegionMapping regionMapping = service.getMappingForRegion(regionName);
-
- assertThat(regionMapping).isNotNull();
- assertThat(regionMapping.getRegionName()).isEqualTo(regionName);
- assertThat(regionMapping.getDataSourceName()).isEqualTo(dataSourceName);
- assertThat(regionMapping.getTableName()).isEqualTo(tableName);
- assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
- }
-
- @Test
- public void createsRegionMappingOnceOnly() {
- JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
- ResultModel result;
- result =
- createRegionMappingCommand.createMapping(regionName, dataSourceName, tableName, pdxClass);
- assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
- IgnoredException ignoredException =
- IgnoredException.addIgnoredException(RegionMappingExistsException.class.getName());
-
- try {
- result =
- createRegionMappingCommand.createMapping(regionName, dataSourceName, tableName, pdxClass);
- } finally {
- ignoredException.remove();
- }
-
- assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
- }
-
- @Test
- public void createsRegionMappingWithMinimumParams() {
- ResultModel result =
- createRegionMappingCommand.createMapping(regionName, dataSourceName, null, pdxClass);
-
- assertThat(result.getStatus()).isSameAs(Result.Status.OK);
-
- JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
- RegionMapping regionMapping = service.getMappingForRegion(regionName);
-
- assertThat(regionMapping).isNotNull();
- assertThat(regionMapping.getRegionName()).isEqualTo(regionName);
- assertThat(regionMapping.getDataSourceName()).isEqualTo(dataSourceName);
- assertThat(regionMapping.getTableName()).isNull();
- assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
- }
-}
diff --git a/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index c55b76e..49b44f7 100644
--- a/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-connectors/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,2 +1,3 @@
+org/apache/geode/connectors/jdbc/internal/cli/PreconditionException
org/apache/geode/connectors/jdbc/internal/xml/ElementType
org/apache/geode/connectors/jdbc/internal/xml/ElementType$1
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index 1de1062..deca66d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.connectors.jdbc.internal.cli;
+
+
import java.util.List;
import java.util.Set;
@@ -22,7 +24,16 @@ import org.springframework.shell.core.annotation.CliOption;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesDataPolicy;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.SingleGfshCommand;
@@ -48,6 +59,13 @@ public class CreateMappingCommand extends SingleGfshCommand {
"Name of database table for values to be written to.";
static final String CREATE_MAPPING__DATA_SOURCE_NAME = "data-source";
static final String CREATE_MAPPING__DATA_SOURCE_NAME__HELP = "Name of JDBC data source to use.";
+ static final String CREATE_MAPPING__SYNCHRONOUS_NAME = "synchronous";
+ static final String CREATE_MAPPING__SYNCHRONOUS_NAME__HELP =
+ "By default, writes will be asynchronous. If true, writes will be synchronous.";
+
+ public static String createAsyncEventQueueName(String regionPath) {
+ return "JDBC#" + regionPath.replace('/', '_');
+ }
@CliCommand(value = CREATE_MAPPING, help = CREATE_MAPPING__HELP)
@CliMetaData(relatedTopic = CliStrings.DEFAULT_TOPIC_GEODE)
@@ -61,41 +79,192 @@ public class CreateMappingCommand extends SingleGfshCommand {
@CliOption(key = CREATE_MAPPING__TABLE_NAME,
help = CREATE_MAPPING__TABLE_NAME__HELP) String table,
@CliOption(key = CREATE_MAPPING__PDX_NAME, mandatory = true,
- help = CREATE_MAPPING__PDX_NAME__HELP) String pdxName) {
+ help = CREATE_MAPPING__PDX_NAME__HELP) String pdxName,
+ @CliOption(key = CREATE_MAPPING__SYNCHRONOUS_NAME,
+ help = CREATE_MAPPING__SYNCHRONOUS_NAME__HELP,
+ specifiedDefaultValue = "true", unspecifiedDefaultValue = "false") boolean synchronous) {
// input
Set<DistributedMember> targetMembers = getMembers(null, null);
- RegionMapping mapping = new RegionMapping(regionName,
- pdxName, table, dataSourceName);
+ RegionMapping mapping = new RegionMapping(regionName, pdxName, table, dataSourceName);
+
+ try {
+ ConfigurationPersistenceService configurationPersistenceService =
+ checkForClusterConfiguration();
+ CacheConfig cacheConfig = configurationPersistenceService.getCacheConfig(null);
+ RegionConfig regionConfig = checkForRegion(regionName, cacheConfig);
+ checkForExistingMapping(regionName, regionConfig);
+ checkForCacheLoader(regionName, regionConfig);
+ checkForCacheWriter(regionName, synchronous, regionConfig);
+ checkForAsyncQueue(regionName, synchronous, cacheConfig);
+ } catch (PreconditionException ex) {
+ return ResultModel.createError(ex.getMessage());
+ }
// action
+ Object[] arguments = new Object[] {mapping, synchronous};
List<CliFunctionResult> results =
- executeAndGetFunctionResult(new CreateMappingFunction(), mapping, targetMembers);
+ executeAndGetFunctionResult(new CreateMappingFunction(), arguments, targetMembers);
ResultModel result =
ResultModel.createMemberStatusResult(results, EXPERIMENTAL, null, false, true);
- result.setConfigObject(mapping);
+ result.setConfigObject(arguments);
return result;
}
+  /**
+   * Precondition: the cluster configuration service must be available.
+   *
+   * @return the configuration persistence service, never null
+   * @throws PreconditionException if getConfigurationPersistenceService() returns null
+   */
+  private ConfigurationPersistenceService checkForClusterConfiguration()
+      throws PreconditionException {
+    ConfigurationPersistenceService service = getConfigurationPersistenceService();
+    if (service != null) {
+      return service;
+    }
+    throw new PreconditionException("Cluster Configuration must be enabled.");
+  }
+
+  /**
+   * Precondition: the region must already be defined in the given cache config.
+   *
+   * @param regionName name of the region to look up
+   * @param cacheConfig the cluster configuration to search
+   * @return the matching region config, never null
+   * @throws PreconditionException if no region with that name is found
+   */
+  private RegionConfig checkForRegion(String regionName, CacheConfig cacheConfig)
+      throws PreconditionException {
+    RegionConfig found = findRegionConfig(cacheConfig, regionName);
+    if (found != null) {
+      return found;
+    }
+    throw new PreconditionException("A region named " + regionName + " must already exist.");
+  }
+
+  /**
+   * Precondition: the region must not already carry a RegionMapping among its custom elements.
+   *
+   * @param regionName region name, used only for the error message
+   * @param regionConfig config of the region being mapped
+   * @throws PreconditionException if any custom region element is a RegionMapping
+   */
+  private void checkForExistingMapping(String regionName, RegionConfig regionConfig)
+      throws PreconditionException {
+    for (Object customElement : regionConfig.getCustomRegionElements()) {
+      if (customElement instanceof RegionMapping) {
+        throw new PreconditionException("A jdbc-mapping for " + regionName + " already exists.");
+      }
+    }
+  }
+
+  /**
+   * Precondition: none of the region's attributes may already declare a cache-loader.
+   *
+   * @param regionName region name, used only for the error message
+   * @param regionConfig config of the region being mapped
+   * @throws PreconditionException naming the class of the first cache-loader found
+   */
+  private void checkForCacheLoader(String regionName, RegionConfig regionConfig)
+      throws PreconditionException {
+    for (RegionAttributesType attributes : regionConfig.getRegionAttributes()) {
+      DeclarableType existingLoader = attributes.getCacheLoader();
+      if (existingLoader != null) {
+        throw new PreconditionException("The existing region " + regionName
+            + " must not already have a cache-loader, but it has "
+            + existingLoader.getClassName());
+      }
+    }
+  }
+
+  /**
+   * Precondition, checked only for synchronous mappings: none of the region's attributes may
+   * already declare a cache-writer.
+   *
+   * @param regionName region name, used only for the error message
+   * @param synchronous when false this check is skipped entirely
+   * @param regionConfig config of the region being mapped
+   * @throws PreconditionException naming the class of the first cache-writer found
+   */
+  private void checkForCacheWriter(String regionName, boolean synchronous,
+      RegionConfig regionConfig) throws PreconditionException {
+    if (!synchronous) {
+      return;
+    }
+    for (RegionAttributesType attributes : regionConfig.getRegionAttributes()) {
+      DeclarableType existingWriter = attributes.getCacheWriter();
+      if (existingWriter != null) {
+        throw new PreconditionException("The existing region " + regionName
+            + " must not already have a cache-writer, but it has "
+            + existingWriter.getClassName());
+      }
+    }
+  }
+
+  /**
+   * Precondition, checked only for asynchronous mappings: the cache config must not already
+   * contain an async-event-queue whose id equals the name generated for this region.
+   *
+   * @param regionName region the queue name is derived from
+   * @param synchronous when true this check is skipped entirely
+   * @param cacheConfig the cluster configuration to search
+   * @throws PreconditionException if a queue with the generated id exists
+   */
+  private void checkForAsyncQueue(String regionName, boolean synchronous, CacheConfig cacheConfig)
+      throws PreconditionException {
+    if (synchronous) {
+      return;
+    }
+    String queueName = createAsyncEventQueueName(regionName);
+    boolean queueExists = cacheConfig.getAsyncEventQueues().stream()
+        .anyMatch(queue -> queue.getId().equals(queueName));
+    if (queueExists) {
+      throw new PreconditionException(
+          "An async-event-queue named " + queueName + " must not already exist.");
+    }
+  }
+
@Override
public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
- RegionMapping newCacheElement = (RegionMapping) element;
- RegionMapping existingCacheElement = cacheConfig.findCustomRegionElement(
- newCacheElement.getRegionName(), newCacheElement.getId(), RegionMapping.class);
-
- if (existingCacheElement != null) {
- cacheConfig
- .getRegions()
- .stream()
- .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
- .forEach(
- regionConfig -> regionConfig.getCustomRegionElements().remove(existingCacheElement));
+ Object[] arguments = (Object[]) element; // presumably the {RegionMapping, Boolean} array createMapping passes to setConfigObject - confirm
+ RegionMapping regionMapping = (RegionMapping) arguments[0];
+ boolean synchronous = (Boolean) arguments[1];
+ String regionName = regionMapping.getRegionName();
+ String queueName = createAsyncEventQueueName(regionName); // computed even when synchronous; only the async branches use it
+ RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
+ if (regionConfig == null) {
+ return; // region is not defined in this cache config: leave the config untouched
+ }
+ RegionAttributesType attributes = getRegionAttributes(regionConfig); // may add a new, empty attributes element to the region
+ addMappingToRegion(regionMapping, regionConfig);
+ if (!synchronous) {
+ createAsyncQueue(cacheConfig, attributes, queueName); // queue definition is added before the region references its id
+ }
+ alterRegion(queueName, attributes, synchronous);
+ }
+
+  /**
+   * Updates the region attributes for the new mapping: always sets the cache-loader, then either
+   * sets the cache-writer (synchronous) or adds the async-event-queue id (asynchronous).
+   *
+   * @param queueName id of the async-event-queue; only used when not synchronous
+   * @param attributes the region attributes to modify
+   * @param synchronous true to configure a cache-writer instead of a queue
+   */
+  private void alterRegion(String queueName, RegionAttributesType attributes, boolean synchronous) {
+    setCacheLoader(attributes);
+    if (!synchronous) {
+      addAsyncEventQueueId(queueName, attributes);
+      return;
+    }
+    setCacheWriter(attributes);
+  }
+
+ private void addMappingToRegion(RegionMapping newCacheElement, RegionConfig regionConfig) {
+ regionConfig.getCustomRegionElements().add(newCacheElement); // appended as-is; no check here for an existing mapping
+ }
+
+  /**
+   * Looks up a region config by exact name.
+   *
+   * @param cacheConfig the configuration whose regions are searched
+   * @param regionName the name to compare against each region's getName()
+   * @return the first region config whose name equals regionName, or null if there is none
+   */
+  private RegionConfig findRegionConfig(CacheConfig cacheConfig, String regionName) {
+    for (RegionConfig candidate : cacheConfig.getRegions()) {
+      if (candidate.getName().equals(regionName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds an async-event-queue definition with a JdbcAsyncWriter listener to the cache config.
+   * The queue is parallel when the region's data-policy is PARTITION or PERSISTENT_PARTITION and
+   * serial for every other (or a missing) data-policy.
+   *
+   * @param cacheConfig the configuration that receives the new queue
+   * @param attributes attributes of the mapped region; only its data-policy is read
+   * @param queueName id to give the new queue
+   */
+  private void createAsyncQueue(CacheConfig cacheConfig, RegionAttributesType attributes,
+      String queueName) {
+    AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
+    asyncEventQueue.setId(queueName);
+    // Compare constant-first so a null data-policy yields a serial queue instead of a
+    // NullPointerException. getRegionAttributes can return a freshly constructed
+    // RegionAttributesType on which no data-policy was ever set.
+    RegionAttributesDataPolicy dataPolicy = attributes.getDataPolicy();
+    boolean isPartitioned = RegionAttributesDataPolicy.PARTITION.equals(dataPolicy)
+        || RegionAttributesDataPolicy.PERSISTENT_PARTITION.equals(dataPolicy);
+    asyncEventQueue.setParallel(isPartitioned);
+    DeclarableType listener = new DeclarableType();
+    listener.setClassName(JdbcAsyncWriter.class.getName());
+    asyncEventQueue.setAsyncEventListener(listener);
+    cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
+  }
+
+  /**
+   * Adds queueName to the region's comma-separated async-event-queue-ids unless one of the ids
+   * already in that list equals it exactly.
+   *
+   * @param queueName the queue id to add
+   * @param attributes the region attributes whose id list is read and updated
+   */
+  private void addAsyncEventQueueId(String queueName, RegionAttributesType attributes) {
+    String currentIds = attributes.getAsyncEventQueueIds();
+    if (currentIds == null || currentIds.isEmpty()) {
+      attributes.setAsyncEventQueueIds(queueName);
+    } else if (!("," + currentIds + ",").contains("," + queueName + ",")) {
+      // Wrapping both sides in commas forces a whole-id match. A plain contains() would treat
+      // "JDBC#region" as present when the list only holds "JDBC#region2".
+      attributes.setAsyncEventQueueIds(currentIds + ',' + queueName);
     }
+  }
- cacheConfig
- .getRegions()
- .stream()
- .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
- .forEach(regionConfig -> regionConfig.getCustomRegionElements().add(newCacheElement));
+  /**
+   * Declares JdbcLoader as the cache-loader in the given region attributes, replacing any
+   * loader declaration that was there.
+   *
+   * @param attributes the region attributes to modify
+   */
+  private void setCacheLoader(RegionAttributesType attributes) {
+    String loaderClassName = JdbcLoader.class.getName();
+    DeclarableType jdbcLoader = new DeclarableType();
+    jdbcLoader.setClassName(loaderClassName);
+    attributes.setCacheLoader(jdbcLoader);
+  }
+
+  /**
+   * Declares JdbcWriter as the cache-writer in the given region attributes, replacing any
+   * writer declaration that was there.
+   *
+   * @param attributes the region attributes to modify
+   */
+  private void setCacheWriter(RegionAttributesType attributes) {
+    String writerClassName = JdbcWriter.class.getName();
+    DeclarableType jdbcWriter = new DeclarableType();
+    jdbcWriter.setClassName(writerClassName);
+    attributes.setCacheWriter(jdbcWriter);
+  }
+
+  /**
+   * Returns the first region-attributes element of the region, creating and registering an
+   * empty one when the region has none.
+   *
+   * @param regionConfig the region whose attributes list is read (and possibly extended)
+   * @return the existing first element, or the newly added empty element
+   */
+  private RegionAttributesType getRegionAttributes(RegionConfig regionConfig) {
+    List<RegionAttributesType> attributesList = regionConfig.getRegionAttributes();
+    if (!attributesList.isEmpty()) {
+      return attributesList.get(0);
+    }
+    RegionAttributesType created = new RegionAttributesType();
+    attributesList.add(created);
+    return created;
   }
}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
index ba71dfb..3d2e2ed 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
@@ -16,30 +16,49 @@ package org.apache.geode.connectors.jdbc.internal.cli;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
import org.apache.geode.management.cli.CliFunction;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+/**
+ * The Object[] must always be of size two.
+ * The first element must be a RegionMapping.
+ * The second element must be a Boolean that is true if synchronous.
+ */
@Experimental
-public class CreateMappingFunction extends CliFunction<RegionMapping> {
+public class CreateMappingFunction extends CliFunction<Object[]> {
CreateMappingFunction() {
super();
}
@Override
- public CliFunctionResult executeFunction(FunctionContext<RegionMapping> context)
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context)
throws Exception {
JdbcConnectorService service = FunctionContextArgumentProvider.getJdbcConnectorService(context);
// input
- RegionMapping regionMapping = context.getArguments();
+ Object[] arguments = context.getArguments();
+ RegionMapping regionMapping = (RegionMapping) arguments[0];
+ boolean synchronous = (boolean) arguments[1];
- verifyRegionExists(context, regionMapping);
+ String regionName = regionMapping.getRegionName();
+ Region<?, ?> region = verifyRegionExists(context.getCache(), regionName);
// action
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+ if (!synchronous) {
+ createAsyncEventQueue(context.getCache(), queueName,
+ region.getAttributes().getDataPolicy().withPartitioning());
+ }
+ alterRegion(region, queueName, synchronous);
createRegionMapping(service, regionMapping);
// output
@@ -49,14 +68,38 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
return new CliFunctionResult(member, true, message);
}
- private void verifyRegionExists(FunctionContext<RegionMapping> context,
- RegionMapping regionMapping) {
- Cache cache = context.getCache();
- String regionName = regionMapping.getRegionName();
- if (cache.getRegion(regionName) == null) {
+ /**
+ * Change the existing region to have
+ * the JdbcLoader as its cache-loader.
+ * If synchronous, a JdbcWriter is also set as its cache-writer;
+ * otherwise the given async-event-queue is added as one of its queues.
+ *
+ * @param region the region whose attributes are mutated
+ * @param queueName id of the async-event-queue; only used when not synchronous
+ * @param synchronous true to set a JdbcWriter instead of adding the queue
+ */
+ private void alterRegion(Region<?, ?> region, String queueName, boolean synchronous) {
+ region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+ if (synchronous) {
+ region.getAttributesMutator().setCacheWriter(new JdbcWriter());
+ } else {
+ region.getAttributesMutator().addAsyncEventQueueId(queueName);
+ }
+ }
+
+  /**
+   * Creates an async-event-queue with the given id and a new JdbcAsyncWriter as its listener.
+   * The queue is parallel when isPartitioned is true and serial otherwise.
+   *
+   * @param cache the cache that supplies the queue factory
+   * @param queueName id of the queue to create
+   * @param isPartitioned true if the mapped region is partitioned
+   */
+  private void createAsyncEventQueue(Cache cache, String queueName, boolean isPartitioned) {
+    AsyncEventQueueFactory queueFactory = cache.createAsyncEventQueueFactory();
+    queueFactory.setParallel(isPartitioned);
+    JdbcAsyncWriter listener = new JdbcAsyncWriter();
+    queueFactory.create(queueName, listener);
+  }
+
+  /**
+   * Looks up the region in the cache and fails if it is not there.
+   *
+   * @param cache the cache to search
+   * @param regionName name passed to cache.getRegion
+   * @return the region, never null
+   * @throws IllegalStateException if the cache has no region with that name
+   */
+  private Region<?, ?> verifyRegionExists(Cache cache, String regionName) {
+    Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
       throw new IllegalStateException(
           "create jdbc-mapping requires that the region \"" + regionName + "\" exists.");
     }
+    return region;
   }
/**
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java
new file mode 100644
index 0000000..a40ae7e
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/PreconditionException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.cli;
+
+/**
+ * Used in gfsh commands that do precondition checks
+ * to indicate that a precondition check failed.
+ * This is a checked exception, and the message passed to the
+ * constructor is the only detail it carries, so that message
+ * should fully describe the precondition that was not met.
+ */
+@SuppressWarnings("serial")
+public class PreconditionException extends Exception {
+ public PreconditionException(String message) {
+ super(message);
+ }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
new file mode 100644
index 0000000..1513f91
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.cli;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
+import org.apache.geode.cache.configuration.CacheElement;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.cache.configuration.RegionAttributesDataPolicy;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.distributed.ConfigurationPersistenceService;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+
+public class CreateMappingCommandTest {
+
+ private InternalCache cache;
+ private CreateMappingCommand createRegionMappingCommand;
+
+ private String regionName;
+ private String dataSourceName;
+ private String tableName;
+ private String pdxClass;
+ private DistributionManager distributionManager;
+ private Set<InternalDistributedMember> members;
+ private List<CliFunctionResult> results;
+ private CliFunctionResult successFunctionResult;
+ private RegionMapping mapping;
+ private final Object[] arguments = new Object[2];
+ private CacheConfig cacheConfig;
+ RegionConfig matchingRegion;
+ RegionAttributesType matchingRegionAttributes;
+
+ @Before
+ public void setup() {
+ regionName = "regionName";
+ dataSourceName = "connection";
+ tableName = "testTable";
+ pdxClass = "myPdxClass";
+ cache = mock(InternalCache.class);
+ distributionManager = mock(DistributionManager.class);
+ when(cache.getDistributionManager()).thenReturn(distributionManager);
+ members = new HashSet<>();
+ members.add(mock(InternalDistributedMember.class)); // a single mocked member
+ when(distributionManager.getNormalDistributionManagerIds()).thenReturn(members);
+ createRegionMappingCommand = spy(CreateMappingCommand.class); // spy so selected command methods can be stubbed with doReturn
+ createRegionMappingCommand.setCache(cache);
+ results = new ArrayList<>(); // each test fills (or leaves empty) this list; the stub below returns this same instance
+ successFunctionResult = mock(CliFunctionResult.class);
+ when(successFunctionResult.isSuccessful()).thenReturn(true);
+
+ doReturn(results).when(createRegionMappingCommand).executeAndGetFunctionResult(any(), any(),
+ any());
+
+ mapping = mock(RegionMapping.class);
+ when(mapping.getRegionName()).thenReturn(regionName);
+
+ cacheConfig = mock(CacheConfig.class);
+
+ matchingRegion = mock(RegionConfig.class); // region config whose name equals regionName
+ when(matchingRegion.getName()).thenReturn(regionName);
+ List<RegionAttributesType> attributesList = new ArrayList<>();
+ matchingRegionAttributes = mock(RegionAttributesType.class);
+ when(matchingRegionAttributes.getDataPolicy()).thenReturn(RegionAttributesDataPolicy.REPLICATE); // default is a non-partitioned region
+ attributesList.add(matchingRegionAttributes);
+ when(matchingRegion.getRegionAttributes()).thenReturn(attributesList);
+
+ arguments[0] = mapping; // same {RegionMapping, synchronous} layout the command builds in createMapping
+ arguments[1] = false;
+ }
+
+  /**
+   * Stubs the command so that the cluster-configuration and region-exists preconditions pass:
+   * a mocked configuration persistence service returns cacheConfig, and cacheConfig lists
+   * matchingRegion as its only region.
+   */
+  private void setupRequiredPreconditions() {
+    ConfigurationPersistenceService persistenceService =
+        mock(ConfigurationPersistenceService.class);
+    when(persistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+    doReturn(persistenceService).when(createRegionMappingCommand)
+        .getConfigurationPersistenceService();
+    List<RegionConfig> regions = new ArrayList<>();
+    regions.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(regions);
+  }
+
+  /**
+   * With all preconditions met and a successful function result, the command reports OK and
+   * exposes {RegionMapping, synchronous} as its config object.
+   */
+  @Test
+  public void createsMappingReturnsStatusOKWhenFunctionResultSuccess() {
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.OK);
+    // Named configObject so the local does not hide the "results" field used above.
+    Object[] configObject = (Object[]) result.getConfigObject();
+    RegionMapping createdMapping = (RegionMapping) configObject[0];
+    boolean synchronousFlag = (boolean) configObject[1];
+    assertThat(createdMapping).isNotNull();
+    assertThat(createdMapping.getRegionName()).isEqualTo(regionName);
+    assertThat(createdMapping.getDataSourceName()).isEqualTo(dataSourceName);
+    assertThat(createdMapping.getTableName()).isEqualTo(tableName);
+    assertThat(createdMapping.getPdxName()).isEqualTo(pdxClass);
+    assertThat(synchronousFlag).isFalse();
+  }
+
+ @Test
+ public void createsMappingReturnsStatusERRORWhenFunctionResultIsEmpty() {
+ setupRequiredPreconditions();
+ results.clear();
+
+ ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+ tableName, pdxClass, false);
+
+ assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+ }
+
+  /**
+   * When no configuration persistence service is available, the command fails its first
+   * precondition and reports that cluster configuration must be enabled.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigIsDisabled() {
+    doReturn(null).when(createRegionMappingCommand).getConfigurationPersistenceService();
+    results.add(successFunctionResult);
+
+    ResultModel outcome = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(outcome.getStatus()).isSameAs(Result.Status.ERROR);
+    String rendered = outcome.toString();
+    assertThat(rendered).contains("Cluster Configuration must be enabled.");
+  }
+
+  /**
+   * When the cluster configuration lists no regions, the command reports that the region must
+   * already exist.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigDoesNotContainRegion() {
+    results.add(successFunctionResult);
+    ConfigurationPersistenceService persistenceService =
+        mock(ConfigurationPersistenceService.class);
+    when(persistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+    doReturn(persistenceService).when(createRegionMappingCommand)
+        .getConfigurationPersistenceService();
+    when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
+
+    ResultModel outcome = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(outcome.getStatus()).isSameAs(Result.Status.ERROR);
+    String expectedError = "A region named " + regionName + " must already exist.";
+    assertThat(outcome.toString()).contains(expectedError);
+  }
+
+  /**
+   * When the region's custom elements already contain a RegionMapping, the command reports that
+   * a jdbc-mapping already exists. The region is also given a cache-loader here, so the test
+   * additionally shows that the existing-mapping error is the one reported.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenRegionMappingExists() {
+    // Same service/cacheConfig/region wiring the other tests use.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    DeclarableType loaderDeclarable = mock(DeclarableType.class);
+    when(loaderDeclarable.getClassName()).thenReturn(null);
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+    RegionMapping existingMapping = mock(RegionMapping.class);
+    List<CacheElement> customElements = new ArrayList<>();
+    customElements.add(existingMapping);
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+
+    ResultModel outcome = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(outcome.getStatus()).isSameAs(Result.Status.ERROR);
+    String expectedError = "A jdbc-mapping for " + regionName + " already exists.";
+    assertThat(outcome.toString()).contains(expectedError);
+  }
+
+  /**
+   * When the region in the cluster configuration already declares a cache-loader, the command
+   * reports an error that names the loader's class.
+   */
+  @Test
+  public void createsMappingReturnsStatusERRORWhenClusterConfigRegionHasLoader() {
+    // Same service/cacheConfig/region wiring the other tests use.
+    setupRequiredPreconditions();
+    results.add(successFunctionResult);
+    DeclarableType loaderDeclarable = mock(DeclarableType.class);
+    when(loaderDeclarable.getClassName()).thenReturn("MyCacheLoaderClass");
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+    ResultModel outcome = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass, false);
+
+    assertThat(outcome.getStatus()).isSameAs(Result.Status.ERROR);
+    String expectedError = "The existing region " + regionName
+        + " must not already have a cache-loader, but it has MyCacheLoaderClass";
+    assertThat(outcome.toString()).contains(expectedError);
+  }
+
+ @Test
+ public void createMappingWithSynchronousReturnsStatusERRORWhenClusterConfigRegionHasWriter() {
+ results.add(successFunctionResult);
+ ConfigurationPersistenceService configurationPersistenceService =
+ mock(ConfigurationPersistenceService.class);
+ doReturn(configurationPersistenceService).when(createRegionMappingCommand)
+ .getConfigurationPersistenceService();
+ when(configurationPersistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+ List<RegionConfig> list = new ArrayList<>();
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<RegionAttributesType> attributes = new ArrayList<>();
+ RegionAttributesType writerAttribute = mock(RegionAttributesType.class);
+ DeclarableType writerDeclarable = mock(DeclarableType.class);
+ when(writerDeclarable.getClassName()).thenReturn("MyCacheWriterClass");
+ when(writerAttribute.getCacheWriter()).thenReturn(writerDeclarable);
+ attributes.add(writerAttribute);
+ when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+ ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+ tableName, pdxClass, true);
+
+ assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+ assertThat(result.toString()).contains("The existing region " + regionName
+ + " must not already have a cache-writer, but it has MyCacheWriterClass");
+ }
+
+ /**
+  * With the synchronous flag set, an async-event-queue that already uses the name derived from
+  * the region must not prevent the mapping from being created: the result status is OK.
+  */
+ @Test
+ public void createMappingWithSynchronousReturnsStatusOKWhenAsycnEventQueueAlreadyExists() {
+   results.add(successFunctionResult);
+
+   // Cluster configuration that exposes exactly one region: the one being mapped.
+   ConfigurationPersistenceService configurationPersistenceService =
+       mock(ConfigurationPersistenceService.class);
+   doReturn(configurationPersistenceService).when(createRegionMappingCommand)
+       .getConfigurationPersistenceService();
+   when(configurationPersistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+   List<RegionConfig> list = new ArrayList<>();
+   list.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(list);
+
+   // The region has attributes but no cache-loader.
+   List<RegionAttributesType> attributes = new ArrayList<>();
+   RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+   when(loaderAttribute.getCacheLoader()).thenReturn(null);
+   attributes.add(loaderAttribute);
+   when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+   // A queue with the derived name is already part of the cluster configuration.
+   // createAsyncEventQueueName is static, so it is called on the class rather than the instance.
+   List<AsyncEventQueue> asyncEventQueues = new ArrayList<>();
+   AsyncEventQueue matchingQueue = mock(AsyncEventQueue.class);
+   String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+   when(matchingQueue.getId()).thenReturn(queueName);
+   asyncEventQueues.add(matchingQueue);
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
+
+   ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+       tableName, pdxClass, true);
+
+   assertThat(result.getStatus()).isSameAs(Result.Status.OK);
+ }
+
+
+ /**
+  * Without the synchronous flag, createMapping must answer with ERROR when an
+  * async-event-queue with the name derived from the region is already in the cluster
+  * configuration.
+  */
+ @Test
+ public void createsMappingReturnsStatusERRORWhenAsycnEventQueueAlreadyExists() {
+   results.add(successFunctionResult);
+
+   // Cluster configuration that exposes exactly one region: the one being mapped.
+   ConfigurationPersistenceService configurationPersistenceService =
+       mock(ConfigurationPersistenceService.class);
+   doReturn(configurationPersistenceService).when(createRegionMappingCommand)
+       .getConfigurationPersistenceService();
+   when(configurationPersistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+   List<RegionConfig> list = new ArrayList<>();
+   list.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(list);
+
+   // The region has attributes but no cache-loader.
+   List<RegionAttributesType> attributes = new ArrayList<>();
+   RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+   when(loaderAttribute.getCacheLoader()).thenReturn(null);
+   attributes.add(loaderAttribute);
+   when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+
+   // A queue with the derived name is already part of the cluster configuration.
+   // createAsyncEventQueueName is static, so it is called on the class rather than the instance.
+   List<AsyncEventQueue> asyncEventQueues = new ArrayList<>();
+   AsyncEventQueue matchingQueue = mock(AsyncEventQueue.class);
+   String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+   when(matchingQueue.getId()).thenReturn(queueName);
+   asyncEventQueues.add(matchingQueue);
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
+
+   ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+       tableName, pdxClass, false);
+
+   assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+   assertThat(result.toString())
+       .contains("An async-event-queue named " + queueName + " must not already exist.");
+ }
+
+ /**
+  * updateClusterConfig must complete normally when the cache configuration has no regions;
+  * the test passes as long as no exception escapes the call.
+  */
+ @Test
+ public void updateClusterConfigWithNoRegionsDoesNotThrowException() {
+   List<RegionConfig> noRegions = Collections.emptyList();
+   when(cacheConfig.getRegions()).thenReturn(noRegions);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+ }
+
+ /**
+  * updateClusterConfig must add the mapping, and nothing else, to the custom region elements
+  * of the region whose name matches.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAddsMappingToRegion() {
+   List<RegionConfig> list = new ArrayList<>();
+   // Mutable on purpose: the command is expected to append the mapping to this list.
+   List<CacheElement> listCacheElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+   list.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(list);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   // One assertion covers both "exactly one element" and "that element is the mapping",
+   // and reports the actual list contents when it fails.
+   assertThat(listCacheElements).containsExactly(mapping);
+ }
+
+ /**
+  * updateClusterConfig must leave the custom region elements of a region untouched when the
+  * region's name is "nonMatchingRegion".
+  */
+ @Test
+ public void updateClusterConfigWithOneNonMatchingRegionDoesNotAddMapping() {
+   RegionConfig otherRegion = mock(RegionConfig.class);
+   when(otherRegion.getName()).thenReturn("nonMatchingRegion");
+   List<CacheElement> customElements = new ArrayList<>();
+   when(otherRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(otherRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   assertThat(customElements).isEmpty();
+ }
+
+ /**
+  * updateClusterConfig must add exactly one async-event-queue to the cache configuration for
+  * the matching region: named after the region, not parallel, and using JdbcAsyncWriter as its
+  * listener.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionCreatesAsyncEventQueue() {
+   List<RegionConfig> list = new ArrayList<>();
+   List<CacheElement> listCacheElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+   list.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(list);
+   // Mutable on purpose: the command is expected to append the new queue to this list.
+   List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   // hasSize reports the actual list contents when it fails, unlike comparing size() to 1.
+   assertThat(queueList).hasSize(1);
+   String queueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+   AsyncEventQueue createdQueue = queueList.get(0);
+   assertThat(createdQueue.getId()).isEqualTo(queueName);
+   assertThat(createdQueue.isParallel()).isFalse();
+   assertThat(createdQueue.getAsyncEventListener().getClassName())
+       .isEqualTo(JdbcAsyncWriter.class.getName());
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingPartitionedRegionCreatesParallelAsyncEventQueue() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ when(matchingRegionAttributes.getDataPolicy())
+ .thenReturn(RegionAttributesDataPolicy.PARTITION);
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+ assertThat(queueList.get(0).isParallel()).isTrue();
+ }
+
+ /**
+  * updateClusterConfig must set a cache-loader on the attributes of the matching region.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionCallsSetCacheLoader() {
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   // Any loader instance is accepted; only the call itself is checked.
+   verify(matchingRegionAttributes).setCacheLoader(any());
+ }
+
+ /**
+  * When the matching region reports null async-event-queue ids, updateClusterConfig must set
+  * the ids to exactly the queue name derived from the region.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndNullQueuesAddsAsyncEventQueueIdToRegion() {
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+   when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(null);
+   String expectedQueueIds = CreateMappingCommand.createAsyncEventQueueName(regionName);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+   verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+   assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedQueueIds);
+ }
+
+ /**
+  * When the matching region reports an empty async-event-queue id string, updateClusterConfig
+  * must set the ids to exactly the queue name derived from the region.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndEmptyQueuesAddsAsyncEventQueueIdToRegion() {
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+   when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("");
+   String expectedQueueIds = CreateMappingCommand.createAsyncEventQueueName(regionName);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+   verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+   assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedQueueIds);
+ }
+
+ /**
+  * When the matching region already lists queue ids "q1,q2", updateClusterConfig must append
+  * the queue name derived from the region after a comma.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndExistingQueuesAddsAsyncEventQueueIdToRegion() {
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+   when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+   String expectedQueueIds =
+       "q1,q2," + CreateMappingCommand.createAsyncEventQueueName(regionName);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   ArgumentCaptor<String> queueIdsCaptor = ArgumentCaptor.forClass(String.class);
+   verify(matchingRegionAttributes).setAsyncEventQueueIds(queueIdsCaptor.capture());
+   assertThat(queueIdsCaptor.getValue()).isEqualTo(expectedQueueIds);
+ }
+
+ /**
+  * When the matching region's queue ids already contain the queue name derived from the
+  * region, updateClusterConfig must not call setAsyncEventQueueIds at all.
+  */
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndQueuesContainingDuplicateDoesNotModifyAsyncEventQueueIdOnRegion() {
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+
+   // The derived queue name sits between two unrelated ids.
+   String derivedQueueName = CreateMappingCommand.createAsyncEventQueueName(regionName);
+   String idsIncludingDerivedName = "q1," + derivedQueueName + ",q2";
+   when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(idsIncludingDerivedName);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+ }
+
+ /**
+  * With arguments[1] set to true (the synchronous case named by this test),
+  * updateClusterConfig must set a cache-writer on the attributes of the matching region.
+  */
+ @Test
+ public void updateClusterConfigWithSynchronousSetsTheCacheWriterOnTheMatchingRegion() {
+   arguments[1] = true;
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   // Any writer instance is accepted; only the call itself is checked.
+   verify(matchingRegionAttributes).setCacheWriter(any());
+ }
+
+ /**
+  * With arguments[1] set to true (the synchronous case named by this test),
+  * updateClusterConfig must not touch the async-event-queue ids of the matching region, even
+  * when the region already lists "q1,q2".
+  */
+ @Test
+ public void updateClusterConfigWithSynchronousAndOneMatchingRegionAndExistingQueuesDoesNotAddsAsyncEventQueueIdToRegion() {
+   arguments[1] = true;
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+   when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+ }
+
+ /**
+  * With arguments[1] set to true (the synchronous case named by this test),
+  * updateClusterConfig must not add any async-event-queue to the cache configuration.
+  */
+ @Test
+ public void updateClusterConfigWithSynchronousAndOneMatchingRegionDoesNotCreateAsyncEventQueue() {
+   arguments[1] = true;
+   List<CacheElement> customElements = new ArrayList<>();
+   when(matchingRegion.getCustomRegionElements()).thenReturn(customElements);
+   List<RegionConfig> regionConfigs = new ArrayList<>();
+   regionConfigs.add(matchingRegion);
+   when(cacheConfig.getRegions()).thenReturn(regionConfigs);
+   List<CacheConfig.AsyncEventQueue> clusterQueues = new ArrayList<>();
+   when(cacheConfig.getAsyncEventQueues()).thenReturn(clusterQueues);
+
+   createRegionMappingCommand.updateClusterConfig(null, cacheConfig, arguments);
+
+   assertThat(clusterQueues).isEmpty();
+ }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
index acba1cd..955b57f 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
@@ -18,9 +18,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -32,7 +34,11 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
@@ -48,11 +54,15 @@ public class CreateMappingFunctionTest {
private static final String REGION_NAME = "testRegion";
private RegionMapping regionMapping;
- private FunctionContext<RegionMapping> context;
+ private FunctionContext<Object[]> context;
private DistributedMember distributedMember;
private ResultSender<Object> resultSender;
private JdbcConnectorService service;
private InternalCache cache;
+ private Region region;
+ private RegionAttributes attributes;
+ private AsyncEventQueueFactory asyncEventQueueFactory;
+ private final Object[] functionInputs = new Object[2];
private CreateMappingFunction function;
@@ -61,7 +71,7 @@ public class CreateMappingFunctionTest {
context = mock(FunctionContext.class);
resultSender = mock(ResultSender.class);
cache = mock(InternalCache.class);
- Region region = mock(Region.class);
+ region = mock(Region.class);
DistributedSystem system = mock(DistributedSystem.class);
distributedMember = mock(DistributedMember.class);
service = mock(JdbcConnectorService.class);
@@ -73,12 +83,29 @@ public class CreateMappingFunctionTest {
when(cache.getDistributedSystem()).thenReturn(system);
when(cache.getRegion(REGION_NAME)).thenReturn(region);
when(system.getDistributedMember()).thenReturn(distributedMember);
- when(context.getArguments()).thenReturn(regionMapping);
+ functionInputs[0] = regionMapping;
+ setupAsynchronous();
+ when(context.getArguments()).thenReturn(functionInputs);
when(cache.getService(eq(JdbcConnectorService.class))).thenReturn(service);
+ attributes = mock(RegionAttributes.class);
+ when(attributes.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+ when(region.getAttributes()).thenReturn(attributes);
+ when(region.getAttributesMutator()).thenReturn(mock(AttributesMutator.class));
+ asyncEventQueueFactory = mock(AsyncEventQueueFactory.class);
+ when(cache.createAsyncEventQueueFactory()).thenReturn(asyncEventQueueFactory);
function = new CreateMappingFunction();
}
+ /** Stores false in slot 1 of the function inputs, selecting the asynchronous case. */
+ private void setupAsynchronous() {
+   functionInputs[1] = Boolean.FALSE;
+ }
+
+ /** Stores true in slot 1 of the function inputs, selecting the synchronous case. */
+ private void setupSynchronous() {
+   functionInputs[1] = Boolean.TRUE;
+ }
+
+
@Test
public void isHAReturnsFalse() {
assertThat(function.isHA()).isFalse();
@@ -130,9 +157,72 @@ public class CreateMappingFunctionTest {
@Test
public void executeCreatesMapping() throws Exception {
+ // Runs the function against the mocked context prepared by the fixture.
- function.execute(context);
+ function.executeFunction(context);
+
+ // The JdbcConnectorService mock must be asked exactly once to create regionMapping.
+ verify(service, times(1)).createRegionMapping(regionMapping);
+ }
+
+ /**
+  * Executing the function must create the region mapping and set a cache-loader on the
+  * region through its attributes mutator.
+  */
+ @Test
+ public void executeAlterRegionLoader() throws Exception {
+   function.executeFunction(context);
+
+   // The fixture stubs getAttributesMutator() with a single mock, so this is that instance.
+   AttributesMutator regionMutator = region.getAttributesMutator();
+   verify(regionMutator, times(1)).setCacheLoader(any());
+   verify(service, times(1)).createRegionMapping(regionMapping);
+ }
+
+ /**
+  * In the synchronous case, executing the function must set a cache-writer on the region
+  * through its attributes mutator.
+  */
+ @Test
+ public void executeWithSynchronousAltersRegionWriter() throws Exception {
+   setupSynchronous();
+
+   function.executeFunction(context);
+
+   // The fixture stubs getAttributesMutator() with a single mock, so this is that instance.
+   AttributesMutator regionMutator = region.getAttributesMutator();
+   verify(regionMutator, times(1)).setCacheWriter(any());
+ }
+
+ /**
+  * In the synchronous case, executing the function must not add any async-event-queue id to
+  * the region.
+  */
+ @Test
+ public void executeWithSynchronousNeverAltersRegionAsyncEventQueue() throws Exception {
+   setupSynchronous();
+
+   function.executeFunction(context);
+
+   // The fixture stubs getAttributesMutator() with a single mock, so this is that instance.
+   AttributesMutator regionMutator = region.getAttributesMutator();
+   verify(regionMutator, never()).addAsyncEventQueueId(any());
+ }
+
+ /**
+  * In the synchronous case, executing the function must not create an async-event-queue
+  * through the cache's queue factory.
+  */
+ @Test
+ public void executeWithSynchronousNeverCreatesAsyncQueue() throws Exception {
+   setupSynchronous();
+
+   function.executeFunction(context);
+
+   // No create(...) call with any pair of arguments may have reached the factory mock.
+   verify(asyncEventQueueFactory, never()).create(any(), any());
+ }
+
+ @Test
+ public void executeAlterRegionAsyncEventQueue() throws Exception {
+ // Queue id that CreateMappingCommand derives from the region name.
+ String queueName = CreateMappingCommand.createAsyncEventQueueName(REGION_NAME);
+ function.executeFunction(context);
verify(service, times(1)).createRegionMapping(regionMapping);
+ // The fixture stubs getAttributesMutator() with a single mock, so this is that instance.
+ AttributesMutator mutator = region.getAttributesMutator();
+ // The derived queue id must have been added to the region exactly once.
+ verify(mutator, times(1)).addAsyncEventQueueId(queueName);
+ }
+
+ /**
+  * For a region whose data policy is REPLICATE, executing the function must create exactly
+  * one async-event-queue and configure the factory with parallel set to false.
+  */
+ @Test
+ public void executeCreatesSerialAsyncQueueForNonPartitionedRegion() throws Exception {
+   // Stated explicitly even though the fixture already stubs REPLICATE.
+   when(attributes.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+   function.executeFunction(context);
+
+   verify(asyncEventQueueFactory, times(1)).setParallel(false);
+   verify(asyncEventQueueFactory, times(1)).create(any(), any());
+ }
+
+ @Test
+ public void executeCreatesParallelAsyncQueueForPartitionedRegion() throws Exception {
+ // Override the fixture's REPLICATE data policy so the region reports PARTITION.
+ when(attributes.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+ function.executeFunction(context);
+
+ // Exactly one queue is created through the factory, and it is configured as parallel.
+ verify(asyncEventQueueFactory, times(1)).create(any(), any());
+ verify(asyncEventQueueFactory, times(1)).setParallel(true);
}
@Test