You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jj...@apache.org on 2020/07/03 16:19:40 UTC

[geode] branch support/1.12 updated: GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)

This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 4be10d6  GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)
4be10d6 is described below

commit 4be10d6a2892cdad7f42ae32f34e0863149f342c
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Jul 3 17:18:58 2020 +0100

    GEODE-8029: Allow OplogEntryIdSet to Overflow (#5329) (#5337)
    
    Do not delete drf files during member startup as that should be only
    done by the compactor thread. Instead, allow the OplogEntryIdSet to
    grow over the default capacity and log a warning message instructing
    the user to manually compact the disk-stores.
    
    - Added unit tests.
    - Replaced usages of 'junit.Assert' by 'assertj'.
    - Modified DiskStoreImpl.deadRecordCount to return long instead of int.
    - Added internal overflow implementation to the OplogEntryIdSet so it can
      grow above the default limit.
    
    (cherry picked from commit fdc440131f0d562d97f2340d2e7ba5aacf935d62)
---
 .../apache/geode/internal/cache/DiskStoreImpl.java |  83 ++++-
 .../org/apache/geode/internal/cache/Oplog.java     |  11 +-
 .../internal/cache/OplogEntryIdSetJUnitTest.java   |  74 -----
 .../geode/internal/cache/OplogEntryIdSetTest.java  | 169 ++++++++++
 ...SenderWithIsolatedDiskStoreDistributedTest.java | 348 ---------------------
 5 files changed, 239 insertions(+), 446 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 4bb870c..f12a513 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -74,6 +74,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.annotations.internal.MutableForTesting;
 import org.apache.geode.cache.Cache;
@@ -3516,33 +3517,85 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   /**
-   * Set of OplogEntryIds (longs). Memory is optimized by using an int[] for ids in the unsigned int
-   * range.
+   * Set of OplogEntryIds (longs).
+   * Memory is optimized by using an int[] for ids in the unsigned int range.
+   * By default we can't have more than 805306401 ids for a load factor of 0.75, the internal lists
+   * are used to overcome this limit, allowing the disk-store to recover successfully (the internal
+   * class is **only** used during recovery to read all deleted entries).
    */
   static class OplogEntryIdSet {
-    private final IntOpenHashSet ints = new IntOpenHashSet((int) INVALID_ID);
-    private final LongOpenHashSet longs = new LongOpenHashSet((int) INVALID_ID);
+    private final List<IntOpenHashSet> allInts;
+    private final List<LongOpenHashSet> allLongs;
+    private final AtomicReference<IntOpenHashSet> currentInts;
+    private final AtomicReference<LongOpenHashSet> currentLongs;
+
+    // For testing purposes only.
+    @VisibleForTesting
+    OplogEntryIdSet(List<IntOpenHashSet> allInts, List<LongOpenHashSet> allLongs) {
+      this.allInts = allInts;
+      this.currentInts = new AtomicReference<>(this.allInts.get(0));
+
+      this.allLongs = allLongs;
+      this.currentLongs = new AtomicReference<>(this.allLongs.get(0));
+    }
+
+    public OplogEntryIdSet() {
+      IntOpenHashSet intHashSet = new IntOpenHashSet((int) INVALID_ID);
+      this.allInts = new ArrayList<>();
+      this.allInts.add(intHashSet);
+      this.currentInts = new AtomicReference<>(intHashSet);
+
+      LongOpenHashSet longHashSet = new LongOpenHashSet((int) INVALID_ID);
+      this.allLongs = new ArrayList<>();
+      this.allLongs.add(longHashSet);
+      this.currentLongs = new AtomicReference<>(longHashSet);
+    }
 
     public void add(long id) {
       if (id == 0) {
         throw new IllegalArgumentException();
-      } else if (id > 0 && id <= 0x00000000FFFFFFFFL) {
-        this.ints.add((int) id);
-      } else {
-        this.longs.add(id);
+      }
+
+      try {
+        if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+          this.currentInts.get().add((int) id);
+        } else {
+          this.currentLongs.get().add(id);
+        }
+      } catch (IllegalArgumentException illegalArgumentException) {
+        // See GEODE-8029.
+        // Too many entries on the accumulated drf files, overflow and continue.
+        logger.warn(
+            "There is a large number of deleted entries within the disk-store, please execute an offline compaction.");
+
+        // Overflow to the next [Int|Long]OpenHashSet and continue.
+        if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+          IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
+          allInts.add(overflownHashSet);
+          currentInts.set(overflownHashSet);
+
+          currentInts.get().add((int) id);
+        } else {
+          LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
+          allLongs.add(overflownHashSet);
+          currentLongs.set(overflownHashSet);
+
+          currentLongs.get().add(id);
+        }
       }
     }
 
     public boolean contains(long id) {
       if (id >= 0 && id <= 0x00000000FFFFFFFFL) {
-        return this.ints.contains((int) id);
+        return allInts.stream().anyMatch(ints -> ints.contains((int) id));
       } else {
-        return this.longs.contains(id);
+        return allLongs.stream().anyMatch(longs -> longs.contains(id));
       }
     }
 
