You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@geode.apache.org by GitBox <gi...@apache.org> on 2020/04/21 08:28:49 UTC

[GitHub] [geode] jujoramos commented on a change in pull request #4970: GEODE-7676: Add PR clear with expiration tests

jujoramos commented on a change in pull request #4970:
URL: https://github.com/apache/geode/pull/4970#discussion_r411981569



##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.ExpirationAction.DESTROY;
+import static org.apache.geode.cache.ExpirationAction.INVALIDATE;
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+import static org.apache.geode.internal.util.ArrayUtils.asList;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * Tests to verify that {@link PartitionedRegion#clear()} cancels all remaining expiration tasks
+ * on the {@link PartitionedRegion} once the operation is executed.
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable {
+  // Total number of buckets configured for the partitioned region on every member.
+  private static final Integer BUCKETS = 13;
+  // Expiration timeout; not referenced in the visible part of the class.
+  // NOTE(review): presumably seconds, used to build the ExpirationAttributes - confirm.
+  private static final Integer EXPIRATION_TIME = 30;
+  // Name under which the region is created on every VM.
+  private static final String REGION_NAME = "PartitionedRegion";
+  // Display-name template with placeholders for index, method and the first three parameters.
+  // NOTE(review): presumably passed to @TestCaseName on the parameterized tests - confirm.
+  private static final String TEST_CASE_NAME =
+      "[{index}] {method}(Coordinator:{0}, RegionType:{1}, ExpirationAction:{2})";
+
+  // NOTE(review): the argument is presumably the number of DUnit VMs to launch; setUp() uses
+  // VM indexes 0, 1 and 2 - confirm against DistributedRule.
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(3);
+
+  // Cache rule built with createCacheInAll(); the helpers below obtain the cache through it.
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+
+  // NOTE(review): presumably provides per-VM disk directories for the persistent region
+  // types - confirm.
+  @Rule
+  public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule();
+
+  // VM handles for each role, resolved in setUp().
+  private VM accessor, server1, server2;
+
+  /**
+   * Roles played by the VMs in these tests, each bound to the index that is handed to
+   * {@code getVM} to obtain the corresponding VM.
+   */
+  private enum TestVM {
+    ACCESSOR(0),
+    SERVER1(1),
+    SERVER2(2);
+
+    final int vmNumber;
+
+    TestVM(int number) {
+      vmNumber = number;
+    }
+  }
+
+  /**
+   * Returns the partitioned region shortcuts exercised by the tests: the non-persistent
+   * variants first, followed by the persistent ones.
+   */
+  @SuppressWarnings("unused")
+  static RegionShortcut[] regionTypes() {
+    RegionShortcut[] shortcuts = {
+        // Non-persistent.
+        PARTITION,
+        PARTITION_OVERFLOW,
+        PARTITION_REDUNDANT,
+        PARTITION_REDUNDANT_OVERFLOW,
+        // Persistent.
+        PARTITION_PERSISTENT,
+        PARTITION_PERSISTENT_OVERFLOW,
+        PARTITION_REDUNDANT_PERSISTENT,
+        PARTITION_REDUNDANT_PERSISTENT_OVERFLOW
+    };
+
+    return shortcuts;
+  }
+
+  /**
+   * Builds the parameter sets {@code {regionShortcut, expirationAction}}: for every shortcut
+   * returned by {@link #regionTypes()}, one entry with DESTROY followed by one with INVALIDATE.
+   */
+  @SuppressWarnings("unused")
+  static Object[] regionTypesAndExpirationActions() {
+    List<Object[]> combinations = new ArrayList<>();
+
+    for (RegionShortcut shortcut : regionTypes()) {
+      combinations.add(new Object[] {shortcut, DESTROY});
+      combinations.add(new Object[] {shortcut, INVALIDATE});
+    }
+
+    return combinations.toArray();
+  }
+
+  /**
+   * Builds the parameter sets {@code {testVM, regionShortcut, expirationAction}}. For every
+   * shortcut returned by {@link #regionTypes()} the entries are generated for SERVER1 first and
+   * ACCESSOR second, each with DESTROY followed by INVALIDATE.
+   */
+  @SuppressWarnings("unused")
+  static Object[] vmsRegionTypesAndExpirationActions() {
+    TestVM[] coordinators = {TestVM.SERVER1, TestVM.ACCESSOR};
+    ExpirationAction[] actions = {DESTROY, INVALIDATE};
+    List<Object[]> combinations = new ArrayList<>();
+
+    for (RegionShortcut shortcut : regionTypes()) {
+      for (TestVM coordinator : coordinators) {
+        for (ExpirationAction action : actions) {
+          combinations.add(new Object[] {coordinator, shortcut, action});
+        }
+      }
+    }
+
+    return combinations.toArray();
+  }
+
+  /**
+   * Resolves the VM handle for each {@link TestVM} role before every test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    accessor = getVM(TestVM.ACCESSOR.vmNumber);
+    server1 = getVM(TestVM.SERVER1.vmNumber);
+    server2 = getVM(TestVM.SERVER2.vmNumber);
+  }
+
+  /**
+   * Maps the shortcut used on the data stores to the one used to create the region on the
+   * accessor: the four persistent partitioned shortcuts are translated to their non-persistent
+   * counterparts, any other shortcut is returned unchanged.
+   */
+  private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) {
+    if (!dataStoreRegionShortcut.isPersistent()) {
+      return dataStoreRegionShortcut;
+    }
+
+    switch (dataStoreRegionShortcut) {
+      case PARTITION_PERSISTENT:
+        return PARTITION;
+      case PARTITION_PERSISTENT_OVERFLOW:
+        return PARTITION_OVERFLOW;
+      case PARTITION_REDUNDANT_PERSISTENT:
+        return PARTITION_REDUNDANT;
+      case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW:
+        return PARTITION_REDUNDANT_OVERFLOW;
+      default:
+        // Persistent shortcut without a mapping: use it as is.
+        return dataStoreRegionShortcut;
+    }
+  }
+
+  /**
+   * Creates the region on the accessor member. The region uses the non-persistent equivalent
+   * of the given shortcut, {@code BUCKETS} total buckets and a local max memory of 0; the given
+   * expiration attributes are applied to both entry time-to-live and entry idle timeout.
+   */
+  private void initAccessor(RegionShortcut regionShortcut,
+      ExpirationAttributes expirationAttributes) {
+    PartitionAttributesFactory<String, String> partitionFactory =
+        new PartitionAttributesFactory<>();
+    partitionFactory.setTotalNumBuckets(BUCKETS);
+    partitionFactory.setLocalMaxMemory(0);
+    PartitionAttributes<String, String> partitionAttributes = partitionFactory.create();
+
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(getRegionAccessorShortcut(regionShortcut))
+        .setPartitionAttributes(partitionAttributes)
+        .setEntryTimeToLive(expirationAttributes)
+        .setEntryIdleTimeout(expirationAttributes)
+        .create(REGION_NAME);
+  }
+
+  /**
+   * Creates the region on a data store member with {@code BUCKETS} total buckets, applying the
+   * given expiration attributes to both entry time-to-live and entry idle timeout. Once the
+   * region exists, a new {@code ExpirationListener} is installed in the static
+   * {@code ExpiryTask.expiryTaskListener} field.
+   */
+  private void initDataStore(RegionShortcut regionShortcut,
+      ExpirationAttributes expirationAttributes) {
+    PartitionAttributesFactory<String, String> partitionFactory =
+        new PartitionAttributesFactory<>();
+    partitionFactory.setTotalNumBuckets(BUCKETS);
+    PartitionAttributes<String, String> partitionAttributes = partitionFactory.create();
+
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(regionShortcut)
+        .setPartitionAttributes(partitionAttributes)
+        .setEntryTimeToLive(expirationAttributes)
+        .setEntryIdleTimeout(expirationAttributes)
+        .create(REGION_NAME);
+
+    // Installed only after the region has been created.
+    ExpiryTask.expiryTaskListener = new ExpirationListener();
+  }
+
+  /**
+   * Creates the region on all three VMs: the data stores are initialized first (server1, then
+   * server2) and the accessor is created last.
+   */
+  private void parametrizedSetup(RegionShortcut regionShortcut,
+      ExpirationAttributes expirationAttributes) {
+    for (VM dataStore : new VM[] {server1, server2}) {
+      dataStore.invoke(() -> initDataStore(regionShortcut, expirationAttributes));
+    }
+
+    accessor.invoke(() -> initAccessor(regionShortcut, expirationAttributes));
+  }
+
+  /**
+   * Blocks until the local member reports no activity in progress: no reply waits on the
+   * distribution manager, and no volunteering, bucket creation, primary transfer or rebalance
+   * operation on the partitioned region.
+   */
+  private void waitForSilence() {
+    PartitionedRegion partitionedRegion =
+        (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    PartitionedRegionStats regionStats = partitionedRegion.getPrStats();
+    DMStats distributionStats = cacheRule.getSystem().getDistributionManager().getStats();
+
+    // All counters must be zero within the same evaluation of the assertion block.
+    await().untilAsserted(() -> {
+      assertThat(distributionStats.getReplyWaitsInProgress()).isEqualTo(0);
+      assertThat(regionStats.getVolunteeringInProgress()).isEqualTo(0);
+      assertThat(regionStats.getBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(regionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+      assertThat(regionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(regionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+    });
+  }
+
+  /**
+   * Puts {@code entryCount} entries ("0".."entryCount - 1" mapped to "Value_i") through the
+   * feeder VM and then checks, on each of the given VMs, that every entry can be read back
+   * with the expected value.
+   */
+  private void populateRegion(VM feeder, int entryCount, List<VM> vms) {
+    feeder.invoke(() -> {
+      Region<String, String> target = cacheRule.getCache().getRegion(REGION_NAME);
+
+      for (int i = 0; i < entryCount; i++) {
+        target.put(String.valueOf(i), "Value_" + i);
+      }
+    });
+
+    for (VM vm : vms) {
+      vm.invoke(() -> {
+        waitForSilence();
+        Region<String, String> target = cacheRule.getCache().getRegion(REGION_NAME);
+
+        for (int i = 0; i < entryCount; i++) {
+          assertThat(target.get(String.valueOf(i))).isEqualTo("Value_" + i);
+        }
+      });
+    }
+  }
+
+  /**
+   * Verifies, on each of the given VMs, that the local size of the region is 0 once the member
+   * has gone quiet.
+   */
+  private void assertRegionIsEmpty(List<VM> vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> {
+        waitForSilence();
+        PartitionedRegion partitionedRegion =
+            (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+
+        assertThat(partitionedRegion.getLocalSize()).isEqualTo(0);
+      });
+    }
+  }
+
+  /**
+   * Verifies that every bucket has the expected number of copies (redundant copies plus the
+   * primary) and that all copies of a bucket hold the same values and versions.
+   */
+  private void assertRegionBucketsConsistency() throws ForceReattemptException {
+    waitForSilence();
+    PartitionedRegion partitionedRegion =
+        (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    // One primary plus the configured redundant copies.
+    int expectedCopies = partitionedRegion.getRedundantCopies() + 1;
+
+    for (int bucketId = 0; bucketId < BUCKETS; bucketId++) {
+      List<BucketDump> dumps = partitionedRegion.getAllBucketEntries(bucketId);
+      int copies = dumps.size();
+      assertThat(copies).as("Bucket " + bucketId + " should have " + expectedCopies
+          + " copies, but has " + copies).isEqualTo(expectedCopies);
+
+      // A single copy has nothing to be compared against.
+      if (copies <= 1) {
+        continue;
+      }
+
+      // Every other copy is compared against the first dump of the bucket.
+      BucketDump reference = dumps.get(0);
+      for (BucketDump candidate : dumps.subList(1, copies)) {
+        assertThat(candidate.getValues())
+            .as("Values for bucket " + bucketId + " on member " + candidate.getMember()
+                + " are not consistent with member " + reference.getMember())
+            .isEqualTo(reference.getValues());
+        assertThat(candidate.getVersions())
+            .as("Versions for bucket " + bucketId + " on member " + candidate.getMember()
+                + " are not consistent with member " + reference.getMember())
+            .isEqualTo(reference.getVersions());
+      }
+    }
+  }
+
+  /**
+   * Register the MemberKiller CacheWriter on the given vms and cancel auto-reconnects.

Review comment:
       Good catch! I added the "disable auto-reconnect" part while developing the tests, but then realised I actually want the member to reconnect afterwards and forgot to update the comment. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org