You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2022/06/08 13:36:36 UTC
[geode] branch develop updated: GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 425bdb8e26 GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
425bdb8e26 is described below
commit 425bdb8e2601430f13733bed85566aa976a2c17c
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Wed Jun 8 15:36:30 2022 +0200
GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
* GEODE-10068: Make WanCopyRegionFunctionService thread pool configurable through property
* GEODE-10068: Change name of property and add test case
* GEODE-10068: Update after more review comments
---
.../geode/internal/lang/SystemPropertyHelper.java | 8 ++++
.../wan/internal/WanCopyRegionFunctionService.java | 17 +++++--
.../internal/WanCopyRegionFunctionServiceTest.java | 54 ++++++++++++++++++++--
3 files changed, 70 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index 01bdb67d3b..3efb25a71c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -113,6 +113,14 @@ public class SystemPropertyHelper {
*/
public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time";
+ /**
+ * Maximum number of wan-copy region command executions that can run concurrently in a server.
+ * Once the maximum number is reached, subsequent executions wait until
+ * a thread is released by one of the ongoing executions.
+ */
+ public static final String WAN_COPY_REGION_MAX_CONCURRENT_THREADS =
+ "geode.wan.copy-region.max-threads";
+
/**
* As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
* operation in the transaction.
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
index 79000b4a60..a8592a33c5 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
@@ -25,12 +25,17 @@ import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.Cache;
import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.lang.SystemProperty;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
import org.apache.geode.management.internal.functions.CliFunctionResult;
public class WanCopyRegionFunctionService implements CacheService {
+ private static final String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+ "WAN Copy Region Function Execution Processor";
+
private volatile ExecutorService wanCopyRegionFunctionExecutionPool;
/**
@@ -41,11 +46,15 @@ public class WanCopyRegionFunctionService implements CacheService {
@Override
public boolean init(Cache cache) {
- String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
- "WAN Copy Region Function Execution Processor";
- int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+ int maxConcurrentThreads = SystemProperty
+ .getProductIntegerProperty(
+ SystemPropertyHelper.WAN_COPY_REGION_MAX_CONCURRENT_THREADS, 10);
+ return init(maxConcurrentThreads);
+ }
+
+ boolean init(int maxConcurrentThreads) {
wanCopyRegionFunctionExecutionPool = LoggingExecutors
- .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+ .newFixedThreadPool(maxConcurrentThreads,
WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
return true;
}
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
index 864aa63e7a..1b5a89821b 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -23,9 +23,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.functions.CliFunctionResult;
@@ -36,7 +37,7 @@ public class WanCopyRegionFunctionServiceTest {
private WanCopyRegionFunctionService service;
private final InternalCache cache = mock(InternalCache.class);
- @Before
+ @BeforeEach
public void setUp() throws Exception {
service = new WanCopyRegionFunctionService();
service.init(cache);
@@ -158,7 +159,7 @@ public class WanCopyRegionFunctionServiceTest {
int executions = 5;
CountDownLatch latch = new CountDownLatch(executions);
for (int i = 0; i < executions; i++) {
- Callable<CliFunctionResult> firstExecution = () -> {
+ Callable<CliFunctionResult> execution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
return null;
};
@@ -167,7 +168,7 @@ public class WanCopyRegionFunctionServiceTest {
CompletableFuture
.supplyAsync(() -> {
try {
- return service.execute(firstExecution, regionName, "mySender1");
+ return service.execute(execution, regionName, "mySender1");
} catch (Exception e) {
return null;
}
@@ -183,4 +184,47 @@ public class WanCopyRegionFunctionServiceTest {
latch.countDown();
}
}
+
+ @Test
+ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
+ int maxConcurrentExecutions = 2;
+ service.init(maxConcurrentExecutions);
+
+ int executions = 4;
+ CountDownLatch latch = new CountDownLatch(executions);
+ AtomicInteger concurrentExecutions = new AtomicInteger(0);
+ for (int i = 0; i < executions; i++) {
+ Callable<CliFunctionResult> execution = () -> {
+ concurrentExecutions.incrementAndGet();
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ concurrentExecutions.decrementAndGet();
+ return null;
+ };
+
+ final String regionName = String.valueOf(i);
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(execution, regionName, "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
+ }
+
+ // Wait for the functions to start execution
+ await().untilAsserted(
+ () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
+
+ // Make sure concurrent executions do not exceed the maximum
+ assertThat(concurrentExecutions.get()).isEqualTo(maxConcurrentExecutions);
+
+ // End executions
+ for (int i = 0; i < executions; i++) {
+ latch.countDown();
+ }
+
+ await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0));
+ }
+
}