You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2020/08/24 08:23:45 UTC

[geode] branch develop updated: GEODE-8119: Threads hangs when offline disk store command is invoked (#5466)

This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new afb161f  GEODE-8119: Threads hangs when offline disk store command is invoked (#5466)
afb161f is described below

commit afb161fe793ebaede473c996e88cc161b9c5b6f5
Author: Mario Kevo <48...@users.noreply.github.com>
AuthorDate: Mon Aug 24 10:22:41 2020 +0200

    GEODE-8119: Threads hangs when offline disk store command is invoked (#5466)
    
    * GEODE-8119: Threads hangs when offline disk store command is invoked
    
    * changes after comments
---
 .../apache/geode/internal/cache/DiskStoreImpl.java |   2 +-
 .../cli/commands/DiskStoreCommandsDUnitTest.java   |  20 ++
 .../OfflineDiskStoreCommandsDUnitTest.java         | 246 +++++++++++++++++++++
 .../AlterDiskStoreCommandIntegrationTest.java      |  32 +++
 .../DescribeDiskStoreCommandIntegrationTest.java   |  31 ++-
 5 files changed, 324 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index bb05e96..718dd7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -460,7 +460,7 @@ public class DiskStoreImpl implements DiskStore {
     } else {
       this.asyncQueue = new ForceableLinkedBlockingQueue<Object>();
     }
-    if (!isValidating() && !isOfflineCompacting()) {
+    if (!isOffline()) {
       startAsyncFlusher();
     }
 
diff --git a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
index 1dac0b4..a6a3f08 100644
--- a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
+++ b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DiskStoreCommandsDUnitTest.java
@@ -48,6 +48,7 @@ import org.apache.geode.cache.configuration.DiskStoreType;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
 import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.DiskInitFile;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.SnapshotTestUtil;
 import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
@@ -520,6 +521,25 @@ public class DiskStoreCommandsDUnitTest implements Serializable {
   }
 
   @Test
+  @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+      "upgrade offline-disk-store", "validate offline-disk-store",
+      "alter disk-store --region=testRegion --enable-statistics=true"})
+  public void offlineDiskStoreCommandShouldFailWhenDiskStoreFileDoesNotExist(
+      String baseCommand) {
+    Path diskStorePath =
+        Paths.get(tempDir.getRoot().getAbsolutePath());
+    assertThat(Files.exists(diskStorePath)).isTrue();
+    Path diskStoreFilePath =
+        Paths.get(diskStorePath + File.separator + "BACKUPnonExistingDiskStore"
+            + DiskInitFile.IF_FILE_EXT);
+    assertThat(Files.exists(diskStoreFilePath)).isFalse();
+    gfsh.executeAndAssertThat(baseCommand + " --name=nonExistingDiskStore --disk-dirs="
+        + diskStorePath.toAbsolutePath().toString()).statusIsError()
+        .containsOutput("The init file " + diskStoreFilePath + " does not exist.");
+    assertThat(Files.exists(diskStoreFilePath)).isFalse();
+  }
+
+  @Test
   @Parameters(method = "getDiskDirNames")
   public void validateDiskStoreDiskDirectoryPath(String diskDirectoryName)
       throws Exception {
diff --git a/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java
new file mode 100644
index 0000000..97c04ea
--- /dev/null
+++ b/geode-gfsh/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/OfflineDiskStoreCommandsDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskInitFile;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@RunWith(JUnitParamsRunner.class)
+public class OfflineDiskStoreCommandsDUnitTest implements Serializable {
+  private static final String REGION_NAME = "testRegion";
+  private static final String DISK_STORE_ID = "testDisk";
+  private static final String WRONG_DISK_STORE_ID = "wrongTestDisk";
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+
+  private Properties createLocatorConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + localLocatorPort + "],server=true,peer=true,hostname-for-clients=localhost");
+
+    return config;
+  }
+
+  private Properties createServerConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+    return config;
+  }
+
+  private void createDiskStore(File[] diskStoreDirectories) {
+    DiskStoreFactory diskStoreFactory = cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setMaxOplogSize(1);
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setAllowForceCompaction(true);
+    diskStoreFactory.setDiskDirs(diskStoreDirectories);
+
+    diskStoreFactory.create(DISK_STORE_ID);
+  }
+
+  private void createRegion() {
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
+        .setDiskStoreName(DISK_STORE_ID)
+        .create(REGION_NAME);
+  }
+
+  private void createServerWithRegionAndPersistentRegion(File[] diskStoreDirectories) {
+    createDiskStore(diskStoreDirectories);
+    createRegion();
+    cacheRule.getCache().getRegion(REGION_NAME);
+  }
+
+  private void gracefullyDisconnect() {
+    InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+    InternalDistributedSystem.getConnectedInstance().disconnect();
+    await()
+        .untilAsserted(() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+  }
+
+  @Test
+  @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+      "validate offline-disk-store",
+      "alter disk-store --region=testRegion --enable-statistics=true"})
+  public void offlineDiskStoreCommandsSupportDiskStoresWithMultipleDirectories(String baseCommand)
+      throws IOException {
+    VM locator = getVM(0);
+    VM server = getVM(1);
+    final int ENTRIES = 100000;
+    int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+
+    File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+    File diskStoreDirectory2 = temporaryFolder.newFolder("diskDir2");
+    File diskStoreDirectory3 = temporaryFolder.newFolder("diskDir3");
+    File[] diskStoreDirectories =
+        new File[] {diskStoreDirectory1, diskStoreDirectory2, diskStoreDirectory3};
+    String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+        .collect(Collectors.joining(","));
+
+    locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+    server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+    server.invoke(() -> {
+      createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+    });
+    locator.invoke(this::gracefullyDisconnect);
+    server.invoke(this::gracefullyDisconnect);
+    gfsh.executeAndAssertThat(
+        baseCommand + " --name=" + DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+        .statusIsSuccess();
+  }
+
+  @Test
+  @Parameters({"describe offline-disk-store",
+      "validate offline-disk-store",
+      "alter disk-store --region=testRegion --enable-statistics=true"})
+  public void offlineDiskStoreCommandsDoNotLeaveLingeringThreadsRunning(String baseCommand)
+      throws IOException {
+    VM locator = getVM(0);
+    VM server = getVM(1);
+    final int ENTRIES = 100000;
+    int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+    String threadName = "Asynchronous disk writer for region";
+    int counter = 0;
+
+    File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+
+    File[] diskStoreDirectories =
+        new File[] {diskStoreDirectory1};
+    String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+        .collect(Collectors.joining(","));
+
+    locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+    server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+    server.invoke(() -> {
+      createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+    });
+    locator.invoke(this::gracefullyDisconnect);
+    server.invoke(this::gracefullyDisconnect);
+
+    gfsh.executeAndAssertThat(
+        baseCommand + " --name=" + DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+        .statusIsSuccess();
+
+    File tempFile = temporaryFolder.newFile("dumpFile.txt");
+    BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
+    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+    ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+    for (ThreadInfo info : infos) {
+      if (info.toString().contains(threadName))
+        writer.append(info.toString());
+    }
+
+    writer.close();
+
+    try (BufferedReader br = new BufferedReader(new FileReader(tempFile))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (line.contains(threadName))
+          counter++;
+      }
+    }
+    assertThat(counter).isEqualTo(0);
+  }
+
+  @Test
+  @Parameters({"compact offline-disk-store", "describe offline-disk-store",
+      "validate offline-disk-store",
+      "alter disk-store --region=testRegion --enable-statistics=true"})
+  public void offlineDiskStoreCommandShouldFailWhenDiskStoreFileDoesNotExist(String baseCommand)
+      throws IOException {
+    VM locator = getVM(0);
+    VM server = getVM(1);
+    final int ENTRIES = 100000;
+    int site1Port = getRandomAvailableTCPPortsForDUnitSite(1)[0];
+
+    File diskStoreDirectory1 = temporaryFolder.newFolder("diskDir1");
+    File diskStoreDirectory2 = temporaryFolder.newFolder("diskDir2");
+    File diskStoreDirectory3 = temporaryFolder.newFolder("diskDir3");
+    File[] diskStoreDirectories =
+        new File[] {diskStoreDirectory1, diskStoreDirectory2, diskStoreDirectory3};
+    String diskDirs = Arrays.stream(diskStoreDirectories).map(File::getAbsolutePath)
+        .collect(Collectors.joining(","));
+
+    locator.invoke(() -> cacheRule.createCache(createLocatorConfiguration(site1Port)));
+    server.invoke(() -> cacheRule.createCache(createServerConfiguration(site1Port)));
+    server.invoke(() -> {
+      createServerWithRegionAndPersistentRegion(diskStoreDirectories);
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, ENTRIES).forEach(i -> region.put("Key_" + i, "Value_" + i));
+    });
+    locator.invoke(this::gracefullyDisconnect);
+    server.invoke(this::gracefullyDisconnect);
+    gfsh.executeAndAssertThat(
+        baseCommand + " --name=" + WRONG_DISK_STORE_ID + " --disk-dirs=" + diskDirs)
+        .statusIsError()
+        .containsOutput("The init file " + diskStoreDirectory1 + File.separator + "BACKUP"
+            + WRONG_DISK_STORE_ID + DiskInitFile.IF_FILE_EXT + " does not exist.");
+  }
+}
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
index 30b77cb..e354e04 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/AlterDiskStoreCommandIntegrationTest.java
@@ -17,11 +17,14 @@ package org.apache.geode.management.internal.cli.commands;
 
 import static org.mockito.Mockito.spy;
 
