You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2022/07/05 11:24:55 UTC
[geode] branch develop updated: GEODE-10392: When gw sender started with cleanqueue remove EvictionController from diskstore (#7817)
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d6261cc3cc GEODE-10392: When gw sender started with cleanqueue remove EvictionController from diskstore (#7817)
d6261cc3cc is described below
commit d6261cc3cce1b661714fb136c732addb8c11a33e
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Tue Jul 5 13:24:48 2022 +0200
GEODE-10392: When gw sender started with cleanqueue remove EvictionController from diskstore (#7817)
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 9 +
.../geode/internal/cache/PartitionedRegion.java | 9 +
.../wan/parallel/ParallelGatewaySenderQueue.java | 1 +
.../ParallelWANOverflowStatsDistributedTest.java | 367 +++++++++++++++++++++
4 files changed, 386 insertions(+)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 0f9865b720..19990c7030 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -4079,6 +4079,15 @@ public class DiskStoreImpl implements DiskStore {
}
}
+ void clearExistingPREvictionContoller(PartitionedRegion pr) {
+ final String prName = pr.getFullPath();
+ synchronized (prEvictionControllerMap) {
+ prEvictionControllerMap.remove(prName);
+ }
+ }
+
+
+
/**
* Lock the disk store to prevent updates. This is the first step of the backup process. Once all
* disk stores on all members are locked, we still move on to prepareBackup.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 896af32576..e14a159fc2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -556,6 +556,15 @@ public class PartitionedRegion extends LocalRegion
return result;
}
+
+ public void clearPREvictionControllerFromDiskInitialization() {
+ if (getDiskStore() != null) {
+ getDiskStore().clearExistingPREvictionContoller(this);
+ }
+ }
+
+
+
@Override
public boolean remove(Object key, Object value, Object callbackArg) {
final long startTime = prStats.getTime();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index c37b4aefc8..7dc5f18e63 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -497,6 +497,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if ((prQ != null) && (index == 0) && cleanQueues) {
cleanOverflowStats(cache);
+ prQ.clearPREvictionControllerFromDiskInitialization();
prQ.destroyRegion(null);
prQ = null;
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANOverflowStatsDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANOverflowStatsDistributedTest.java
new file mode 100644
index 0000000000..3e99d320f9
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANOverflowStatsDistributedTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.junit.categories.WanTest;
+
+@Category({WanTest.class})
+public class ParallelWANOverflowStatsDistributedTest extends WANTestBase {
+ private static final int NUM_PUTS = 100;
+ private static final long serialVersionUID = 1L;
+
+ private String testName;
+
+ public ParallelWANOverflowStatsDistributedTest() {
+ super();
+ }
+
+ @Override
+ protected final void postSetUpWANTestBase() {
+ testName = getTestMethodName();
+ }
+
+ @Test
+ public void testPutDataAndCheckTotalQueueSizeBytesInUse() {
+ // 1. Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+
+ vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+
+
+ vm4.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm6.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm7.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+
+ }
+
+
+ @Test
+ public void testPutDataThenCleanQueueCheckTotalQueueSizeBytesInUse() {
+ // 1. Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+
+ vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+
+
+ vm4.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> stopSender("ln"));
+ vm5.invoke(() -> stopSender("ln"));
+ vm6.invoke(() -> stopSender("ln"));
+ vm7.invoke(() -> stopSender("ln"));
+
+ vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+
+ startSenderwithCleanQueuesInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> checkTotalQueueSizeBytesInUseIsZero("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUseIsZero("ln"));
+ vm6.invoke(() -> checkTotalQueueSizeBytesInUseIsZero("ln"));
+ vm7.invoke(() -> checkTotalQueueSizeBytesInUseIsZero("ln"));
+
+ }
+
+
+ @Test
+ public void testPutDataThenCleanQueueThenPutDataCheckTotalQueueSizeBytesInUse() {
+ // 1. Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+
+ vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false));
+
+
+ vm4.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> stopSender("ln"));
+ vm5.invoke(() -> stopSender("ln"));
+ vm6.invoke(() -> stopSender("ln"));
+ vm7.invoke(() -> stopSender("ln"));
+
+ vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+
+ startSenderwithCleanQueuesInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm6.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm7.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+
+ }
+
+ @Test
+ public void testPutDataRestartServerThenCheckTotalQueueSizeBytesInUse() {
+ // 1. Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore2 = vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore3 = vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore4 = vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+
+
+ vm4.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm5.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm6.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm7.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm6.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm7.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ createCacheInVMs(lnPort, vm5);
+
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
+ null, diskStore2, false));
+
+ vm5.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUseAfterRestore("ln"));
+
+ }
+
+ @Test
+ public void testPutDataRestartServerThenCleanQueueThenPutDataCheckTotalQueueSizeBytesInUse() {
+ // 1. Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore2 = vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore3 = vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+ String diskStore4 = vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, false));
+
+
+ vm4.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm5.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm6.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+ vm7.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ createCacheInVMs(lnPort, vm5);
+
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
+ null, diskStore2, false));
+
+ vm5.invoke(() -> createPersistentPartitionedRegion(testName, "ln", 1, 100, isOffHeap()));
+
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUseAfterRestore("ln"));
+
+ vm4.invoke(() -> stopSender("ln"));
+ vm5.invoke(() -> stopSender("ln"));
+ vm6.invoke(() -> stopSender("ln"));
+ vm7.invoke(() -> stopSender("ln"));
+
+ vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+
+ startSenderwithCleanQueuesInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> waitForSenderRunningState("ln"));
+ vm5.invoke(() -> waitForSenderRunningState("ln"));
+ vm6.invoke(() -> waitForSenderRunningState("ln"));
+ vm7.invoke(() -> waitForSenderRunningState("ln"));
+
+ vm4.invoke(() -> doPuts(testName, 100));
+
+ vm4.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm5.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm6.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+ vm7.invoke(() -> checkTotalQueueSizeBytesInUse("ln"));
+
+ }
+
+
+ private void checkTotalQueueSizeBytesInUse(String senderId) throws Exception {
+
+ // Get gateway sender mbean
+ ManagementService service = ManagementService.getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean(senderId);
+ assertThat(bean).isNotNull();
+
+ // Wait for the sampler to take a few samples
+ waitForSamplerToSample(5);
+
+ // Verify the bean attributes match the stat values
+ await().untilAsserted(() -> {
+ assertThat(bean.getTotalQueueSizeBytesInUse()).isGreaterThan(20000);
+ });
+ }
+
+
+ private void checkTotalQueueSizeBytesInUseAfterRestore(String senderId) throws Exception {
+
+ // Get gateway sender mbean
+ ManagementService service = ManagementService.getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean(senderId);
+ assertThat(bean).isNotNull();
+
+ // Wait for the sampler to take a few samples
+ waitForSamplerToSample(5);
+
+ // Verify the bean attributes match the stat values
+ await().untilAsserted(() -> {
+ assertThat(bean.getTotalQueueSizeBytesInUse()).isGreaterThan(12000);
+ });
+ }
+
+ private void checkTotalQueueSizeBytesInUseIsZero(String senderId) throws Exception {
+
+ // Get gateway sender mbean
+ ManagementService service = ManagementService.getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean(senderId);
+ assertThat(bean).isNotNull();
+
+ // Wait for the sampler to take a few samples
+ waitForSamplerToSample(5);
+
+ // Verify the bean attributes match the stat values
+ await().untilAsserted(() -> {
+ assertThat(bean.getTotalQueueSizeBytesInUse()).isEqualTo(0);
+ });
+ }
+
+ private void waitForSamplerToSample(int numTimesToSample) throws Exception {
+ InternalDistributedSystem ids = (InternalDistributedSystem) cache.getDistributedSystem();
+ assertThat(ids.getStatSampler().waitForSampleCollector(60000)).isNotNull();
+ for (int i = 0; i < numTimesToSample; i++) {
+ assertThat(ids.getStatSampler().waitForSample((60000))).isTrue();
+ }
+ }
+
+}