You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2020/08/24 08:23:45 UTC
[geode] branch develop updated: GEODE-8119: Threads hangs when
offline disk store command is invoked (#5466)
This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new afb161f GEODE-8119: Threads hangs when offline disk store command is invoked (#5466)
afb161f is described below
commit afb161fe793ebaede473c996e88cc161b9c5b6f5
Author: Mario Kevo <48...@users.noreply.github.com>
AuthorDate: Mon Aug 24 10:22:41 2020 +0200
GEODE-8119: Threads hangs when offline disk store command is invoked (#5466)
* GEODE-8119: Threads hangs when offline disk store command is invoked
* changes after comments
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 2 +-
.../cli/commands/DiskStoreCommandsDUnitTest.java | 20 ++
.../OfflineDiskStoreCommandsDUnitTest.java | 246 +++++++++++++++++++++
.../AlterDiskStoreCommandIntegrationTest.java | 32 +++
.../DescribeDiskStoreCommandIntegrationTest.java | 31 ++-
5 files changed, 324 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index bb05e96..718dd7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -460,7 +460,7 @@ public class DiskStoreImpl implements DiskStore {
} else {
this.asyncQueue = new ForceableLinkedBlockingQueue<Object>();
}
- if (!isValidating() && !isOfflineCompacting()) {
+ if (!isOffline()) {
startAsyncFlusher();
}
diff --git a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
index 1dac0b4..a6a3f08 100644
--- a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
+++ b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
@@ -48,6 +48,7 @@ import org.apache.geode.cache.configuration.DiskStoreType;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.DiskInitFile;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.SnapshotTestUtil;
import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
@@ -520,6 +521,25 @@ public class DiskStoreCommandsDUnitTest implements Serializable {
}
@Test
+ @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+ "upgrade offline-disk-store", "validate offline-disk-store",
+ "alter disk-store --region=testRegion --enable-statistics=true"})
+ public void offlineDiskStoreCommandShouldFailWhenDiskStoreFileDoesNotExist(
+ String baseCommand) {
+ Path diskStorePath =
+ Paths.get(tempDir.getRoot().getAbsolutePath());
+ assertThat(Files.exists(diskStorePath)).isTrue();
+ Path diskStoreFilePath =
+ Paths.get(diskStorePath + File.separator + "BACKUPnonExistingDiskStore"
+ + DiskInitFile.IF_FILE_EXT);
+ assertThat(Files.exists(diskStoreFilePath)).isFalse();
+ gfsh.executeAndAssertThat(baseCommand + " --name=nonExistingDiskStore --disk-dirs="
+ + diskStorePath.toAbsolutePath().toString()).statusIsError()
+ .containsOutput("The init file " + diskStoreFilePath + " does not exist.");
+ assertThat(Files.exists(diskStoreFilePath)).isFalse();
+ }
+
+ @Test
@Parameters(method = "getDiskDirNames")
public void validateDiskStoreDiskDirectoryPath(String diskDirectoryName)
throws Exception {
diff --git a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java
new file mode 100644
index 0000000..97c04ea
--- /dev/null
+++ b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskInitFile;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@RunWith(JUnitParamsRunner.class)
+public class OfflineDiskStoreCommandsDUnitTest implements Serializable {
+ private static final String REGION_NAME = "testRegion";
+ private static final String DISK_STORE_ID = "testDisk";
+ private static final String WRONG_DISK_STORE_ID = "wrongTestDisk";
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+
+ private Properties createLocatorConfiguration(int localLocatorPort) {
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+ config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+ config.setProperty(START_LOCATOR,
+ "localhost[" + localLocatorPort + "],server=true,peer=true,hostname-for-clients=localhost");
+
+ return config;
+ }
+
+ private Properties createServerConfiguration(int localLocatorPort) {
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+ config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+ return config;
+ }
+
+ private void createDiskStore(File[] diskStoreDirectories) {
+ DiskStoreFactory diskStoreFactory = cacheRule.getCache().createDiskStoreFactory();
+ diskStoreFactory.setMaxOplogSize(1);
+ diskStoreFactory.setAutoCompact(true);
+ diskStoreFactory.setAllowForceCompaction(true);
+ diskStoreFactory.setDiskDirs(diskStoreDirectories);
+
+ diskStoreFactory.create(DISK_STORE_ID);
+ }
+
+ private void createRegion() {
+ cacheRule.getCache()
+ .<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
+ .setDiskStoreName(DISK_STORE_ID)
+ .create(REGION_NAME);
+ }
+
+ private void createServerWithRegionAndPersistentRegion(File[] diskStoreDirectories) {
+ createDiskStore(diskStoreDirectories);
+ createRegion();
+ cacheRule.getCache().getRegion(REGION_NAME);
+ }
+
+ private void gracefullyDisconnect() {
+ InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+ InternalDistributedSystem.getConnectedInstance().disconnect();
+ await()
+ .untilAsserted(() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+ }
+
+ @Test
+ @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+ "validate offline-disk-store",
+ "alter disk-store --region=testRegion --enable-statistics=true"})
+ public void offlineDiskStoreCommandsSupportDiskStoresWithMultipleDirectories(String baseCommand)
+ throws IOException {
+ VM locator = getVM(0);
+ VM server = getVM(1);
+ final int ENTRIES = 100000;
+ int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+
+ File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+ File diskStoreDirectory2 = temporaryFolder.newFolder("diskDir2");
+ File diskStoreDirectory3 = temporaryFolder.newFolder("diskDir3");
+ File[] diskStoreDirectories =
+ new File[] {diskStoreDirectory1, diskStoreDirectory2, diskStoreDirectory3};
+ String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+ .collect(Collectors.joining(","));
+
+ locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+ server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+ server.invoke(() -> {
+ createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+ });
+ locator.invoke(this::gracefullyDisconnect);
+ server.invoke(this::gracefullyDisconnect);
+ gfsh.executeAndAssertThat(
+ baseCommand + " --name=" + DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+ .statusIsSuccess();
+ }
+
+ @Test
+ @Parameters({"describe offline-disk-store",
+ "validate offline-disk-store",
+ "alter disk-store --region=testRegion --enable-statistics=true"})
+ public void offlineDiskStoreCommandsDoNotLeaveLingeringThreadsRunning(String baseCommand)
+ throws IOException {
+ VM locator = getVM(0);
+ VM server = getVM(1);
+ final int ENTRIES = 100000;
+ int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+ String threadName = "Asynchronous disk writer for region";
+ int counter = 0;
+
+ File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+
+ File[] diskStoreDirectories =
+ new File[] {diskStoreDirectory1};
+ String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+ .collect(Collectors.joining(","));
+
+ locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+ server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+ server.invoke(() -> {
+ createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+ });
+ locator.invoke(this::gracefullyDisconnect);
+ server.invoke(this::gracefullyDisconnect);
+
+ gfsh.executeAndAssertThat(
+ baseCommand + " --name=" + DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+ .statusIsSuccess();
+
+ File tempFile = temporaryFolder.newFile("dumpFile.txt");
+ BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+ for (ThreadInfo info : infos) {
+ if (info.toString().contains(threadName))
+ writer.append(info.toString());
+ }
+
+ writer.close();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(tempFile))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ if (line.contains(threadName))
+ counter++;
+ }
+ }
+ assertThat(counter).isEqualTo(0);
+ }
+
+ @Test
+ @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+ "validate offline-disk-store",
+ "alter disk-store --region=testRegion --enable-statistics=true"})
+ public void offlineDiskStoreCommandShouldFailWhenDiskStoreFileDoesNotExist(String baseCommand)
+ throws IOException {
+ VM locator = getVM(0);
+ VM server = getVM(1);
+ final int ENTRIES = 100000;
+ int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+
+ File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+ File diskStoreDirectory2 = temporaryFolder.newFolder("diskDir2");
+ File diskStoreDirectory3 = temporaryFolder.newFolder("diskDir3");
+ File[] diskStoreDirectories =
+ new File[] {diskStoreDirectory1, diskStoreDirectory2, diskStoreDirectory3};
+ String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+ .collect(Collectors.joining(","));
+
+ locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+ server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+ server.invoke(() -> {
+ createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+ });
+ locator.invoke(this::gracefullyDisconnect);
+ server.invoke(this::gracefullyDisconnect);
+ gfsh.executeAndAssertThat(
+ baseCommand + " --name=" + WRONG_DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+ .statusIsError()
+ .containsOutput("The init file " + diskStoreDirectory1 + File.separator + "BACKUP"
+ + WRONG_DISK_STORE_ID + DiskInitFile.IF_FILE_EXT + " does not exist.");
+ }
+}
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
index 30b77cb..e354e04 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
@@ -17,11 +17,14 @@ package org.apache.geode.management.internal.cli.commands;
import static org.mockito.Mockito.spy;
+import java.io.File;
+
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.apache.geode.internal.cache.DiskInitFile;
import org.apache.geode.management.cli.GfshCommand;
import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
import org.apache.geode.management.internal.i18n.CliStrings;
@@ -55,4 +58,33 @@ public class AlterDiskStoreCommandIntegrationTest {
gfsh.executeAndAssertThat(command, commandString).statusIsError()
.containsOutput("Cannot use the --remove=true parameter with any other parameters");
}
+
+ @Test
+ public void testDirValidation() {
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_DISK_STORE);
+ csb.addOption(CliStrings.ALTER_DISK_STORE__DISKSTORENAME, "diskStoreName");
+ csb.addOption(CliStrings.ALTER_DISK_STORE__REGIONNAME, "regionName");
+ csb.addOption(CliStrings.ALTER_DISK_STORE__DISKDIRS, "wrongDiskDir");
+ csb.addOption(CliStrings.ALTER_DISK_STORE__CONCURRENCY__LEVEL, "5");
+ String commandString = csb.toString();
+
+ gfsh.executeAndAssertThat(command, commandString).statusIsError()
+ .containsOutput("Could not find disk-dirs: \"wrongDiskDir");
+ }
+
+ @Test
+ public void testNameValidation() {
+ String diskStoreName = "diskStoreName";
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_DISK_STORE);
+ csb.addOption(CliStrings.ALTER_DISK_STORE__DISKSTORENAME, diskStoreName);
+ csb.addOption(CliStrings.ALTER_DISK_STORE__REGIONNAME, "regionName");
+ csb.addOption(CliStrings.ALTER_DISK_STORE__DISKDIRS, tempDir.getRoot().toString());
+ csb.addOption(CliStrings.ALTER_DISK_STORE__CONCURRENCY__LEVEL, "5");
+ String commandString = csb.toString();
+
+ gfsh.executeAndAssertThat(command, commandString).statusIsError()
+ .containsOutput(
+ "The init file " + tempDir.getRoot().toString() + File.separator + "BACKUP"
+ + diskStoreName + DiskInitFile.IF_FILE_EXT + " does not exist.");
+ }
}
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
index c273fcc..63b93a7 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
@@ -20,10 +20,14 @@ import java.util.List;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
import org.apache.geode.test.junit.categories.PersistenceTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;
import org.apache.geode.test.junit.rules.GfshCommandRule.PortType;
@@ -56,8 +60,11 @@ public class DescribeDiskStoreCommandIntegrationTest {
@ClassRule
public static GfshCommandRule gfsh = new GfshCommandRule().withTimeout(1);
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
@Test
- public void commandFailsWithoutOptions() throws Exception {
+ public void commandFailsWithoutOptions() {
String cmd = "describe disk-store";
gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
"--name", "--member", ") for this command");
@@ -65,37 +72,49 @@ public class DescribeDiskStoreCommandIntegrationTest {
}
@Test
- public void commandFailsWithOnlyMember() throws Exception {
+ public void commandFailsWithOnlyMember() {
String cmd = "describe disk-store --member=" + MEMBER_NAME;
gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
"--name", ") for this command");
}
@Test
- public void commandFailsWithOnlyName() throws Exception {
+ public void commandFailsWithOnlyName() {
String cmd = "describe disk-store --name=" + DISK_STORE_NAME;
gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
"--member", ") for this command");
}
@Test
- public void commandFailsWithBadMember() throws Exception {
+ public void commandFailsWithBadMember() {
String cmd = "describe disk-store --member=invalid-member-name --name=" + DISK_STORE_NAME;
gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("Member",
"could not be found. Please verify the member name or ID and try again.");
}
@Test
- public void commandFailsWithBadName() throws Exception {
+ public void commandFailsWithBadName() {
String cmd = "describe disk-store --name=invalid-diskstore-name --member=" + MEMBER_NAME;
gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("A disk store with name",
"was not found on member");
}
@Test
- public void commandSucceedsWithNameAndMember() throws Exception {
+ public void commandSucceedsWithNameAndMember() {
String cmd = "describe disk-store --name=" + DISK_STORE_NAME + " --member=" + MEMBER_NAME;
gfsh.executeAndAssertThat(cmd).statusIsSuccess()
.containsOutput(expectedData.toArray(new String[0]));
}
+
+ @Test
+ public void testDirValidation() {
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DESCRIBE_OFFLINE_DISK_STORE);
+ csb.addOption(CliStrings.DESCRIBE_OFFLINE_DISK_STORE__DISKSTORENAME, DISK_STORE_NAME);
+ csb.addOption(CliStrings.DESCRIBE_OFFLINE_DISK_STORE__DISKDIRS, "wrongDiskDir");
+ String commandString = csb.toString();
+
+ gfsh.executeAndAssertThat(commandString).statusIsError()
+ .containsOutput("Could not find disk-dirs: \"wrongDiskDir");
+ }
+
}