+import java.io.File;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import org.apache.geode.internal.cache.DiskInitFile;
 import org.apache.geode.management.cli.GfshCommand;
 import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
 import org.apache.geode.management.internal.i18n.CliStrings;
@@ -55,4 +58,33 @@ public class AlterDiskStoreCommandIntegrationTest {
     gfsh.executeAndAssertThat(command, commandString).statusIsError()
         .containsOutput("Cannot use the --remove=true parameter with any other parameters");
   }
+
+  @Test
+  public void testDirValidation() {
+    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_DISK_STORE);
+    csb.addOption(CliStrings.ALTER_DISK_STORE__DISKSTORENAME, "diskStoreName");
+    csb.addOption(CliStrings.ALTER_DISK_STORE__REGIONNAME, "regionName");
+    csb.addOption(CliStrings.ALTER_DISK_STORE__DISKDIRS, "wrongDiskDir");
+    csb.addOption(CliStrings.ALTER_DISK_STORE__CONCURRENCY__LEVEL, "5");
+    String commandString = csb.toString();
+
+    gfsh.executeAndAssertThat(command, commandString).statusIsError()
+        .containsOutput("Could not find disk-dirs: \"wrongDiskDir");
+  }
+
+  @Test
+  public void testNameValidation() {
+    String diskStoreName = "diskStoreName";
+    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_DISK_STORE);
+    csb.addOption(CliStrings.ALTER_DISK_STORE__DISKSTORENAME, diskStoreName);
+    csb.addOption(CliStrings.ALTER_DISK_STORE__REGIONNAME, "regionName");
+    csb.addOption(CliStrings.ALTER_DISK_STORE__DISKDIRS, tempDir.getRoot().toString());
+    csb.addOption(CliStrings.ALTER_DISK_STORE__CONCURRENCY__LEVEL, "5");
+    String commandString = csb.toString();
+
+    gfsh.executeAndAssertThat(command, commandString).statusIsError()
+        .containsOutput(
+            "The init file " + tempDir.getRoot().toString() + File.separator + "BACKUP"
+                + diskStoreName + DiskInitFile.IF_FILE_EXT + " does not exist.");
+  }
 }
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
index c273fcc..63b93a7 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/commands/DescribeDiskStoreCommandIntegrationTest.java
@@ -20,10 +20,14 @@ import java.util.List;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
 import org.apache.geode.test.junit.categories.PersistenceTest;
 import org.apache.geode.test.junit.rules.GfshCommandRule;
 import org.apache.geode.test.junit.rules.GfshCommandRule.PortType;
