You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/09 01:00:42 UTC
[geode] branch feature/GEODE-6010 updated: wip: function now
creates async-event-queue and alters regions. need to change
updateClusterConfig to alter the region config.
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch feature/GEODE-6010
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6010 by this push:
new 19198b4 wip: function now creates async-event-queue and alters regions. need to change updateClusterConfig to alter the region config.
19198b4 is described below
commit 19198b4b6aaf4d36ff17b727eaa76f2e04ae300c
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Nov 8 16:59:22 2018 -0800
wip: function now creates async-event-queue and alters regions.
need to change updateClusterConfig to alter the region config.
---
.../jdbc/internal/cli/CreateMappingCommand.java | 36 +++++-----
.../jdbc/internal/cli/CreateMappingFunction.java | 41 +++++++++--
.../internal/cli/CreateMappingCommandTest.java | 79 ++++++++++------------
.../internal/cli/CreateMappingFunctionTest.java | 54 ++++++++++++++-
4 files changed, 142 insertions(+), 68 deletions(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index 0218e61..f3a00de 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -91,6 +91,12 @@ public class CreateMappingCommand extends SingleGfshCommand {
.createError("A region named " + regionName + " must already exist.");
}
+ if (regionConfig.getCustomRegionElements().stream()
+ .anyMatch(element -> element instanceof RegionMapping)) {
+ return ResultModel
+ .createError("A jdbc-mapping for " + regionName + " already exists.");
+ }
+
RegionAttributesType regionAttributes = regionConfig.getRegionAttributes().stream()
.filter(attributes -> attributes.getCacheLoader() != null).findFirst().orElse(null);
if (regionAttributes != null) {
@@ -120,29 +126,25 @@ public class CreateMappingCommand extends SingleGfshCommand {
return result;
}
- String getAsyncEventQueueName(String regionName) {
+ static String getAsyncEventQueueName(String regionName) {
return "JDBC-" + regionName;
}
@Override
public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
RegionMapping newCacheElement = (RegionMapping) element;
- RegionMapping existingCacheElement = cacheConfig.findCustomRegionElement(
- newCacheElement.getRegionName(), newCacheElement.getId(), RegionMapping.class);
-
- if (existingCacheElement != null) {
- cacheConfig
- .getRegions()
- .stream()
- .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
- .forEach(
- regionConfig -> regionConfig.getCustomRegionElements().remove(existingCacheElement));
+ RegionConfig regionConfig = cacheConfig.getRegions().stream()
+ .filter(region -> region.getName().equals(newCacheElement.getRegionName())).findFirst()
+ .orElse(null);
+ if (regionConfig != null) {
+ regionConfig.getCustomRegionElements().add(newCacheElement);
}
-
- cacheConfig
- .getRegions()
- .stream()
- .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
- .forEach(regionConfig -> regionConfig.getCustomRegionElements().add(newCacheElement));
+ AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
+ asyncEventQueue.setId(getAsyncEventQueueName(newCacheElement.getRegionName()));
+ boolean isPartitioned = regionConfig.getRegionAttributes().stream()
+ .anyMatch(attributes -> attributes.getPartitionAttributes() != null);
+ asyncEventQueue.setParallel(isPartitioned);
+ cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
+ // TODO alter region config
}
}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
index ba71dfb..ed03ff7 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
@@ -16,7 +16,11 @@ package org.apache.geode.connectors.jdbc.internal.cli;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
@@ -37,9 +41,14 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
// input
RegionMapping regionMapping = context.getArguments();
- verifyRegionExists(context, regionMapping);
+ String regionName = regionMapping.getRegionName();
+ Region<?, ?> region = verifyRegionExists(context, regionName);
// action
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ createAsyncEventQueue(context.getCache(), queueName,
+ region.getAttributes().getPartitionAttributes() != null);
+ alterRegion(region, queueName);
createRegionMapping(service, regionMapping);
// output
@@ -49,14 +58,36 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
return new CliFunctionResult(member, true, message);
}
- private void verifyRegionExists(FunctionContext<RegionMapping> context,
- RegionMapping regionMapping) {
+ /**
+ * Change the existing region to have
+ * the JdbcLoader as its cache-loader
+ * and the given async-event-queue as one of its queues.
+ */
+ private void alterRegion(Region<?, ?> region, String queueName) {
+ region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+ region.getAttributesMutator().addAsyncEventQueueId(queueName);
+ }
+
+ /**
+ * Create an async-event-queue with the given name.
+ * For a partitioned region a parallel queue is created.
+ * Otherwise a serial queue is created.
+ */
+ private void createAsyncEventQueue(Cache cache, String queueName, boolean isPartitioned) {
+ AsyncEventQueueFactory asyncEventQueueFactory = cache.createAsyncEventQueueFactory();
+ asyncEventQueueFactory.setParallel(isPartitioned);
+ asyncEventQueueFactory.create(queueName, new JdbcAsyncWriter());
+ }
+
+ private Region<?, ?> verifyRegionExists(FunctionContext<RegionMapping> context,
+ String regionName) {
Cache cache = context.getCache();
- String regionName = regionMapping.getRegionName();
- if (cache.getRegion(regionName) == null) {
+ Region<?, ?> result = cache.getRegion(regionName);
+ if (result == null) {
throw new IllegalStateException(
"create jdbc-mapping requires that the region \"" + regionName + "\" exists.");
}
+ return result;
}
/**
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
index 37d7901..830075e 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -163,6 +163,37 @@ public class CreateMappingCommandTest {
}
@Test
+ public void createsMappingReturnsStatusERRORWhenRegionMappingExists() {
+ results.add(successFunctionResult);
+ ConfigurationPersistenceService configurationPersistenceService =
+ mock(ConfigurationPersistenceService.class);
+ doReturn(configurationPersistenceService).when(createRegionMappingCommand)
+ .getConfigurationPersistenceService();
+ when(configurationPersistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+ List<RegionConfig> list = new ArrayList<>();
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<RegionAttributesType> attributes = new ArrayList<>();
+ RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+ DeclarableType loaderDeclarable = mock(DeclarableType.class);
+ when(loaderDeclarable.getClassName()).thenReturn(null);
+ when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+ attributes.add(loaderAttribute);
+ when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+ List<CacheElement> customList = new ArrayList<>();
+ RegionMapping existingMapping = mock(RegionMapping.class);
+ customList.add(existingMapping);
+ when(matchingRegion.getCustomRegionElements()).thenReturn(customList);
+
+ ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+ tableName, pdxClass);
+
+ assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+ assertThat(result.toString()).contains("A jdbc-mapping for " + regionName + " already exists.");
+ }
+
+
+ @Test
public void createsMappingReturnsStatusERRORWhenClusterConfigRegionHasLoader() {
results.add(successFunctionResult);
ConfigurationPersistenceService configurationPersistenceService =
@@ -222,53 +253,17 @@ public class CreateMappingCommandTest {
}
@Test
- public void testUpdateClusterConfigWithNoRegionsAndNoExistingElement() {
- doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+ public void testUpdateClusterConfigWithNoRegions() {
when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
- createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
- }
-
- @Test
- public void testUpdateClusterConfigWithOneMatchingRegionAndNoExistingElement() {
- doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
- List<RegionConfig> list = new ArrayList<>();
- List<CacheElement> listCacheElements = new ArrayList<>();
- when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
- list.add(matchingRegion);
- when(cacheConfig.getRegions()).thenReturn(list);
createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
-
- assertThat(listCacheElements.size()).isEqualTo(1);
- assertThat(listCacheElements).contains(mapping);
}
@Test
- public void testUpdateClusterConfigWithOneNonMatchingRegionAndNoExistingElement() {
- doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+ public void testUpdateClusterConfigWithOneMatchingRegion() {
List<RegionConfig> list = new ArrayList<>();
- RegionConfig nonMatchingRegion = mock(RegionConfig.class);
- when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
- List<CacheElement> listCacheElements = new ArrayList<>();
- when(nonMatchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
- list.add(nonMatchingRegion);
- when(cacheConfig.getRegions()).thenReturn(list);
-
- createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
-
- assertThat(listCacheElements).isEmpty();
- }
-
- @Test
- public void testUpdateClusterConfigWithOneMatchingRegionAndExistingElement() {
- RegionMapping existingMapping = mock(RegionMapping.class);
- doReturn(existingMapping).when(cacheConfig).findCustomRegionElement(any(), any(), any());
- RegionConfig matchingRegion = mock(RegionConfig.class);
- when(matchingRegion.getName()).thenReturn(regionName);
List<CacheElement> listCacheElements = new ArrayList<>();
- listCacheElements.add(existingMapping);
when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
- List<RegionConfig> list = new ArrayList<>();
list.add(matchingRegion);
when(cacheConfig.getRegions()).thenReturn(list);
@@ -279,22 +274,18 @@ public class CreateMappingCommandTest {
}
@Test
- public void testUpdateClusterConfigWithOneNonMatchingRegionAndExistingElement() {
- RegionMapping existingMapping = mock(RegionMapping.class);
- doReturn(existingMapping).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+ public void testUpdateClusterConfigWithOneNonMatchingRegion() {
List<RegionConfig> list = new ArrayList<>();
RegionConfig nonMatchingRegion = mock(RegionConfig.class);
when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
List<CacheElement> listCacheElements = new ArrayList<>();
- listCacheElements.add(existingMapping);
when(nonMatchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
list.add(nonMatchingRegion);
when(cacheConfig.getRegions()).thenReturn(list);
createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
- assertThat(listCacheElements.size()).isEqualTo(1);
- assertThat(listCacheElements).contains(existingMapping);
+ assertThat(listCacheElements).isEmpty();
}
}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
index eac2343..eb54614 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
@@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -32,7 +33,11 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
@@ -53,6 +58,8 @@ public class CreateMappingFunctionTest {
private ResultSender<Object> resultSender;
private JdbcConnectorService service;
private InternalCache cache;
+ private Region region;
+ private AsyncEventQueueFactory asyncEventQueueFactory;
private CreateMappingFunction function;
@@ -61,7 +68,7 @@ public class CreateMappingFunctionTest {
context = mock(FunctionContext.class);
resultSender = mock(ResultSender.class);
cache = mock(InternalCache.class);
- Region region = mock(Region.class);
+ region = mock(Region.class);
DistributedSystem system = mock(DistributedSystem.class);
distributedMember = mock(DistributedMember.class);
service = mock(JdbcConnectorService.class);
@@ -75,6 +82,10 @@ public class CreateMappingFunctionTest {
when(system.getDistributedMember()).thenReturn(distributedMember);
when(context.getArguments()).thenReturn(regionMapping);
when(cache.getService(eq(JdbcConnectorService.class))).thenReturn(service);
+ when(region.getAttributes()).thenReturn(mock(RegionAttributes.class));
+ when(region.getAttributesMutator()).thenReturn(mock(AttributesMutator.class));
+ asyncEventQueueFactory = mock(AsyncEventQueueFactory.class);
+ when(cache.createAsyncEventQueueFactory()).thenReturn(asyncEventQueueFactory);
function = new CreateMappingFunction();
}
@@ -130,9 +141,48 @@ public class CreateMappingFunctionTest {
@Test
public void executeCreatesMapping() throws Exception {
- function.execute(context);
+ function.executeFunction(context);
+
+ verify(service, times(1)).createRegionMapping(regionMapping);
+ }
+
+ @Test
+ public void executeAlterRegionLoader() throws Exception {
+ function.executeFunction(context);
verify(service, times(1)).createRegionMapping(regionMapping);
+ AttributesMutator mutator = region.getAttributesMutator();
+ verify(mutator, times(1)).setCacheLoader(any());
+ }
+
+ @Test
+ public void executeAlterRegionAsyncEventQueue() throws Exception {
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(REGION_NAME);
+ function.executeFunction(context);
+
+ verify(service, times(1)).createRegionMapping(regionMapping);
+ AttributesMutator mutator = region.getAttributesMutator();
+ verify(mutator, times(1)).addAsyncEventQueueId(queueName);
+ }
+
+ @Test
+ public void executeCreatesSerialAsyncQueueForNonPartitionedRegion() throws Exception {
+ RegionAttributes attributes = region.getAttributes();
+ when(attributes.getPartitionAttributes()).thenReturn(null);
+ function.executeFunction(context);
+
+ verify(asyncEventQueueFactory, times(1)).create(any(), any());
+ verify(asyncEventQueueFactory, times(1)).setParallel(false);
+ }
+
+ @Test
+ public void executeCreatesParallelAsyncQueueForPartitionedRegion() throws Exception {
+ RegionAttributes attributes = region.getAttributes();
+ when(attributes.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
+ function.executeFunction(context);
+
+ verify(asyncEventQueueFactory, times(1)).create(any(), any());
+ verify(asyncEventQueueFactory, times(1)).setParallel(true);
}
@Test