You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:39 UTC
[geode] 12/23: GEODE-7669 Test coverage for Partitioned Region
clear with Overflow enabled (#5189)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit db89a2ece3f96cfc5ad81da66b7f27ddb5f43836
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:39:04 2020 -0700
GEODE-7669 Test coverage for Partitioned Region clear with Overflow enabled (#5189)
Authored-by: Jianxia Chen <jc...@apache.org>
---
.../PartitionedRegionOverflowClearDUnitTest.java | 380 +++++++++++++++++++++
1 file changed, 380 insertions(+)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionOverflowClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionOverflowClearDUnitTest.java
new file mode 100644
index 0000000..c10d1db
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionOverflowClearDUnitTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.apache.geode.test.dunit.VM.toArray;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+public class PartitionedRegionOverflowClearDUnitTest implements Serializable {
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule(5);
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ private VM locator;
+ private VM server1;
+ private VM server2;
+ private VM accessor;
+ private VM client;
+
+ private static final String LOCATOR_NAME = "locator";
+ private static final String SERVER1_NAME = "server1";
+ private static final String SERVER2_NAME = "server2";
+ private static final String SERVER3_NAME = "server3";
+
+ private File locatorDir;
+ private File server1Dir;
+ private File server2Dir;
+ private File server3Dir;
+
+ private String locatorString;
+
+ private int locatorPort;
+ private int locatorJmxPort;
+ private int locatorHttpPort;
+ private int serverPort1;
+ private int serverPort2;
+ private int serverPort3;
+
+ private static final AtomicReference<LocatorLauncher> LOCATOR_LAUNCHER = new AtomicReference<>();
+
+ private static final AtomicReference<ServerLauncher> SERVER_LAUNCHER = new AtomicReference<>();
+
+ private static final AtomicReference<ClientCache> CLIENT_CACHE = new AtomicReference<>();
+
+ private static final String OVERFLOW_REGION_NAME = "testOverflowRegion";
+
+ public static final int NUM_ENTRIES = 1000;
+
+ @Before
+ public void setup() throws Exception {
+ locator = getVM(0);
+ server1 = getVM(1);
+ server2 = getVM(2);
+ accessor = getVM(3);
+ client = getVM(4);
+
+ locatorDir = temporaryFolder.newFolder(LOCATOR_NAME);
+ server1Dir = temporaryFolder.newFolder(SERVER1_NAME);
+ server2Dir = temporaryFolder.newFolder(SERVER2_NAME);
+ server3Dir = temporaryFolder.newFolder(SERVER3_NAME);
+
+ int[] ports = getRandomAvailableTCPPorts(6);
+ locatorPort = ports[0];
+ locatorJmxPort = ports[1];
+ locatorHttpPort = ports[2];
+ serverPort1 = ports[3];
+ serverPort2 = ports[4];
+ serverPort3 = ports[5];
+
+ locator.invoke(
+ () -> startLocator(locatorDir, locatorPort, locatorJmxPort, locatorHttpPort));
+ gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager);
+
+ locatorString = "localhost[" + locatorPort + "]";
+ server1.invoke(() -> startServer(SERVER1_NAME, server1Dir, locatorString, serverPort1));
+ server2.invoke(() -> startServer(SERVER2_NAME, server2Dir, locatorString, serverPort2));
+ }
+
+ @After
+ public void tearDown() {
+ destroyRegion();
+ destroyDiskStore(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);
+
+ for (VM vm : new VM[] {client, accessor, server1, server2, locator}) {
+ vm.invoke(() -> {
+ if (CLIENT_CACHE.get() != null) {
+ CLIENT_CACHE.get().close();
+ }
+ if (LOCATOR_LAUNCHER.get() != null) {
+ LOCATOR_LAUNCHER.get().stop();
+ }
+ if (SERVER_LAUNCHER.get() != null) {
+ SERVER_LAUNCHER.get().stop();
+ }
+
+ CLIENT_CACHE.set(null);
+ LOCATOR_LAUNCHER.set(null);
+ SERVER_LAUNCHER.set(null);
+ });
+ }
+ }
+
+ @Test
+ public void testGfshClearRegionWithOverflow() throws InterruptedException {
+ createPartitionRedundantPersistentOverflowRegion();
+
+ populateRegion();
+ assertRegionSize(NUM_ENTRIES);
+
+ gfsh.executeAndAssertThat("clear region --name=" + OVERFLOW_REGION_NAME).statusIsSuccess();
+ assertRegionSize(0);
+
+ restartServers();
+
+ assertRegionSize(0);
+ }
+
+ @Test
+ public void testClientRegionClearWithOverflow() throws InterruptedException {
+ createPartitionRedundantPersistentOverflowRegion();
+
+ populateRegion();
+ assertRegionSize(NUM_ENTRIES);
+
+ client.invoke(() -> {
+ if (CLIENT_CACHE.get() == null) {
+ ClientCache clientCache =
+ new ClientCacheFactory().addPoolLocator("localhost", locatorPort).create();
+ CLIENT_CACHE.set(clientCache);
+ }
+
+ CLIENT_CACHE.get().getRegion(OVERFLOW_REGION_NAME).clear();
+ });
+ assertRegionSize(0);
+
+ restartServers();
+
+ assertRegionSize(0);
+ }
+
+ @Test
+ public void testAccessorRegionClearWithOverflow() throws InterruptedException {
+
+ for (VM vm : toArray(server1, server2)) {
+ vm.invoke(this::createRegionWithDefaultDiskStore);
+ }
+
+ accessor.invoke(() -> {
+ startServer(SERVER3_NAME, server3Dir, locatorString, serverPort3);
+ SERVER_LAUNCHER.get().getCache()
+ .createRegionFactory(RegionShortcut.PARTITION_REDUNDANT_OVERFLOW)
+ .setPartitionAttributes(
+ new PartitionAttributesFactory().setLocalMaxMemory(0).create())
+ .create(OVERFLOW_REGION_NAME);
+ });
+
+ populateRegion();
+ assertRegionSize(NUM_ENTRIES);
+
+ accessor.invoke(() -> {
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).size())
+ .isEqualTo(NUM_ENTRIES);
+ SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).clear();
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).size())
+ .isEqualTo(0);
+ });
+ assertRegionSize(0);
+
+ for (VM vm : toArray(server1, server2)) {
+ vm.invoke(PartitionedRegionOverflowClearDUnitTest::stopServer);
+ }
+
+ gfsh.executeAndAssertThat("list members").statusIsSuccess();
+ assertThat(gfsh.getGfshOutput()).contains("locator");
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ startServer(SERVER1_NAME, server1Dir, locatorString, serverPort1);
+ createRegionWithDefaultDiskStore();
+ });
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ startServer(SERVER2_NAME, server2Dir, locatorString, serverPort2);
+ createRegionWithDefaultDiskStore();
+ });
+ asyncInvocation1.get();
+ asyncInvocation2.get();
+ assertRegionSize(0);
+ }
+
+ private void restartServers() throws InterruptedException {
+ for (VM vm : toArray(server1, server2)) {
+ vm.invoke(PartitionedRegionOverflowClearDUnitTest::stopServer);
+ }
+
+ gfsh.executeAndAssertThat("list members").statusIsSuccess();
+ assertThat(gfsh.getGfshOutput()).contains("locator");
+ AsyncInvocation asyncInvocation1 =
+ server1
+ .invokeAsync(() -> startServer(SERVER1_NAME, server1Dir, locatorString, serverPort1));
+ AsyncInvocation asyncInvocation2 =
+ server2
+ .invokeAsync(() -> startServer(SERVER2_NAME, server2Dir, locatorString, serverPort2));
+ asyncInvocation1.get();
+ asyncInvocation2.get();
+ }
+
+ private void createPartitionRedundantPersistentOverflowRegion() {
+ String command = new CommandStringBuilder("create region")
+ .addOption("name", OVERFLOW_REGION_NAME)
+ .addOption("type", "PARTITION_REDUNDANT_PERSISTENT_OVERFLOW")
+ .addOption("redundant-copies", "1")
+ .addOption("eviction-entry-count", "1")
+ .addOption("eviction-action", "overflow-to-disk")
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ }
+
+ private void destroyRegion() {
+ server1.invoke(() -> {
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME)).isNotNull();
+ SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).destroyRegion();
+
+ });
+ }
+
+ private void destroyDiskStore(String diskStoreName) {
+ String command = new CommandStringBuilder("destroy disk-store")
+ .addOption("name", diskStoreName)
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ }
+
+ private void createRegionWithDefaultDiskStore() {
+ SERVER_LAUNCHER.get().getCache().createDiskStoreFactory()
+ .create(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);
+ SERVER_LAUNCHER.get().getCache()
+ .createRegionFactory(RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW)
+ .setPartitionAttributes(
+ new PartitionAttributesFactory().setRedundantCopies(1).create())
+ .setDiskStoreName(DiskStoreFactory.DEFAULT_DISK_STORE_NAME)
+ .setEvictionAttributes(
+ EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK))
+ .create(OVERFLOW_REGION_NAME);
+ }
+
+ private void populateRegion() {
+ client.invoke(() -> {
+ if (CLIENT_CACHE.get() == null) {
+ ClientCache clientCache =
+ new ClientCacheFactory().addPoolLocator("localhost", locatorPort).create();
+ CLIENT_CACHE.set(clientCache);
+ }
+
+ Region<Object, Object> clientRegion = CLIENT_CACHE.get()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create(OVERFLOW_REGION_NAME);
+
+ IntStream.range(0, NUM_ENTRIES).forEach(i -> clientRegion.put("key-" + i, "value-" + i));
+ });
+ }
+
+ private void assertRegionSize(int size) {
+ server1.invoke(() -> {
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME)).isNotNull();
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).size())
+ .isEqualTo(size);
+ });
+ server2.invoke(() -> {
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME)).isNotNull();
+ assertThat(SERVER_LAUNCHER.get().getCache().getRegion(OVERFLOW_REGION_NAME).size())
+ .isEqualTo(size);
+ });
+ }
+
+ private void startLocator(File directory, int port, int jmxPort, int httpPort) {
+ LOCATOR_LAUNCHER.set(new LocatorLauncher.Builder()
+ .setMemberName(LOCATOR_NAME)
+ .setPort(port)
+ .setWorkingDirectory(directory.getAbsolutePath())
+ .set(HTTP_SERVICE_PORT, httpPort + "")
+ .set(JMX_MANAGER, "true")
+ .set(JMX_MANAGER_PORT, String.valueOf(jmxPort))
+ .set(JMX_MANAGER_START, "true")
+ .set(LOG_FILE, new File(directory, LOCATOR_NAME + ".log").getAbsolutePath())
+ .set(MAX_WAIT_TIME_RECONNECT, "1000")
+ .set(MEMBER_TIMEOUT, "2000")
+ .set(ENABLE_CLUSTER_CONFIGURATION, "true")
+ .set(USE_CLUSTER_CONFIGURATION, "true")
+ .build());
+
+ LOCATOR_LAUNCHER.get().start();
+
+ await().untilAsserted(() -> {
+ InternalLocator locator = (InternalLocator) LOCATOR_LAUNCHER.get().getLocator();
+ assertThat(locator.isSharedConfigurationRunning())
+ .as("Locator shared configuration is running on locator" + getVMId())
+ .isTrue();
+ });
+ }
+
+ private void startServer(String name, File workingDirectory, String locator, int serverPort) {
+ SERVER_LAUNCHER.set(new ServerLauncher.Builder()
+ .setDeletePidFileOnStop(true)
+ .setMemberName(name)
+ .setWorkingDirectory(workingDirectory.getAbsolutePath())
+ .setServerPort(serverPort)
+ .set(HTTP_SERVICE_PORT, "0")
+ .set(LOCATORS, locator)
+ .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath())
+ .set(MAX_WAIT_TIME_RECONNECT, "1000")
+ .set(MEMBER_TIMEOUT, "2000")
+ .set(ENABLE_CLUSTER_CONFIGURATION, "true")
+ .set(USE_CLUSTER_CONFIGURATION, "true")
+ .build());
+
+ SERVER_LAUNCHER.get().start();
+ }
+
+ private static void stopServer() {
+ SERVER_LAUNCHER.get().stop();
+ }
+}