@@ -56,8 +60,11 @@ public class DescribeDiskStoreCommandIntegrationTest {
   @ClassRule
   public static GfshCommandRule gfsh = new GfshCommandRule().withTimeout(1);
 
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
   @Test
-  public void commandFailsWithoutOptions() throws Exception {
+  public void commandFailsWithoutOptions() {
     String cmd = "describe disk-store";
     gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
         "--name", "--member", ") for this command");
@@ -65,37 +72,49 @@ public class DescribeDiskStoreCommandIntegrationTest {
   }
 
   @Test
-  public void commandFailsWithOnlyMember() throws Exception {
+  public void commandFailsWithOnlyMember() {
     String cmd = "describe disk-store --member=" + MEMBER_NAME;
     gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
         "--name", ") for this command");
   }
 
   @Test
-  public void commandFailsWithOnlyName() throws Exception {
+  public void commandFailsWithOnlyName() {
     String cmd = "describe disk-store --name=" + DISK_STORE_NAME;
     gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("You should specify option (",
         "--member", ") for this command");
   }
 
   @Test
-  public void commandFailsWithBadMember() throws Exception {
+  public void commandFailsWithBadMember() {
     String cmd = "describe disk-store --member=invalid-member-name --name=" + DISK_STORE_NAME;
     gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("Member",
         "could not be found.  Please verify the member name or ID and try again.");
   }
 
   @Test
-  public void commandFailsWithBadName() throws Exception {
+  public void commandFailsWithBadName() {
     String cmd = "describe disk-store --name=invalid-diskstore-name --member=" + MEMBER_NAME;
     gfsh.executeAndAssertThat(cmd).statusIsError().containsOutput("A disk store with name",
         "was not found on member");
   }
 
   @Test
-  public void commandSucceedsWithNameAndMember() throws Exception {
+  public void commandSucceedsWithNameAndMember() {
     String cmd = "describe disk-store --name=" + DISK_STORE_NAME + " --member=" + MEMBER_NAME;
     gfsh.executeAndAssertThat(cmd).statusIsSuccess()
         .containsOutput(expectedData.toArray(new String[0]));
   }
+
+  @Test
+  public void testDirValidation() {
+    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DESCRIBE_OFFLINE_DISK_STORE);
+    csb.addOption(CliStrings.DESCRIBE_OFFLINE_DISK_STORE__DISKSTORENAME, DISK_STORE_NAME);
+    csb.addOption(CliStrings.DESCRIBE_OFFLINE_DISK_STORE__DISKDIRS, "wrongDiskDir");
+    String commandString = csb.toString();
+
+    gfsh.executeAndAssertThat(commandString).statusIsError()
+        .containsOutput("Could not find disk-dirs: \"wrongDiskDir");
+  }
+
 }