You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/09 20:36:19 UTC
[geode] branch feature/GEODE-6010 updated: implemented and unit
tested the cluster config changes to CreateMappingCommand
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch feature/GEODE-6010
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6010 by this push:
new d685fee implemented and unit tested the cluster config changes to CreateMappingCommand
d685fee is described below
commit d685feedad0756afc90fc8feaafca34d007cce4c
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Fri Nov 9 12:35:23 2018 -0800
implemented and unit tested the cluster config changes to CreateMappingCommand
---
.../jdbc/internal/cli/CreateMappingCommand.java | 75 +++++++++--
.../internal/cli/CreateMappingCommandTest.java | 139 ++++++++++++++++++++-
2 files changed, 199 insertions(+), 15 deletions(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index f3a00de..21ef915 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -28,6 +28,7 @@ import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
import org.apache.geode.cache.configuration.DeclarableType;
import org.apache.geode.cache.configuration.RegionAttributesType;
import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedMember;
@@ -83,9 +84,7 @@ public class CreateMappingCommand extends SingleGfshCommand {
CacheConfig cacheConfig = configurationPersistenceService.getCacheConfig(null);
- RegionConfig regionConfig = cacheConfig.getRegions().stream()
- .filter(region -> region.getName().equals(mapping.getRegionName())).findFirst()
- .orElse(null);
+ RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
if (regionConfig == null) {
return ResultModel
.createError("A region named " + regionName + " must already exist.");
@@ -133,18 +132,70 @@ public class CreateMappingCommand extends SingleGfshCommand {
@Override
public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
RegionMapping newCacheElement = (RegionMapping) element;
- RegionConfig regionConfig = cacheConfig.getRegions().stream()
- .filter(region -> region.getName().equals(newCacheElement.getRegionName())).findFirst()
- .orElse(null);
- if (regionConfig != null) {
- regionConfig.getCustomRegionElements().add(newCacheElement);
+ String regionName = newCacheElement.getRegionName();
+ String queueName = getAsyncEventQueueName(regionName);
+ RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
+ if (regionConfig == null) {
+ return;
}
+ RegionAttributesType attributes = getRegionAttributes(regionConfig);
+ addMappingToRegion(newCacheElement, regionConfig);
+ createAsyncQueue(cacheConfig, attributes, queueName);
+ alterRegion(queueName, attributes);
+ }
+
+ private void alterRegion(String queueName, RegionAttributesType attributes) {
+ setCacheLoader(attributes);
+ addAsyncEventQueueId(queueName, attributes);
+ }
+
+ private void addMappingToRegion(RegionMapping newCacheElement, RegionConfig regionConfig) {
+ regionConfig.getCustomRegionElements().add(newCacheElement);
+ }
+
+ private RegionConfig findRegionConfig(CacheConfig cacheConfig, String regionName) {
+ return cacheConfig.getRegions().stream()
+ .filter(region -> region.getName().equals(regionName)).findFirst().orElse(null);
+ }
+
+ private void createAsyncQueue(CacheConfig cacheConfig, RegionAttributesType attributes,
+ String queueName) {
AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
- asyncEventQueue.setId(getAsyncEventQueueName(newCacheElement.getRegionName()));
- boolean isPartitioned = regionConfig.getRegionAttributes().stream()
- .anyMatch(attributes -> attributes.getPartitionAttributes() != null);
+ asyncEventQueue.setId(queueName);
+ boolean isPartitioned = attributes.getPartitionAttributes() != null;
asyncEventQueue.setParallel(isPartitioned);
cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
- // TODO alter region config
+ }
+
+ private void addAsyncEventQueueId(String queueName, RegionAttributesType attributes) {
+ String asyncEventQueueList = attributes.getAsyncEventQueueIds();
+ if (asyncEventQueueList == null) {
+ asyncEventQueueList = "";
+ }
+ if (!asyncEventQueueList.contains(queueName)) {
+ if (asyncEventQueueList.length() > 0) {
+ asyncEventQueueList += ',';
+ }
+ asyncEventQueueList += queueName;
+ attributes.setAsyncEventQueueIds(asyncEventQueueList);
+ }
+ }
+
+ private void setCacheLoader(RegionAttributesType attributes) {
+ DeclarableType loader = new DeclarableType();
+ loader.setClassName(JdbcLoader.class.getName());
+ attributes.setCacheLoader(loader);
+ }
+
+ private RegionAttributesType getRegionAttributes(RegionConfig regionConfig) {
+ RegionAttributesType attributes;
+ List<RegionAttributesType> attributesList = regionConfig.getRegionAttributes();
+ if (attributesList.isEmpty()) {
+ attributes = new RegionAttributesType();
+ attributesList.add(attributes);
+ } else {
+ attributes = attributesList.get(0);
+ }
+ return attributes;
}
}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
index 830075e..c10f8b1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -18,7 +18,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -29,12 +31,14 @@ import java.util.Set;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
import org.apache.geode.cache.configuration.CacheElement;
import org.apache.geode.cache.configuration.DeclarableType;
import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionAttributesType.PartitionAttributes;
import org.apache.geode.cache.configuration.RegionConfig;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
import org.apache.geode.distributed.ConfigurationPersistenceService;
@@ -61,6 +65,7 @@ public class CreateMappingCommandTest {
private RegionMapping mapping;
private CacheConfig cacheConfig;
RegionConfig matchingRegion;
+ RegionAttributesType matchingRegionAttributes;
@Before
public void setup() {
@@ -90,6 +95,10 @@ public class CreateMappingCommandTest {
matchingRegion = mock(RegionConfig.class);
when(matchingRegion.getName()).thenReturn(regionName);
+ List<RegionAttributesType> attributesList = new ArrayList<>();
+ matchingRegionAttributes = mock(RegionAttributesType.class);
+ attributesList.add(matchingRegionAttributes);
+ when(matchingRegion.getRegionAttributes()).thenReturn(attributesList);
}
@@ -253,14 +262,14 @@ public class CreateMappingCommandTest {
}
@Test
- public void testUpdateClusterConfigWithNoRegions() {
+ public void updateClusterConfigWithNoRegionsDoesNotThrowException() {
when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
}
@Test
- public void testUpdateClusterConfigWithOneMatchingRegion() {
+ public void updateClusterConfigWithOneMatchingRegionAddsMappingToRegion() {
List<RegionConfig> list = new ArrayList<>();
List<CacheElement> listCacheElements = new ArrayList<>();
when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
@@ -274,7 +283,7 @@ public class CreateMappingCommandTest {
}
@Test
- public void testUpdateClusterConfigWithOneNonMatchingRegion() {
+ public void updateClusterConfigWithOneNonMatchingRegionDoesNotAddMapping() {
List<RegionConfig> list = new ArrayList<>();
RegionConfig nonMatchingRegion = mock(RegionConfig.class);
when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
@@ -288,4 +297,128 @@ public class CreateMappingCommandTest {
assertThat(listCacheElements).isEmpty();
}
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionCreatesAsyncEventQueue() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ assertThat(queueList.size()).isEqualTo(1);
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ assertThat(queueList.get(0).getId()).isEqualTo(queueName);
+ assertThat(queueList.get(0).isParallel()).isFalse();
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingPartitionedRegionCreatesParallelAsyncEventQueue() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ when(matchingRegionAttributes.getPartitionAttributes())
+ .thenReturn(mock(PartitionAttributes.class));
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ assertThat(queueList.get(0).isParallel()).isTrue();
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionCallsSetCacheLoader() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ verify(matchingRegionAttributes).setCacheLoader(any());
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndNullQueuesAddsAsyncEventQueueIdToRegion() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(null);
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ assertThat(argument.getValue()).isEqualTo(queueName);
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndEmptyQueuesAddsAsyncEventQueueIdToRegion() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("");
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ assertThat(argument.getValue()).isEqualTo(queueName);
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndExistingQueuesAddsAsyncEventQueueIdToRegion() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ assertThat(argument.getValue()).isEqualTo("q1,q2," + queueName);
+ }
+
+ @Test
+ public void updateClusterConfigWithOneMatchingRegionAndQueuesContainingDuplicateDoesNotModifyAsyncEventQueueIdOnRegion() {
+ List<RegionConfig> list = new ArrayList<>();
+ List<CacheElement> listCacheElements = new ArrayList<>();
+ when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+ list.add(matchingRegion);
+ when(cacheConfig.getRegions()).thenReturn(list);
+ List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+ when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+ String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+ String existingQueues = "q1," + queueName + ",q2";
+ when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(existingQueues);
+
+ createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+ verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+ }
}