You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/11/09 01:00:42 UTC

[geode] branch feature/GEODE-6010 updated: wip: function now creates async-event-queue and alters regions. need to change updateClusterConfig to alter the region config.

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-6010
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6010 by this push:
     new 19198b4  wip: function now creates async-event-queue and alters regions. need to change updateClusterConfig to alter the region config.
19198b4 is described below

commit 19198b4b6aaf4d36ff17b727eaa76f2e04ae300c
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Nov 8 16:59:22 2018 -0800

    wip: function now creates async-event-queue and alters regions.
    need to change updateClusterConfig to alter the region config.
---
 .../jdbc/internal/cli/CreateMappingCommand.java    | 36 +++++-----
 .../jdbc/internal/cli/CreateMappingFunction.java   | 41 +++++++++--
 .../internal/cli/CreateMappingCommandTest.java     | 79 ++++++++++------------
 .../internal/cli/CreateMappingFunctionTest.java    | 54 ++++++++++++++-
 4 files changed, 142 insertions(+), 68 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index 0218e61..f3a00de 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -91,6 +91,12 @@ public class CreateMappingCommand extends SingleGfshCommand {
           .createError("A region named " + regionName + " must already exist.");
     }
 
+    if (regionConfig.getCustomRegionElements().stream()
+        .anyMatch(element -> element instanceof RegionMapping)) {
+      return ResultModel
+          .createError("A jdbc-mapping for " + regionName + " already exists.");
+    }
+
     RegionAttributesType regionAttributes = regionConfig.getRegionAttributes().stream()
         .filter(attributes -> attributes.getCacheLoader() != null).findFirst().orElse(null);
     if (regionAttributes != null) {
@@ -120,29 +126,25 @@ public class CreateMappingCommand extends SingleGfshCommand {
     return result;
   }
 
-  String getAsyncEventQueueName(String regionName) {
+  static String getAsyncEventQueueName(String regionName) {
     return "JDBC-" + regionName;
   }
 
   @Override
   public void updateClusterConfig(String group, CacheConfig cacheConfig, Object element) {
     RegionMapping newCacheElement = (RegionMapping) element;
-    RegionMapping existingCacheElement = cacheConfig.findCustomRegionElement(
-        newCacheElement.getRegionName(), newCacheElement.getId(), RegionMapping.class);
-
-    if (existingCacheElement != null) {
-      cacheConfig
-          .getRegions()
-          .stream()
-          .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
-          .forEach(
-              regionConfig -> regionConfig.getCustomRegionElements().remove(existingCacheElement));
+    RegionConfig regionConfig = cacheConfig.getRegions().stream()
+        .filter(region -> region.getName().equals(newCacheElement.getRegionName())).findFirst()
+        .orElse(null);
+    if (regionConfig != null) {
+      regionConfig.getCustomRegionElements().add(newCacheElement);
     }
-
-    cacheConfig
-        .getRegions()
-        .stream()
-        .filter(regionConfig -> regionConfig.getName().equals(newCacheElement.getRegionName()))
-        .forEach(regionConfig -> regionConfig.getCustomRegionElements().add(newCacheElement));
+    AsyncEventQueue asyncEventQueue = new AsyncEventQueue();
+    asyncEventQueue.setId(getAsyncEventQueueName(newCacheElement.getRegionName()));
+    boolean isPartitioned = regionConfig.getRegionAttributes().stream()
+        .anyMatch(attributes -> attributes.getPartitionAttributes() != null);
+    asyncEventQueue.setParallel(isPartitioned);
+    cacheConfig.getAsyncEventQueues().add(asyncEventQueue);
+    // TODO alter region config
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
index ba71dfb..ed03ff7 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
@@ -16,7 +16,11 @@ package org.apache.geode.connectors.jdbc.internal.cli;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
@@ -37,9 +41,14 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
     // input
     RegionMapping regionMapping = context.getArguments();
 
-    verifyRegionExists(context, regionMapping);
+    String regionName = regionMapping.getRegionName();
+    Region<?, ?> region = verifyRegionExists(context, regionName);
 
     // action
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(regionName);
+    createAsyncEventQueue(context.getCache(), queueName,
+        region.getAttributes().getPartitionAttributes() != null);
+    alterRegion(region, queueName);
     createRegionMapping(service, regionMapping);
 
     // output
@@ -49,14 +58,36 @@ public class CreateMappingFunction extends CliFunction<RegionMapping> {
     return new CliFunctionResult(member, true, message);
   }
 
-  private void verifyRegionExists(FunctionContext<RegionMapping> context,
-      RegionMapping regionMapping) {
+  /**
+   * Change the existing region to have
+   * the JdbcLoader as its cache-loader
+   * and the given async-event-queue as one of its queues.
+   */
+  private void alterRegion(Region<?, ?> region, String queueName) {
+    region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+    region.getAttributesMutator().addAsyncEventQueueId(queueName);
+  }
+
+  /**
+   * Create an async-event-queue with the given name.
+   * For a partitioned region a parallel queue is created.
+   * Otherwise a serial queue is created.
+   */
+  private void createAsyncEventQueue(Cache cache, String queueName, boolean isPartitioned) {
+    AsyncEventQueueFactory asyncEventQueueFactory = cache.createAsyncEventQueueFactory();
+    asyncEventQueueFactory.setParallel(isPartitioned);
+    asyncEventQueueFactory.create(queueName, new JdbcAsyncWriter());
+  }
+
+  private Region<?, ?> verifyRegionExists(FunctionContext<RegionMapping> context,
+      String regionName) {
     Cache cache = context.getCache();
-    String regionName = regionMapping.getRegionName();
-    if (cache.getRegion(regionName) == null) {
+    Region<?, ?> result = cache.getRegion(regionName);
+    if (result == null) {
       throw new IllegalStateException(
           "create jdbc-mapping requires that the region \"" + regionName + "\" exists.");
     }
+    return result;
   }
 
   /**
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
index 37d7901..830075e 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -163,6 +163,37 @@ public class CreateMappingCommandTest {
   }
 
   @Test
+  public void createsMappingReturnsStatusERRORWhenRegionMappingExists() {
+    results.add(successFunctionResult);
+    ConfigurationPersistenceService configurationPersistenceService =
+        mock(ConfigurationPersistenceService.class);
+    doReturn(configurationPersistenceService).when(createRegionMappingCommand)
+        .getConfigurationPersistenceService();
+    when(configurationPersistenceService.getCacheConfig(null)).thenReturn(cacheConfig);
+    List<RegionConfig> list = new ArrayList<>();
+    list.add(matchingRegion);
+    when(cacheConfig.getRegions()).thenReturn(list);
+    List<RegionAttributesType> attributes = new ArrayList<>();
+    RegionAttributesType loaderAttribute = mock(RegionAttributesType.class);
+    DeclarableType loaderDeclarable = mock(DeclarableType.class);
+    when(loaderDeclarable.getClassName()).thenReturn(null);
+    when(loaderAttribute.getCacheLoader()).thenReturn(loaderDeclarable);
+    attributes.add(loaderAttribute);
+    when(matchingRegion.getRegionAttributes()).thenReturn(attributes);
+    List<CacheElement> customList = new ArrayList<>();
+    RegionMapping existingMapping = mock(RegionMapping.class);
+    customList.add(existingMapping);
+    when(matchingRegion.getCustomRegionElements()).thenReturn(customList);
+
+    ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
+        tableName, pdxClass);
+
+    assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
+    assertThat(result.toString()).contains("A jdbc-mapping for " + regionName + " already exists.");
+  }
+
+
+  @Test
   public void createsMappingReturnsStatusERRORWhenClusterConfigRegionHasLoader() {
     results.add(successFunctionResult);
     ConfigurationPersistenceService configurationPersistenceService =
@@ -222,53 +253,17 @@ public class CreateMappingCommandTest {
   }
 
   @Test
-  public void testUpdateClusterConfigWithNoRegionsAndNoExistingElement() {
-    doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+  public void testUpdateClusterConfigWithNoRegions() {
     when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
-    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
-  }
-
-  @Test
-  public void testUpdateClusterConfigWithOneMatchingRegionAndNoExistingElement() {
-    doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
-    List<RegionConfig> list = new ArrayList<>();
-    List<CacheElement> listCacheElements = new ArrayList<>();
-    when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
-    list.add(matchingRegion);
-    when(cacheConfig.getRegions()).thenReturn(list);
 
     createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
-
-    assertThat(listCacheElements.size()).isEqualTo(1);
-    assertThat(listCacheElements).contains(mapping);
   }
 
   @Test
-  public void testUpdateClusterConfigWithOneNonMatchingRegionAndNoExistingElement() {
-    doReturn(null).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+  public void testUpdateClusterConfigWithOneMatchingRegion() {
     List<RegionConfig> list = new ArrayList<>();
-    RegionConfig nonMatchingRegion = mock(RegionConfig.class);
-    when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
-    List<CacheElement> listCacheElements = new ArrayList<>();
-    when(nonMatchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
-    list.add(nonMatchingRegion);
-    when(cacheConfig.getRegions()).thenReturn(list);
-
-    createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
-
-    assertThat(listCacheElements).isEmpty();
-  }
-
-  @Test
-  public void testUpdateClusterConfigWithOneMatchingRegionAndExistingElement() {
-    RegionMapping existingMapping = mock(RegionMapping.class);
-    doReturn(existingMapping).when(cacheConfig).findCustomRegionElement(any(), any(), any());
-    RegionConfig matchingRegion = mock(RegionConfig.class);
-    when(matchingRegion.getName()).thenReturn(regionName);
     List<CacheElement> listCacheElements = new ArrayList<>();
-    listCacheElements.add(existingMapping);
     when(matchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
-    List<RegionConfig> list = new ArrayList<>();
     list.add(matchingRegion);
     when(cacheConfig.getRegions()).thenReturn(list);
 
@@ -279,22 +274,18 @@ public class CreateMappingCommandTest {
   }
 
   @Test
-  public void testUpdateClusterConfigWithOneNonMatchingRegionAndExistingElement() {
-    RegionMapping existingMapping = mock(RegionMapping.class);
-    doReturn(existingMapping).when(cacheConfig).findCustomRegionElement(any(), any(), any());
+  public void testUpdateClusterConfigWithOneNonMatchingRegion() {
     List<RegionConfig> list = new ArrayList<>();
     RegionConfig nonMatchingRegion = mock(RegionConfig.class);
     when(nonMatchingRegion.getName()).thenReturn("nonMatchingRegion");
     List<CacheElement> listCacheElements = new ArrayList<>();
-    listCacheElements.add(existingMapping);
     when(nonMatchingRegion.getCustomRegionElements()).thenReturn(listCacheElements);
     list.add(nonMatchingRegion);
     when(cacheConfig.getRegions()).thenReturn(list);
 
     createRegionMappingCommand.updateClusterConfig(null, cacheConfig, mapping);
 
-    assertThat(listCacheElements.size()).isEqualTo(1);
-    assertThat(listCacheElements).contains(existingMapping);
+    assertThat(listCacheElements).isEmpty();
   }
 
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
index eac2343..eb54614 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
@@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -32,7 +33,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
@@ -53,6 +58,8 @@ public class CreateMappingFunctionTest {
   private ResultSender<Object> resultSender;
   private JdbcConnectorService service;
   private InternalCache cache;
+  private Region region;
+  private AsyncEventQueueFactory asyncEventQueueFactory;
 
   private CreateMappingFunction function;
 
@@ -61,7 +68,7 @@ public class CreateMappingFunctionTest {
     context = mock(FunctionContext.class);
     resultSender = mock(ResultSender.class);
     cache = mock(InternalCache.class);
-    Region region = mock(Region.class);
+    region = mock(Region.class);
     DistributedSystem system = mock(DistributedSystem.class);
     distributedMember = mock(DistributedMember.class);
     service = mock(JdbcConnectorService.class);
@@ -75,6 +82,10 @@ public class CreateMappingFunctionTest {
     when(system.getDistributedMember()).thenReturn(distributedMember);
     when(context.getArguments()).thenReturn(regionMapping);
     when(cache.getService(eq(JdbcConnectorService.class))).thenReturn(service);
+    when(region.getAttributes()).thenReturn(mock(RegionAttributes.class));
+    when(region.getAttributesMutator()).thenReturn(mock(AttributesMutator.class));
+    asyncEventQueueFactory = mock(AsyncEventQueueFactory.class);
+    when(cache.createAsyncEventQueueFactory()).thenReturn(asyncEventQueueFactory);
 
     function = new CreateMappingFunction();
   }
@@ -130,9 +141,48 @@ public class CreateMappingFunctionTest {
 
   @Test
   public void executeCreatesMapping() throws Exception {
-    function.execute(context);
+    function.executeFunction(context);
+
+    verify(service, times(1)).createRegionMapping(regionMapping);
+  }
+
+  @Test
+  public void executeAlterRegionLoader() throws Exception {
+    function.executeFunction(context);
 
     verify(service, times(1)).createRegionMapping(regionMapping);
+    AttributesMutator mutator = region.getAttributesMutator();
+    verify(mutator, times(1)).setCacheLoader(any());
+  }
+
+  @Test
+  public void executeAlterRegionAsyncEventQueue() throws Exception {
+    String queueName = CreateMappingCommand.getAsyncEventQueueName(REGION_NAME);
+    function.executeFunction(context);
+
+    verify(service, times(1)).createRegionMapping(regionMapping);
+    AttributesMutator mutator = region.getAttributesMutator();
+    verify(mutator, times(1)).addAsyncEventQueueId(queueName);
+  }
+
+  @Test
+  public void executeCreatesSerialAsyncQueueForNonPartitionedRegion() throws Exception {
+    RegionAttributes attributes = region.getAttributes();
+    when(attributes.getPartitionAttributes()).thenReturn(null);
+    function.executeFunction(context);
+
+    verify(asyncEventQueueFactory, times(1)).create(any(), any());
+    verify(asyncEventQueueFactory, times(1)).setParallel(false);
+  }
+
+  @Test
+  public void executeCreatesParallelAsyncQueueForPartitionedRegion() throws Exception {
+    RegionAttributes attributes = region.getAttributes();
+    when(attributes.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
+    function.executeFunction(context);
+
+    verify(asyncEventQueueFactory, times(1)).create(any(), any());
+    verify(asyncEventQueueFactory, times(1)).setParallel(true);
   }
 
   @Test