Posted to commits@geode.apache.org by jj...@apache.org on 2020/07/03 16:19:40 UTC
[geode] branch support/1.12 updated: GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)
This is an automated email from the ASF dual-hosted git repository.
jjramos pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 4be10d6 GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)
4be10d6 is described below
commit 4be10d6a2892cdad7f42ae32f34e0863149f342c
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Jul 3 17:18:58 2020 +0100
GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)
Do not delete drf files during member startup, as that should only be
done by the compactor thread. Instead, allow the OplogEntryIdSet to
grow beyond the default capacity and log a warning message instructing
the user to manually compact the disk-stores.
- Added unit tests.
- Replaced usages of 'junit.Assert' with 'assertj'.
- Modified DiskStoreImpl.deadRecordCount to return long instead of int.
- Added an internal overflow implementation to the OplogEntryIdSet so it can
  grow above the default limit.
(cherry picked from commit fdc440131f0d562d97f2340d2e7ba5aacf935d62)
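For readers skimming the diff below: the heart of the change is an overflow-on-exception pattern around fastutil's open hash sets. The following is a minimal standalone sketch of that idea, assuming the fastutil library is on the classpath; the class and field names here are illustrative, not the actual Geode code (which lives in DiskStoreImpl.OplogEntryIdSet further down and also handles the long-keyed case):

import java.util.ArrayList;
import java.util.List;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;

// Illustrative sketch only; names are hypothetical, not Geode's.
class OverflowingIntSet {
  private final List<IntOpenHashSet> sets = new ArrayList<>();
  private IntOpenHashSet current = new IntOpenHashSet();

  OverflowingIntSet() {
    sets.add(current);
  }

  void add(int id) {
    try {
      current.add(id);
    } catch (IllegalArgumentException tooLarge) {
      // fastutil throws once the backing array can no longer grow;
      // start a fresh set and keep going instead of failing recovery.
      current = new IntOpenHashSet();
      sets.add(current);
      current.add(id);
    }
  }

  boolean contains(int id) {
    // Membership checks must consult every overflowed set.
    return sets.stream().anyMatch(s -> s.contains(id));
  }
}

The trade-off is that contains() degrades from one probe to one probe per overflowed set, which is acceptable here because the set is only consulted during disk-store recovery.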
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 83 ++++-
.../org/apache/geode/internal/cache/Oplog.java | 11 +-
.../internal/cache/OplogEntryIdSetJUnitTest.java | 74 -----
.../geode/internal/cache/OplogEntryIdSetTest.java | 169 ++++++++++
...SenderWithIsolatedDiskStoreDistributedTest.java | 348 ---------------------
5 files changed, 239 insertions(+), 446 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 4bb870c..f12a513 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -74,6 +74,7 @@ import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Cache;
@@ -3516,33 +3517,85 @@ public class DiskStoreImpl implements DiskStore {
}
/**
- * Set of OplogEntryIds (longs). Memory is optimized by using an int[] for ids in the unsigned int
- * range.
+ * Set of OplogEntryIds (longs).
+ * Memory is optimized by using an int[] for ids in the unsigned int range.
+ * By default we can't have more than 805306401 ids for a load factor of 0.75; the internal lists
+ * are used to overcome this limit, allowing the disk-store to recover successfully (the internal
+ * class is **only** used during recovery to read all deleted entries).
*/
static class OplogEntryIdSet {
- private final IntOpenHashSet ints = new IntOpenHashSet((int) INVALID_ID);
- private final LongOpenHashSet longs = new LongOpenHashSet((int) INVALID_ID);
+ private final List<IntOpenHashSet> allInts;
+ private final List<LongOpenHashSet> allLongs;
+ private final AtomicReference<IntOpenHashSet> currentInts;
+ private final AtomicReference<LongOpenHashSet> currentLongs;
+
+ // For testing purposes only.
+ @VisibleForTesting
+ OplogEntryIdSet(List<IntOpenHashSet> allInts, List<LongOpenHashSet> allLongs) {
+ this.allInts = allInts;
+ this.currentInts = new AtomicReference<>(this.allInts.get(0));
+
+ this.allLongs = allLongs;
+ this.currentLongs = new AtomicReference<>(this.allLongs.get(0));
+ }
+
+ public OplogEntryIdSet() {
+ IntOpenHashSet intHashSet = new IntOpenHashSet((int) INVALID_ID);
+ this.allInts = new ArrayList<>();
+ this.allInts.add(intHashSet);
+ this.currentInts = new AtomicReference<>(intHashSet);
+
+ LongOpenHashSet longHashSet = new LongOpenHashSet((int) INVALID_ID);
+ this.allLongs = new ArrayList<>();
+ this.allLongs.add(longHashSet);
+ this.currentLongs = new AtomicReference<>(longHashSet);
+ }
public void add(long id) {
if (id == 0) {
throw new IllegalArgumentException();
- } else if (id > 0 && id <= 0x00000000FFFFFFFFL) {
- this.ints.add((int) id);
- } else {
- this.longs.add(id);
+ }
+
+ try {
+ if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+ this.currentInts.get().add((int) id);
+ } else {
+ this.currentLongs.get().add(id);
+ }
+ } catch (IllegalArgumentException illegalArgumentException) {
+ // See GEODE-8029.
+ // Too many entries in the accumulated drf files; overflow and continue.
+ logger.warn(
+ "There is a large number of deleted entries within the disk-store, please execute an offline compaction.");
+
+ // Overflow to the next [Int|Long]OpenHashSet and continue.
+ if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+ IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
+ allInts.add(overflownHashSet);
+ currentInts.set(overflownHashSet);
+
+ currentInts.get().add((int) id);
+ } else {
+ LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
+ allLongs.add(overflownHashSet);
+ currentLongs.set(overflownHashSet);
+
+ currentLongs.get().add(id);
+ }
}
}
public boolean contains(long id) {
if (id >= 0 && id <= 0x00000000FFFFFFFFL) {
- return this.ints.contains((int) id);
+ return allInts.stream().anyMatch(ints -> ints.contains((int) id));
} else {
- return this.longs.contains(id);
+ return allLongs.stream().anyMatch(longs -> longs.contains(id));
}
}
- public int size() {
- return this.ints.size() + this.longs.size();
+ public long size() {
+ return allInts.stream().mapToInt(IntOpenHashSet::size).sum()
+ + allLongs.stream().mapToInt(LongOpenHashSet::size).sum();
}
}
@@ -3972,13 +4025,13 @@ public class DiskStoreImpl implements DiskStore {
return this.liveEntryCount;
}
- private int deadRecordCount;
+ private long deadRecordCount;
- void incDeadRecordCount(int count) {
+ void incDeadRecordCount(long count) {
this.deadRecordCount += count;
}
- public int getDeadRecordCount() {
+ public long getDeadRecordCount() {
return this.deadRecordCount;
}
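As context for the 805306401 figure in the Javadoc above: fastutil's open hash sets use a power-of-two backing array capped at 2^30 slots, so with the default load factor of 0.75 the usable element count tops out just under 0.75 * 2^30. A rough back-of-the-envelope check (an approximation; the exact threshold comes out of fastutil's internal arraySize/maxFill arithmetic):

// Approximate reasoning only; the precise limit is computed inside fastutil.
public class HashSetCapacityNote {
  public static void main(String[] args) {
    long maxArraySize = 1L << 30;  // largest power-of-two backing array
    double loadFactor = 0.75;      // fastutil's default
    System.out.println((long) (maxArraySize * loadFactor)); // 805306368
  }
}

Beyond that point, add() triggers a rehash that throws IllegalArgumentException ("Too large (... expected elements with load factor ...)"), which is exactly the exception the new overflow path in OplogEntryIdSet.add() catches.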
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index bb80a06..8115365 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -937,17 +937,10 @@ public class Oplog implements CompactableOplog, Flushable {
// this.crf.raf.seek(this.crf.currSize);
} else if (!offline) {
// drf exists but crf has been deleted (because it was empty).
+ // I don't think the drf needs to be opened. It is only used during recovery.
+ // At some point the compactor may identify that it can be deleted.
this.crf.RAFClosed = true;
deleteCRF();
-
- // The drf file needs to be deleted (see GEODE-8029).
- // If compaction is not enabled, or if the compaction-threshold is never reached, there
- // will be orphaned drf files that are not automatically deleted (unless a manual
- // compaction is executed), in which case a later recovery might fail when the amount of
- // deleted records is too high (805306401).
- setHasDeletes(false);
- deleteDRF();
-
this.closed = true;
this.deleted.set(true);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java
deleted file mode 100644
index 5b57803..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;
-
-/**
- * Tests DiskStoreImpl.OplogEntryIdSet
- */
-public class OplogEntryIdSetJUnitTest {
-
- @Test
- public void testBasics() {
- OplogEntryIdSet s = new OplogEntryIdSet();
- for (long i = 1; i < 777777; i++) {
- assertEquals(false, s.contains(i));
- }
- for (long i = 1; i < 777777; i++) {
- s.add(i);
- }
- for (long i = 1; i < 777777; i++) {
- assertEquals(true, s.contains(i));
- }
-
- try {
- s.add(DiskStoreImpl.INVALID_ID);
- fail("expected IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- }
- assertEquals(false, s.contains(0));
-
- assertEquals(false, s.contains(0x00000000FFFFFFFFL));
- s.add(0x00000000FFFFFFFFL);
- assertEquals(true, s.contains(0x00000000FFFFFFFFL));
-
- for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
- assertEquals(false, s.contains(i));
- }
- for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
- s.add(i);
- }
- for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
- assertEquals(true, s.contains(i));
- }
-
- for (long i = 1; i < 777777; i++) {
- assertEquals(true, s.contains(i));
- }
-
- assertEquals(false, s.contains(Long.MAX_VALUE));
- s.add(Long.MAX_VALUE);
- assertEquals(true, s.contains(Long.MAX_VALUE));
- assertEquals(false, s.contains(Long.MIN_VALUE));
- s.add(Long.MIN_VALUE);
- assertEquals(true, s.contains(Long.MIN_VALUE));
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java
new file mode 100644
index 0000000..53ded13
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;
+
+/**
+ * Tests DiskStoreImpl.OplogEntryIdSet
+ */
+public class OplogEntryIdSetTest {
+
+ @Test
+ public void testBasics() {
+ OplogEntryIdSet s = new OplogEntryIdSet();
+
+ LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isFalse());
+ LongStream.range(1, 777777).forEach(s::add);
+ LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());
+
+ assertThatThrownBy(() -> s.add(DiskStoreImpl.INVALID_ID))
+ .isInstanceOf(IllegalArgumentException.class);
+ assertThat(s.contains(0)).isFalse();
+
+ assertThat(s.contains(0x00000000FFFFFFFFL)).isFalse();
+ s.add(0x00000000FFFFFFFFL);
+ assertThat(s.contains(0x00000000FFFFFFFFL)).isTrue();
+
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
+ .forEach(i -> assertThat(s.contains(i)).isFalse());
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777).forEach(s::add);
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
+ .forEach(i -> assertThat(s.contains(i)).isTrue());
+
+ LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());
+
+ assertThat(s.contains(Long.MAX_VALUE)).isFalse();
+ s.add(Long.MAX_VALUE);
+ assertThat(s.contains(Long.MAX_VALUE)).isTrue();
+ assertThat(s.contains(Long.MIN_VALUE)).isFalse();
+ s.add(Long.MIN_VALUE);
+ assertThat(s.contains(Long.MIN_VALUE)).isTrue();
+ }
+
+ @Test
+ public void addMethodOverflowsWhenInternalAddThrowsIllegalArgumentException() {
+ int testEntries = 1000;
+ int magicInt = testEntries + 1;
+ long magicLong = 0x00000000FFFFFFFFL + testEntries + 1;
+
+ Answer<Void> answer = invocationOnMock -> {
+ Number value = invocationOnMock.getArgument(0);
+ if ((value.intValue() == magicInt) || (value.longValue() == magicLong)) {
+ throw new IllegalArgumentException(
+ "Too large (XXXXXXXX expected elements with load factor Y.YY)");
+ }
+ invocationOnMock.callRealMethod();
+ return null;
+ };
+
+ IntOpenHashSet intOpenHashSet = spy(IntOpenHashSet.class);
+ doAnswer(answer).when(intOpenHashSet).add(anyInt());
+ LongOpenHashSet longOpenHashSet = spy(LongOpenHashSet.class);
+ doAnswer(answer).when(longOpenHashSet).add(anyLong());
+ List<IntOpenHashSet> intOpenHashSets =
+ new ArrayList<>(Collections.singletonList(intOpenHashSet));
+ List<LongOpenHashSet> longOpenHashSets =
+ new ArrayList<>(Collections.singletonList(longOpenHashSet));
+ OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);
+
+ // Insert some entries.
+ assertThat(intOpenHashSets).hasSize(1);
+ assertThat(longOpenHashSets).hasSize(1);
+ IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+ .forEach(oplogEntryIdSet::add);
+
+ // Insert an entry that would cause an overflow for ints and longs.
+ oplogEntryIdSet.add(magicInt);
+ oplogEntryIdSet.add(magicLong);
+
+ // Entries should exist and no exception should be thrown (even those that caused the exception)
+ assertThat(intOpenHashSets).hasSize(2);
+ assertThat(longOpenHashSets).hasSize(2);
+ IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+ .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+ assertThat(oplogEntryIdSet.contains(magicInt)).isTrue();
+ assertThat(oplogEntryIdSet.contains(magicLong)).isTrue();
+ }
+
+ @Test
+ public void sizeShouldIncludeOverflownSets() {
+ int testEntries = 1000;
+ List<IntOpenHashSet> intHashSets = new ArrayList<>();
+ List<LongOpenHashSet> longHashSets = new ArrayList<>();
+
+ IntStream.range(1, testEntries + 1).forEach(value -> {
+ IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
+ intOpenHashSet.add(value);
+ intHashSets.add(intOpenHashSet);
+ });
+
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries + 1)
+ .forEach(value -> {
+ LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
+ longOpenHashSet.add(value);
+ longHashSets.add(longOpenHashSet);
+ });
+
+ OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
+ assertThat(oplogEntryIdSet.size()).isEqualTo(testEntries * 2);
+ }
+
+ @Test
+ public void containsShouldSearchAcrossOverflownSets() {
+ int testEntries = 1000;
+ List<IntOpenHashSet> intHashSets = new ArrayList<>();
+ List<LongOpenHashSet> longHashSets = new ArrayList<>();
+
+ IntStream.range(1, testEntries).forEach(value -> {
+ IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
+ intOpenHashSet.add(value);
+ intHashSets.add(intOpenHashSet);
+ });
+
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries).forEach(value -> {
+ LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
+ longOpenHashSet.add(value);
+ longHashSets.add(longOpenHashSet);
+ });
+
+ OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
+
+ // All entries should be searchable across overflown sets
+ IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+ LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+ .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+ }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
deleted file mode 100644
index bf57a1f..0000000
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.wan;
-
-import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
-import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.VM.getVM;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import junitparams.naming.TestCaseName;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.geode.cache.DiskStore;
-import org.apache.geode.cache.DiskStoreFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.wan.GatewayReceiverFactory;
-import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.cache.wan.GatewaySenderFactory;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.RegionQueue;
-import org.apache.geode.internal.cache.persistence.OplogType;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
-import org.apache.geode.test.dunit.rules.DistributedRule;
-import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
-
-/**
- * Tests to verify WAN functionality when the gateway-sender(s) have isolated, non-shared with
- * other region(s), disk-store(s).
- */
-@RunWith(JUnitParamsRunner.class)
-public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest implements Serializable {
- private static final String REGION_NAME = "TestRegion";
- private static final String DISK_STORE_ID = "testDisk";
- private static final String GATEWAY_SENDER_ID = "testSender";
- private static final String TEST_CASE_NAME = "[{index}] {method}(RegionType:{0}, Parallel:{1})";
- private int site1Port, site2Port;
- private VM serverCluster1, serverCluster2;
-
- @Rule
- public CacheRule cacheRule = new CacheRule();
-
- @Rule
- public DistributedRule distributedRule = new DistributedRule();
-
- @Rule
- public SerializableTestName testName = new SerializableTestName();
-
- @Rule
- public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule();
-
- private Properties createLocatorConfiguration(int distributedSystemId, int localLocatorPort,
- int remoteLocatorPort) {
- Properties config = new Properties();
- config.setProperty(MCAST_PORT, "0");
- config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(distributedSystemId));
- config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
- config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']');
- config.setProperty(START_LOCATOR,
- "localhost[" + localLocatorPort + "],server=true,peer=true,hostname-for-clients=localhost");
-
- return config;
- }
-
- private Properties createServerConfiguration(int localLocatorPort) {
- Properties config = new Properties();
- config.setProperty(MCAST_PORT, "0");
- config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
-
- return config;
- }
-
- private void createDiskStore() {
- String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
- File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
- DiskStoreFactory diskStoreFactory = cacheRule.getCache().createDiskStoreFactory();
- diskStoreFactory.setAutoCompact(true);
- diskStoreFactory.setAllowForceCompaction(true);
- diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
- diskStoreFactory.create(DISK_STORE_ID);
- }
-
- private void createRegion(RegionShortcut regionShortcut) {
- cacheRule.getCache()
- .<String, String>createRegionFactory(regionShortcut)
- .create(REGION_NAME);
- }
-
- private void createGatewayReceiver() {
- GatewayReceiverFactory gatewayReceiverFactory =
- cacheRule.getCache().createGatewayReceiverFactory();
- gatewayReceiverFactory.setManualStart(false);
- gatewayReceiverFactory.create();
- }
-
- private void createGatewaySender(boolean parallel, int remoteDistributedSystemId) {
- GatewaySenderFactory gatewaySenderFactory = cacheRule.getCache().createGatewaySenderFactory();
- gatewaySenderFactory.setParallel(parallel);
- gatewaySenderFactory.setDiskSynchronous(true);
- gatewaySenderFactory.setPersistenceEnabled(true);
- gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
- gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
- }
-
- private void createServerWithRegionAndGatewayReceiver(RegionShortcut regionShortcut) {
- createGatewayReceiver();
- createRegion(regionShortcut);
- }
-
- private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut regionShortcut,
- int remoteDistributedSystemId, boolean parallel) {
- createDiskStore();
- createRegion(regionShortcut);
- createGatewaySender(parallel, remoteDistributedSystemId);
- Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
- region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
- }
-
- private void gracefullyDisconnect() {
- InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
- InternalDistributedSystem.getConnectedInstance().disconnect();
- await()
- .untilAsserted(() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
- }
-
- private void awaitForQueueSize(int queueSize) {
- GatewaySender gatewaySender = cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
- await().untilAsserted(() -> {
- Set<RegionQueue> queues = ((AbstractGatewaySender) gatewaySender).getQueues();
- int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
- assertThat(queueSize).isEqualTo(totalSize);
- });
- }
-
- @SuppressWarnings("unused")
- static Object[] regionAndGatewayTypes() {
- ArrayList<Object[]> parameters = new ArrayList<>();
- parameters.add(new Object[] {RegionShortcut.PARTITION, true});
- parameters.add(new Object[] {RegionShortcut.PARTITION, false});
- parameters.add(new Object[] {RegionShortcut.REPLICATE, false});
-
- return parameters.toArray();
- }
-
- @Before
- public void setUp() {
- VM locatorCluster1 = getVM(0);
- serverCluster1 = getVM(1);
- VM locatorCluster2 = getVM(2);
- serverCluster2 = getVM(3);
-
- int[] ports = getRandomAvailableTCPPortsForDUnitSite(2);
- site1Port = ports[0];
- site2Port = ports[1];
-
- // Start 2 sites, one locator and one server per site.
- locatorCluster1
- .invoke(() -> cacheRule.createCache(createLocatorConfiguration(1, site1Port, site2Port)));
- locatorCluster2
- .invoke(() -> cacheRule.createCache(createLocatorConfiguration(2, site2Port, site1Port)));
-
- serverCluster1.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
- serverCluster2.invoke(() -> cacheRule.createCache(createServerConfiguration(site2Port)));
- }
-
- /**
- * The tests executes the following:
- * - Creates region and gateway-receiver on cluster2.
- * - Creates the region and gateway-sender on cluster1.
- * - Populates the region and waits until WAN replication has finished.
- * - Restarts server on cluster1, and stops it afterwards (the initial compaction occurs during
- * startup and the disk validation is done offline).
- * - Asserts that there are no orphaned drf files, neither compact-able records on the disks-tore.
- */
- @Test
- @TestCaseName(TEST_CASE_NAME)
- @Parameters(method = "regionAndGatewayTypes")
- public void diskStoreShouldBeCompactedOnMemberRestartWhenAllEventsHaveBeenDispatched(
- RegionShortcut regionShortcut, boolean parallel) throws Exception {
- final int entries = 100;
-
- // Create Region and Receiver on Cluster2
- serverCluster2.invoke(() -> createServerWithRegionAndGatewayReceiver(regionShortcut));
-
- // Create Region, DiskStore and Gateway on Cluster1
- String diskStorePath = serverCluster1.invoke(() -> {
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, parallel);
- DiskStore diskStore = cacheRule.getCache().findDiskStore(DISK_STORE_ID);
- Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-
- // Insert entries and wait for WAN replication to finish.
- IntStream.range(0, entries).forEach(value -> region.put("Key" + value, "Value" + value));
- awaitForQueueSize(0);
-
- return diskStore.getDiskDirs()[0].getAbsolutePath();
- });
-
- // Wait for Cluster2 to receive all events.
- serverCluster2.invoke(() -> await().untilAsserted(
- () -> assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(entries)));
-
- // Restart and Stop Server on Cluster1
- serverCluster1.invoke(() -> {
- gracefullyDisconnect();
- cacheRule.createCache(createServerConfiguration(site1Port));
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, parallel);
- gracefullyDisconnect();
- });
-
- // There should be no orphaned drf files, neither compact-able records on the disk-store.
- File gatewayDiskStore = new File(diskStorePath);
- assertThat(gatewayDiskStore.list())
- .hasSize(3)
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.drf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.crf");
-
- DiskStore diskStore =
- DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
- assertThat(((DiskStoreImpl) diskStore).getLiveEntryCount()).isEqualTo(0);
- assertThat(((DiskStoreImpl) diskStore).getDeadRecordCount()).isEqualTo(0);
- }
-
- /**
- * The tests executes the following:
- * - Creates the region and a gateway-sender on cluster2.
- * - Populates the region and waits until all events have been enqueued.
- * - Restarts server on cluster2 and stops it afterwards (the initial compaction occurs during
- * startup and the validation is done offline).
- * - Verifies that there are no orphaned files neither compact-able records on the disk-store.
- * - Creates the region and a gateway-receiver on cluster1.
- * - Starts server on cluster2 again and waits for WAN replication to finish.
- * - Restart server on cluster2, and stop it afterwards (the initial compaction occurs during
- * startup and the validation is done offline).
- * - Asserts that there are no orphaned drf files, neither compact-able records on the disks-tore.
- */
- @Test
- @TestCaseName(TEST_CASE_NAME)
- @Parameters(method = "regionAndGatewayTypes")
- public void diskStoreShouldNotBeCompactedOnMemberRestartWhenThereAreNonDispatchedEventsInTheQueue(
- RegionShortcut regionShortcut, boolean parallel) throws Exception {
- final int entries = 1000;
-
- // Create Region, DiskStore and Gateway on Cluster2
- String diskStorePath = serverCluster2.invoke(() -> {
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
- DiskStore diskStore = cacheRule.getCache().findDiskStore(DISK_STORE_ID);
- Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
- region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
-
- // Insert entries and wait for all events enqueued.
- IntStream.range(0, entries).forEach(value -> region.put("Key" + value, "Value" + value));
- awaitForQueueSize(entries);
-
- return diskStore.getDiskDirs()[0].getAbsolutePath();
- });
-
- // Restart Server on Cluster2
- serverCluster2.invoke(() -> {
- gracefullyDisconnect();
- cacheRule.createCache(createServerConfiguration(site2Port));
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
- gracefullyDisconnect();
- });
-
- // Assert Disk Store status.
- File gatewayDiskStore = new File(diskStorePath);
- assertThat(gatewayDiskStore.list())
- .hasSize(6)
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.krf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.drf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.crf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.drf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.crf");
- DiskStore diskStore =
- DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
- assertThat(((DiskStoreImpl) diskStore).getDeadRecordCount()).isEqualTo(0);
- assertThat(((DiskStoreImpl) diskStore).getLiveEntryCount()).isEqualTo(entries);
-
- // Create Region and Receiver on Cluster1
- serverCluster1.invoke(() -> createServerWithRegionAndGatewayReceiver(regionShortcut));
-
- // Start Server on Cluster2
- serverCluster2.invoke(() -> {
- cacheRule.createCache(createServerConfiguration(site2Port));
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
-
- // Await for WAN replication to finish.
- awaitForQueueSize(0);
- });
-
- // Wait for Cluster1 to receive all events.
- serverCluster1.invoke(() -> await().untilAsserted(
- () -> assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(entries)));
-
- // Restart and stop Server on Cluster2
- serverCluster2.invoke(() -> {
- gracefullyDisconnect();
- cacheRule.createCache(createServerConfiguration(site2Port));
- createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
- gracefullyDisconnect();
- });
-
- // Assert Disk Store status.
- assertThat(gatewayDiskStore.list())
- .hasSize(3)
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_4.drf")
- .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_4.crf");
- DiskStore diskStoreFinal =
- DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
- assertThat(((DiskStoreImpl) diskStoreFinal).getLiveEntryCount()).isEqualTo(0);
- assertThat(((DiskStoreImpl) diskStoreFinal).getDeadRecordCount()).isEqualTo(0);
- }
-}