You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2022/06/08 13:36:36 UTC

[geode] branch develop updated: GEODE-10068: Make WanCopyRegionFunctionService thread pool configurable through property (#7424)

This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 425bdb8e26 GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
425bdb8e26 is described below

commit 425bdb8e2601430f13733bed85566aa976a2c17c
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Wed Jun 8 15:36:30 2022 +0200

    GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
    
    * GEODE-10068: Make WanCopyRegionFunctionService thread pool configurable through property
    
    * GEODE-10068: Change name of property and add test case
    
    * GEODE-10068: Update after more review comments
---
 .../geode/internal/lang/SystemPropertyHelper.java  |  8 ++++
 .../wan/internal/WanCopyRegionFunctionService.java | 17 +++++--
 .../internal/WanCopyRegionFunctionServiceTest.java | 54 ++++++++++++++++++++--
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index 01bdb67d3b..3efb25a71c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -113,6 +113,14 @@ public class SystemPropertyHelper {
    */
   public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time";
 
+  /**
+   * Maximum number of concurrent executions in a server of wan-copy region commands.
+   * Once the maximum number is reached, subsequent executions will be halted until
+   * a thread for any of the ongoing executions is released.
+   */
+  public static final String WAN_COPY_REGION_MAX_CONCURRENT_THREADS =
+      "geode.wan.copy-region.max-threads";
+
   /**
    * As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
    * operation in the transaction.
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
index 79000b4a60..a8592a33c5 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
@@ -25,12 +25,17 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.lang.SystemProperty;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
 import org.apache.geode.management.internal.functions.CliFunctionResult;
 
 public class WanCopyRegionFunctionService implements CacheService {
 
+  private static final String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+      "WAN Copy Region Function Execution Processor";
+
   private volatile ExecutorService wanCopyRegionFunctionExecutionPool;
 
   /**
@@ -41,11 +46,15 @@ public class WanCopyRegionFunctionService implements CacheService {
 
   @Override
   public boolean init(Cache cache) {
-    String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
-        "WAN Copy Region Function Execution Processor";
-    int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+    int maxConcurrentThreads = SystemProperty
+        .getProductIntegerProperty(
+            SystemPropertyHelper.WAN_COPY_REGION_MAX_CONCURRENT_THREADS, 10);
+    return init(maxConcurrentThreads);
+  }
+
+  boolean init(int maxConcurrentThreads) {
     wanCopyRegionFunctionExecutionPool = LoggingExecutors
-        .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+        .newFixedThreadPool(maxConcurrentThreads,
             WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
     return true;
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
index 864aa63e7a..1b5a89821b 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -23,9 +23,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.internal.functions.CliFunctionResult;
@@ -36,7 +37,7 @@ public class WanCopyRegionFunctionServiceTest {
   private WanCopyRegionFunctionService service;
   private final InternalCache cache = mock(InternalCache.class);
 
-  @Before
+  @BeforeEach
   public void setUp() throws Exception {
     service = new WanCopyRegionFunctionService();
     service.init(cache);
@@ -158,7 +159,7 @@ public class WanCopyRegionFunctionServiceTest {
     int executions = 5;
     CountDownLatch latch = new CountDownLatch(executions);
     for (int i = 0; i < executions; i++) {
-      Callable<CliFunctionResult> firstExecution = () -> {
+      Callable<CliFunctionResult> execution = () -> {
         latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
         return null;
       };
@@ -167,7 +168,7 @@ public class WanCopyRegionFunctionServiceTest {
       CompletableFuture
           .supplyAsync(() -> {
             try {
-              return service.execute(firstExecution, regionName, "mySender1");
+              return service.execute(execution, regionName, "mySender1");
             } catch (Exception e) {
               return null;
             }
@@ -183,4 +184,47 @@ public class WanCopyRegionFunctionServiceTest {
       latch.countDown();
     }
   }
+
+  @Test
+  public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
+    int maxConcurrentExecutions = 2;
+    service.init(maxConcurrentExecutions);
+
+    int executions = 4;
+    CountDownLatch latch = new CountDownLatch(executions);
+    AtomicInteger concurrentExecutions = new AtomicInteger(0);
+    for (int i = 0; i < executions; i++) {
+      Callable<CliFunctionResult> execution = () -> {
+        concurrentExecutions.incrementAndGet();
+        latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+        concurrentExecutions.decrementAndGet();
+        return null;
+      };
+
+      final String regionName = String.valueOf(i);
+      CompletableFuture
+          .supplyAsync(() -> {
+            try {
+              return service.execute(execution, regionName, "mySender1");
+            } catch (Exception e) {
+              return null;
+            }
+          });
+    }
+
+    // Wait for the functions to start execution
+    await().untilAsserted(
+        () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
+
+    // Make sure concurrent executions does not exceed the maximum
+    assertThat(concurrentExecutions.get()).isEqualTo(maxConcurrentExecutions);
+
+    // End executions
+    for (int i = 0; i < executions; i++) {
+      latch.countDown();
+    }
+
+    await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0));
+  }
+
 }