You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/09 20:36:19 UTC

[geode] branch feature/GEODE-6010 updated: implemented and unit tested the cluster config changes to CreateMappingCommand

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-6010
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6010 by this push:
     new d685fee  implemented and unit tested the cluster config changes to CreateMappingCommand
d685fee is described below

commit d685feedad0756afc90fc8feaafca34d007cce4c
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Fri Nov 9 12:35:23 2018 -0800

    implemented and unit tested the cluster config changes to CreateMappingCommand
---
 .../jdbc/internal/cli/CreateMappingCommand.java    |  75 +++++++++--
 .../internal/cli/CreateMappingCommandTest.java     | 139 ++++++++++++++++++++-
 2 files changed, 199 insertions(+), 15 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index f3a00de..21ef915 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -28,6 +28,7 @@ import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
 import org.apache.geode.cache.configuration.DeclarableType;
 import org.apache.geode.cache.configuration.RegionAttributesType;
 import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.distributed.ConfigurationPersistenceService;
 import org.apache.geode.distributed.DistributedMember;
@@ -83,9 +84,7 @@ public class CreateMappingCommand extends SingleGfshCommand {
 
     CacheConfig cacheConfig = configurationPersistenceService.getCacheConfig(null);
 
-    RegionConfig regionConfig = cacheConfig.getRegions().stream()
-        .filter(region -> region.getName().equals(mapping.getRegionName())).findFirst()
-        .orElse(null);
+    RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
     if (regionConfig == null) {
       return ResultModel
           .createError("A region named " + regionName + " must already exist.");
@@ -133,18 +132,70 @@ public class CreateMappingCommand extends SingleGfshCommand {
   @Override
   public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
     RegionMapping newCacheElement = (RegionMapping) element;
-    RegionConfig regionConfig = cacheConfig.getRegions().stream()
-        .filter(region -> region.getName().equals(newCacheElement.getRegionName())).findFirst()
-        .orElse(null);
-    if (regionConfig != null) {
-      regionConfig.getCustomRegionElements().add(newCacheElement);
+    String regionName = newCacheElement.getRegionName();
+    String queueName = getAsyncEventQueueName(regionName);
+    RegionConfig regionConfig = findRegionConfig(cacheConfig, regionName);
+    if (regionConfig == null) {
+      return;
     }
+    RegionAttributesType attributes = getRegionAttributes(regionConfig);
+    addMappingToRegion(newCacheElement, regionConfig);
+    createAsyncQueue(cacheConfig, attributes, queueName);
+    alterRegion(queueName, attributes);
+  }
+
+  private void alterRegion(String queueName, RegionAttributesType attributes) {
+    setCacheLoader(attributes);
+    addAsyncEventQueueId(queueName, attributes);
+  }
+
+  private void addMappingToRegion(RegionMapping newCacheElement, RegionConfig regionConfig) {
+    regionConfig.getCustomRegionElements().add(newCacheElement);
+  }
+
+  private RegionConfig findRegionConfig(CacheConfig cacheConfig, String regionName) {
+    return cacheConfig.getRegions().stream()
+        .filter(region -> region.getName().equals(regionName)).findFirst().orElse(null);
+  }
+
+  private void createAsyncQueue(CacheConfig cacheConfig, RegionAttributesType attributes,
+      String queueName) {
     AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
-    asyncEventQueue.setId(getAsyncEventQueueName(newCacheElement.getRegionName()));
-    boolean isPartitioned = regionConfig.getRegionAttributes().stream()
-        .anyMatch(attributes -> attributes.getPartitionAttributes() != null);
+    asyncEventQueue.setId(queueName);
+    boolean isPartitioned = attributes.getPartitionAttributes() != null;
     asyncEventQueue.setParallel(isPartitioned);
     cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
-    // TODO alter region config
+  }
+
+  // Appends queueName to the region's comma-separated async-event-queue-ids unless present.
+  private void addAsyncEventQueueId(String queueName, RegionAttributesType attributes) {
+    String existingIds = attributes.getAsyncEventQueueIds();
+    if (existingIds == null || existingIds.isEmpty()) {
+      attributes.setAsyncEventQueueIds(queueName);
+      return;
+    }
+    // Compare whole ids: a substring test would wrongly find "q1" inside "q10".
+    if (java.util.Arrays.asList(existingIds.split(",")).contains(queueName)) {
+      return;
+    }
+    attributes.setAsyncEventQueueIds(existingIds + ',' + queueName);
+  }
+
+  private void setCacheLoader(RegionAttributesType attributes) {
+    DeclarableType loader = new DeclarableType();
+    loader.setClassName(JdbcLoader.class.getName());
+    attributes.setCacheLoader(loader);
+  }
+
+  private RegionAttributesType getRegionAttributes(RegionConfig regionConfig) {
+    RegionAttributesType attributes;
+    List<RegionAttributesType> attributesList = regionConfig.getRegionAttributes();
+    if (attributesList.isEmpty()) {
+      attributes = new RegionAttributesType();
+      attributesList.add(attributes);
+    } else {
+      attributes = attributesList.get(0);
+    }
+    return attributes;
   }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
index 830075e..c10f8b1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -18,7 +18,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -29,12 +31,14 @@ import java.util.Set;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.CacheConfig.AsyncEventQueue;
 import org.apache.geode.cache.configuration.CacheElement;
 import org.apache.geode.cache.configuration.DeclarableType;
 import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionAttributesType.PartitionAttributes;
 import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.distributed.ConfigurationPersistenceService;
@@ -61,6 +65,7 @@ public class CreateMappingCommandTest {
   private RegionMapping mapping;
   private CacheConfig cacheConfig;
   RegionConfig matchingRegion;
+  RegionAttributesType matchingRegionAttributes;
 
   @Before
   public void setup() {
@@ -90,6 +95,10 @@ public class CreateMappingCommandTest {
 
     matchingRegion = mock(RegionConfig.class);
     when(matchingRegion.getName()).thenReturn(regionName);
+    List<RegionAttributesType> attributesList = new ArrayList<>();
+    matchingRegionAttributes = mock(RegionAttributesType.class);
+    attributesList.add(matchingRegionAttributes);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributesList);
 
   }
 
@@ -253,14 +262,14 @@ public class CreateMappingCommandTest {
   }
 
   @Test
-  public void testUpdateClusterConfigWithNoRegions() {
+  public void updateClusterConfigWithNoRegionsDoesNotThrowException() {
     when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
 
     createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
   }
 
   @Test
-  public void testUpdateClusterConfigWithOneMatchingRegion() {
+  public void updateClusterConfigWithOneMatchingRegionAddsMappingToRegion() {
     List<RegionConfig> list = new ArrayList<>();
     List<CacheElement> listCacheElements = new ArrayList<>();
     when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
@@ -274,7 +283,7 @@ public class CreateMappingCommandTest {
   }
 
   @Test
-  public void testUpdateClusterConfigWithOneNonMatchingRegion() {
+  public void updateClusterConfigWithOneNonMatchingRegionDoesNotAddMapping() {
     List<RegionConfig> list = new ArrayList<>();
     RegionConfig nonMatchingRegion = mock(RegionConfig.class);
     when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
@@ -288,4 +297,128 @@ public class CreateMappingCommandTest {
     assertThat(listCacheElements).isEmpty();
   }
 
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionCreatesAsyncEventQueue() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    assertThat(queueList.size()).isEqualTo(1);
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    assertThat(queueList.get(0).getId()).isEqualTo(queueName);
+    assertThat(queueList.get(0).isParallel()).isFalse();
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingPartitionedRegionCreatesParallelAsyncEventQueue() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    when(matchingRegionAttributes.getPartitionAttributes())
+        .thenReturn(mock(PartitionAttributes.class));
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    assertThat(queueList.get(0).isParallel()).isTrue();
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionCallsSetCacheLoader() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    verify(matchingRegionAttributes).setCacheLoader(any());
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndNullQueuesAddsAsyncEventQueueIdToRegion() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(null);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    assertThat(argument.getValue()).isEqualTo(queueName);
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndEmptyQueuesAddsAsyncEventQueueIdToRegion() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("");
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    assertThat(argument.getValue()).isEqualTo(queueName);
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndExistingQueuesAddsAsyncEventQueueIdToRegion() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn("q1,q2");
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+    verify(matchingRegionAttributes).setAsyncEventQueueIds(argument.capture());
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    assertThat(argument.getValue()).isEqualTo("q1,q2," + queueName);
+  }
+
+  @Test
+  public void updateClusterConfigWithOneMatchingRegionAndQueuesContainingDuplicateDoesNotModifyAsyncEventQueueIdOnRegion() {
+    List<RegionConfig> list = new ArrayList<>();
+    List<CacheElement> listCacheElements = new ArrayList<>();
+    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<CacheConfig.AsyncEventQueue> queueList = new ArrayList<>();
+    when(cacheConfig.getAsyncEventQueues()).thenReturn(queueList);
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    String existingQueues = "q1," + queueName + ",q2";
+    when(matchingRegionAttributes.getAsyncEventQueueIds()).thenReturn(existingQueues);
+
+    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
+
+    verify(matchingRegionAttributes, never()).setAsyncEventQueueIds(any());
+  }
 }