-    public int size() {
-      return this.ints.size() + this.longs.size();
+    public long size() {
+      return allInts.stream().mapToInt(IntOpenHashSet::size).sum()
+          + allLongs.stream().mapToInt(LongOpenHashSet::size).sum();
     }
   }
 
@@ -3972,13 +4025,13 @@ public class DiskStoreImpl implements DiskStore {
     return this.liveEntryCount;
   }
 
-  private int deadRecordCount;
+  private long deadRecordCount;
 
-  void incDeadRecordCount(int count) {
+  void incDeadRecordCount(long count) {
     this.deadRecordCount += count;
   }
 
-  public int getDeadRecordCount() {
+  public long getDeadRecordCount() {
     return this.deadRecordCount;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index bb80a06..8115365 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -937,17 +937,10 @@ public class Oplog implements CompactableOplog, Flushable {
         // this.crf.raf.seek(this.crf.currSize);
       } else if (!offline) {
         // drf exists but crf has been deleted (because it was empty).
+        // I don't think the drf needs to be opened. It is only used during recovery.
+        // At some point the compacter may identify that it can be deleted.
         this.crf.RAFClosed = true;
         deleteCRF();
-
-        // The drf file needs to be deleted (see GEODE-8029).
-        // If compaction is not enabled, or if the compaction-threshold is never reached, there
-        // will be orphaned drf files that are not automatically deleted (unless a manual
-        // compaction is executed), in which case a later recovery might fail when the amount of
-        // deleted records is too high (805306401).
-        setHasDeletes(false);
-        deleteDRF();
-
         this.closed = true;
         this.deleted.set(true);
       }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java
deleted file mode 100644
index 5b57803..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetJUnitTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;
-
-/**
- * Tests DiskStoreImpl.OplogEntryIdSet
- */
-public class OplogEntryIdSetJUnitTest {
-
-  @Test
-  public void testBasics() {
-    OplogEntryIdSet s = new OplogEntryIdSet();
-    for (long i = 1; i < 777777; i++) {
-      assertEquals(false, s.contains(i));
-    }
-    for (long i = 1; i < 777777; i++) {
-      s.add(i);
-    }
-    for (long i = 1; i < 777777; i++) {
-      assertEquals(true, s.contains(i));
-    }
-
-    try {
-      s.add(DiskStoreImpl.INVALID_ID);
-      fail("expected IllegalArgumentException");
-    } catch (IllegalArgumentException expected) {
-    }
-    assertEquals(false, s.contains(0));
-
-    assertEquals(false, s.contains(0x00000000FFFFFFFFL));
-    s.add(0x00000000FFFFFFFFL);
-    assertEquals(true, s.contains(0x00000000FFFFFFFFL));
-
-    for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
-      assertEquals(false, s.contains(i));
-    }
-    for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
-      s.add(i);
-    }
-    for (long i = 0x00000000FFFFFFFFL + 1; i < 0x00000000FFFFFFFFL + 777777; i++) {
-      assertEquals(true, s.contains(i));
-    }
-
-    for (long i = 1; i < 777777; i++) {
-      assertEquals(true, s.contains(i));
-    }
-
-    assertEquals(false, s.contains(Long.MAX_VALUE));
-    s.add(Long.MAX_VALUE);
-    assertEquals(true, s.contains(Long.MAX_VALUE));
-    assertEquals(false, s.contains(Long.MIN_VALUE));
-    s.add(Long.MIN_VALUE);
-    assertEquals(true, s.contains(Long.MIN_VALUE));
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java
new file mode 100644
index 0000000..53ded13
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;
+
+/**
+ * Tests DiskStoreImpl.OplogEntryIdSet
+ */
+public class OplogEntryIdSetTest {
+
+  @Test
+  public void testBasics() {
+    OplogEntryIdSet s = new OplogEntryIdSet();
+
+    LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isFalse());
+    LongStream.range(1, 777777).forEach(s::add);
+    LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());
+
+    assertThatThrownBy(() -> s.add(DiskStoreImpl.INVALID_ID))
+        .isInstanceOf(IllegalArgumentException.class);
+    assertThat(s.contains(0)).isFalse();
+
+    assertThat(s.contains(0x00000000FFFFFFFFL)).isFalse();
+    s.add(0x00000000FFFFFFFFL);
+    assertThat(s.contains(0x00000000FFFFFFFFL)).isTrue();
+
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
+        .forEach(i -> assertThat(s.contains(i)).isFalse());
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777).forEach(s::add);
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
+        .forEach(i -> assertThat(s.contains(i)).isTrue());
+
+    LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());
+
+    assertThat(s.contains(Long.MAX_VALUE)).isFalse();
+    s.add(Long.MAX_VALUE);
+    assertThat(s.contains(Long.MAX_VALUE)).isTrue();
+    assertThat(s.contains(Long.MIN_VALUE)).isFalse();
+    s.add(Long.MIN_VALUE);
+    assertThat(s.contains(Long.MIN_VALUE)).isTrue();
+  }
+
+  @Test
+  public void addMethodOverflowsWhenInternalAddThrowsIllegalArgumentException() {
+    int testEntries = 1000;
+    int magicInt = testEntries + 1;
+    long magicLong = 0x00000000FFFFFFFFL + testEntries + 1;
+
+    Answer<Void> answer = invocationOnMock -> {
+      Number value = invocationOnMock.getArgument(0);
+      if ((value.intValue() == magicInt) || (value.longValue() == magicLong)) {
+        throw new IllegalArgumentException(
+            "Too large (XXXXXXXX expected elements with load factor Y.YY)");
+      }
+      invocationOnMock.callRealMethod();
+      return null;
+    };
+
+    IntOpenHashSet intOpenHashSet = spy(IntOpenHashSet.class);
+    doAnswer(answer).when(intOpenHashSet).add(anyInt());
+    LongOpenHashSet longOpenHashSet = spy(LongOpenHashSet.class);
+    doAnswer(answer).when(longOpenHashSet).add(anyLong());
+    List<IntOpenHashSet> intOpenHashSets =
+        new ArrayList<>(Collections.singletonList(intOpenHashSet));
+    List<LongOpenHashSet> longOpenHashSets =
+        new ArrayList<>(Collections.singletonList(longOpenHashSet));
+    OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);
+
+    // Insert some entries.
+    assertThat(intOpenHashSets).hasSize(1);
+    assertThat(longOpenHashSets).hasSize(1);
+    IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+        .forEach(oplogEntryIdSet::add);
+
+    // Insert an entry that would cause an overflow for ints and longs.
+    oplogEntryIdSet.add(magicInt);
+    oplogEntryIdSet.add(magicLong);
+
+    // Entries should exist and no exception should be thrown (even those that caused the exception)
+    assertThat(intOpenHashSets).hasSize(2);
+    assertThat(longOpenHashSets).hasSize(2);
+    IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+        .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+    assertThat(oplogEntryIdSet.contains(magicInt)).isTrue();
+    assertThat(oplogEntryIdSet.contains(magicLong)).isTrue();
+  }
+
+  @Test
+  public void sizeShouldIncludeOverflownSets() {
+    int testEntries = 1000;
+    List<IntOpenHashSet> intHashSets = new ArrayList<>();
+    List<LongOpenHashSet> longHashSets = new ArrayList<>();
+
+    IntStream.range(1, testEntries + 1).forEach(value -> {
+      IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
+      intOpenHashSet.add(value);
+      intHashSets.add(intOpenHashSet);
+    });
+
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries + 1)
+        .forEach(value -> {
+          LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
+          longOpenHashSet.add(value);
+          longHashSets.add(longOpenHashSet);
+        });
+
+    OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
+    assertThat(oplogEntryIdSet.size()).isEqualTo(testEntries * 2);
+  }
+
+  @Test
+  public void containsShouldSearchAcrossOverflownSets() {
+    int testEntries = 1000;
+    List<IntOpenHashSet> intHashSets = new ArrayList<>();
+    List<LongOpenHashSet> longHashSets = new ArrayList<>();
+
+    IntStream.range(1, testEntries).forEach(value -> {
+      IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
+      intOpenHashSet.add(value);
+      intHashSets.add(intOpenHashSet);
+    });
+
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries).forEach(value -> {
+      LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
+      longOpenHashSet.add(value);
+      longHashSets.add(longOpenHashSet);
+    });
+
+    OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
+
+    // All entries should be searchable across overflown sets
+    IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+        .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+  }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
deleted file mode 100644
index bf57a1f..0000000
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.wan;
-
-import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
-import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.VM.getVM;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import junitparams.naming.TestCaseName;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.geode.cache.DiskStore;
-import org.apache.geode.cache.DiskStoreFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.wan.GatewayReceiverFactory;
-import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.cache.wan.GatewaySenderFactory;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.RegionQueue;
-import org.apache.geode.internal.cache.persistence.OplogType;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
-import org.apache.geode.test.dunit.rules.DistributedRule;
-import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
-
-/**
- * Tests to verify WAN functionality when the gateway-sender(s) have isolated, non-shared with
- * other region(s), disk-store(s).
- */
-@RunWith(JUnitParamsRunner.class)
-public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest implements Serializable {
-  private static final String REGION_NAME = "TestRegion";
-  private static final String DISK_STORE_ID = "testDisk";
-  private static final String GATEWAY_SENDER_ID = "testSender";
-  private static final String TEST_CASE_NAME = "[{index}] {method}(RegionType:{0}, Parallel:{1})";
-  private int site1Port, site2Port;
-  private VM serverCluster1, serverCluster2;
-
-  @Rule
-  public CacheRule cacheRule = new CacheRule();
-
-  @Rule
-  public DistributedRule distributedRule = new DistributedRule();
-
-  @Rule
-  public SerializableTestName testName = new SerializableTestName();
-
-  @Rule
-  public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule();
-
-  private Properties createLocatorConfiguration(int distributedSystemId, int localLocatorPort,
-      int remoteLocatorPort) {
-    Properties config = new Properties();
-    config.setProperty(MCAST_PORT, "0");
-    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(distributedSystemId));
-    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
-    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']');
-    config.setProperty(START_LOCATOR,
-        "localhost[" + localLocatorPort + "],server=true,peer=true,hostname-for-clients=localhost");
-
-    return config;
-  }
-
-  private Properties createServerConfiguration(int localLocatorPort) {
-    Properties config = new Properties();
-    config.setProperty(MCAST_PORT, "0");
-    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
-
-    return config;
-  }
-
-  private void createDiskStore() {
-    String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
-    File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
-    DiskStoreFactory diskStoreFactory = cacheRule.getCache().createDiskStoreFactory();
-    diskStoreFactory.setAutoCompact(true);
-    diskStoreFactory.setAllowForceCompaction(true);
-    diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
-    diskStoreFactory.create(DISK_STORE_ID);
-  }
-
-  private void createRegion(RegionShortcut regionShortcut) {
-    cacheRule.getCache()
-        .<String, String>createRegionFactory(regionShortcut)
-        .create(REGION_NAME);
-  }
-
-  private void createGatewayReceiver() {
-    GatewayReceiverFactory gatewayReceiverFactory =
-        cacheRule.getCache().createGatewayReceiverFactory();
-    gatewayReceiverFactory.setManualStart(false);
-    gatewayReceiverFactory.create();
-  }
-
-  private void createGatewaySender(boolean parallel, int remoteDistributedSystemId) {
-    GatewaySenderFactory gatewaySenderFactory = cacheRule.getCache().createGatewaySenderFactory();
-    gatewaySenderFactory.setParallel(parallel);
-    gatewaySenderFactory.setDiskSynchronous(true);
-    gatewaySenderFactory.setPersistenceEnabled(true);
-    gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
-    gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
-  }
-
-  private void createServerWithRegionAndGatewayReceiver(RegionShortcut regionShortcut) {
-    createGatewayReceiver();
-    createRegion(regionShortcut);
-  }
-
-  private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut regionShortcut,
-      int remoteDistributedSystemId, boolean parallel) {
-    createDiskStore();
-    createRegion(regionShortcut);
-    createGatewaySender(parallel, remoteDistributedSystemId);
-    Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-    region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
-  }
-
-  private void gracefullyDisconnect() {
-    InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
-    InternalDistributedSystem.getConnectedInstance().disconnect();
-    await()
-        .untilAsserted(() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
-  }
-
-  private void awaitForQueueSize(int queueSize) {
-    GatewaySender gatewaySender = cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
-    await().untilAsserted(() -> {
-      Set<RegionQueue> queues = ((AbstractGatewaySender) gatewaySender).getQueues();
-      int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
-      assertThat(queueSize).isEqualTo(totalSize);
-    });
-  }
-
-  @SuppressWarnings("unused")
-  static Object[] regionAndGatewayTypes() {
-    ArrayList<Object[]> parameters = new ArrayList<>();
-    parameters.add(new Object[] {RegionShortcut.PARTITION, true});
-    parameters.add(new Object[] {RegionShortcut.PARTITION, false});
-    parameters.add(new Object[] {RegionShortcut.REPLICATE, false});
-
-    return parameters.toArray();
-  }
-
-  @Before
-  public void setUp() {
-    VM locatorCluster1 = getVM(0);
-    serverCluster1 = getVM(1);
-    VM locatorCluster2 = getVM(2);
-    serverCluster2 = getVM(3);
-
-    int[] ports = getRandomAvailableTCPPortsForDUnitSite(2);
-    site1Port = ports[0];
-    site2Port = ports[1];
-
-    // Start 2 sites, one locator and one server per site.
-    locatorCluster1
-        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(1, site1Port, site2Port)));
-    locatorCluster2
-        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(2, site2Port, site1Port)));
-
-    serverCluster1.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
-    serverCluster2.invoke(() -> cacheRule.createCache(createServerConfiguration(site2Port)));
-  }
-
-  /**
-   * The tests executes the following:
-   * - Creates region and gateway-receiver on cluster2.
-   * - Creates the region and gateway-sender on cluster1.
-   * - Populates the region and waits until WAN replication has finished.
-   * - Restarts server on cluster1, and stops it afterwards (the initial compaction occurs during
-   * startup and the disk validation is done offline).
-   * - Asserts that there are no orphaned drf files, neither compact-able records on the disks-tore.
-   */
-  @Test
-  @TestCaseName(TEST_CASE_NAME)
-  @Parameters(method = "regionAndGatewayTypes")
-  public void diskStoreShouldBeCompactedOnMemberRestartWhenAllEventsHaveBeenDispatched(
-      RegionShortcut regionShortcut, boolean parallel) throws Exception {
-    final int entries = 100;
-
-    // Create Region and Receiver on Cluster2
-    serverCluster2.invoke(() -> createServerWithRegionAndGatewayReceiver(regionShortcut));
-
-    // Create Region, DiskStore and Gateway on Cluster1
-    String diskStorePath = serverCluster1.invoke(() -> {
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, parallel);
-      DiskStore diskStore = cacheRule.getCache().findDiskStore(DISK_STORE_ID);
-      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-
-      // Insert entries and wait for WAN replication to finish.
-      IntStream.range(0, entries).forEach(value -> region.put("Key" + value, "Value" + value));
-      awaitForQueueSize(0);
-
-      return diskStore.getDiskDirs()[0].getAbsolutePath();
-    });
-
-    // Wait for Cluster2 to receive all events.
-    serverCluster2.invoke(() -> await().untilAsserted(
-        () -> assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(entries)));
-
-    // Restart and Stop Server on Cluster1
-    serverCluster1.invoke(() -> {
-      gracefullyDisconnect();
-      cacheRule.createCache(createServerConfiguration(site1Port));
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, parallel);
-      gracefullyDisconnect();
-    });
-
-    // There should be no orphaned drf files, neither compact-able records on the disk-store.
-    File gatewayDiskStore = new File(diskStorePath);
-    assertThat(gatewayDiskStore.list())
-        .hasSize(3)
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.drf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.crf");
-
-    DiskStore diskStore =
-        DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
-    assertThat(((DiskStoreImpl) diskStore).getLiveEntryCount()).isEqualTo(0);
-    assertThat(((DiskStoreImpl) diskStore).getDeadRecordCount()).isEqualTo(0);
-  }
-
-  /**
-   * The tests executes the following:
-   * - Creates the region and a gateway-sender on cluster2.
-   * - Populates the region and waits until all events have been enqueued.
-   * - Restarts server on cluster2 and stops it afterwards (the initial compaction occurs during
-   * startup and the validation is done offline).
-   * - Verifies that there are no orphaned files neither compact-able records on the disk-store.
-   * - Creates the region and a gateway-receiver on cluster1.
-   * - Starts server on cluster2 again and waits for WAN replication to finish.
-   * - Restart server on cluster2, and stop it afterwards (the initial compaction occurs during
-   * startup and the validation is done offline).
-   * - Asserts that there are no orphaned drf files, neither compact-able records on the disks-tore.
-   */
-  @Test
-  @TestCaseName(TEST_CASE_NAME)
-  @Parameters(method = "regionAndGatewayTypes")
-  public void diskStoreShouldNotBeCompactedOnMemberRestartWhenThereAreNonDispatchedEventsInTheQueue(
-      RegionShortcut regionShortcut, boolean parallel) throws Exception {
-    final int entries = 1000;
-
-    // Create Region, DiskStore and Gateway on Cluster2
-    String diskStorePath = serverCluster2.invoke(() -> {
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
-      DiskStore diskStore = cacheRule.getCache().findDiskStore(DISK_STORE_ID);
-      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-      region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
-
-      // Insert entries and wait for all events enqueued.
-      IntStream.range(0, entries).forEach(value -> region.put("Key" + value, "Value" + value));
-      awaitForQueueSize(entries);
-
-      return diskStore.getDiskDirs()[0].getAbsolutePath();
-    });
-
-    // Restart Server on Cluster2
-    serverCluster2.invoke(() -> {
-      gracefullyDisconnect();
-      cacheRule.createCache(createServerConfiguration(site2Port));
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
-      gracefullyDisconnect();
-    });
-
-    // Assert Disk Store status.
-    File gatewayDiskStore = new File(diskStorePath);
-    assertThat(gatewayDiskStore.list())
-        .hasSize(6)
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.krf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.drf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_1.crf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.drf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.crf");
-    DiskStore diskStore =
-        DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
-    assertThat(((DiskStoreImpl) diskStore).getDeadRecordCount()).isEqualTo(0);
-    assertThat(((DiskStoreImpl) diskStore).getLiveEntryCount()).isEqualTo(entries);
-
-    // Create Region and Receiver on Cluster1
-    serverCluster1.invoke(() -> createServerWithRegionAndGatewayReceiver(regionShortcut));
-
-    // Start Server on Cluster2
-    serverCluster2.invoke(() -> {
-      cacheRule.createCache(createServerConfiguration(site2Port));
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
-
-      // Await for WAN replication to finish.
-      awaitForQueueSize(0);
-    });
-
-    // Wait for Cluster1 to receive all events.
-    serverCluster1.invoke(() -> await().untilAsserted(
-        () -> assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(entries)));
-
-    // Restart and stop Server on Cluster2
-    serverCluster2.invoke(() -> {
-      gracefullyDisconnect();
-      cacheRule.createCache(createServerConfiguration(site2Port));
-      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 1, parallel);
-      gracefullyDisconnect();
-    });
-
-    // Assert Disk Store status.
-    assertThat(gatewayDiskStore.list())
-        .hasSize(3)
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_4.drf")
-        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_4.crf");
-    DiskStore diskStoreFinal =
-        DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] {gatewayDiskStore});
-    assertThat(((DiskStoreImpl) diskStoreFinal).getLiveEntryCount()).isEqualTo(0);
-    assertThat(((DiskStoreImpl) diskStoreFinal).getDeadRecordCount()).isEqualTo(0);
-  }
-}