Posted to commits@geode.apache.org by nr...@apache.org on 2018/01/31 22:34:56 UTC

[geode] branch develop updated: GEODE-4390: Replace flaky test with new tests (#1371)

This is an automated email from the ASF dual-hosted git repository.

nreich pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a75148  GEODE-4390: Replace flaky test with new tests (#1371)
4a75148 is described below

commit 4a7514883515ed804b6c5e905e11627395036db9
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Wed Jan 31 14:34:52 2018 -0800

    GEODE-4390: Replace flaky test with new tests (#1371)
---
 .../geode/internal/cache/GemFireCacheImpl.java     |   1 +
 .../apache/geode/internal/cache/InternalCache.java |   2 +
 .../internal/cache/xmlcache/CacheCreation.java     |   5 +
 .../cache/partitioned/PersistPRKRFDUnitTest.java   | 258 ---------------------
 .../partitioned/PersistPRKRFIntegrationTest.java   | 176 ++++++++++++++
 5 files changed, 184 insertions(+), 258 deletions(-)

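Before the patch itself, here is a minimal, Geode-free sketch of the latch handshake that the replacement test (PersistPRKRFIntegrationTest, in the diff below) relies on instead of the old test's Wait.pause(500) timing. The names in this sketch (LatchHandshakeSketch, blockingCallback, resourceClosed) are invented for illustration and use only java.util.concurrent; the real test drives the same handshake through BlockingWriter, awaitCreateInProgress()/allowCreates(), and the new InternalCache.closeDiskStores() shown in the patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Sketch of the handshake used by the new integration test: a callback parks the
 * in-flight operation on a latch, the test thread closes the resource while the
 * operation is parked, then releases the latch and asserts that the operation failed.
 */
public class LatchHandshakeSketch {
  private final CountDownLatch operationInProgress = new CountDownLatch(1);
  private final CountDownLatch allowOperation = new CountDownLatch(1);
  private volatile boolean resourceClosed = false;

  // Stand-in for BlockingWriter.beforeCreate(): signal progress, then wait to be released.
  private void blockingCallback() {
    try {
      operationInProgress.countDown();
      allowOperation.await();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) throws Exception {
    LatchHandshakeSketch sketch = new LatchHandshakeSketch();

    // Stand-in for CompletableFuture.runAsync(() -> testRegion.put("newKey", "value")).
    Future<Void> asyncOperation = CompletableFuture.runAsync(() -> {
      sketch.blockingCallback();
      if (sketch.resourceClosed) {
        // The real test observes a CacheClosedException ("The disk store is closed").
        throw new IllegalStateException("resource closed while operation was in progress");
      }
    });

    sketch.operationInProgress.await();  // awaitCreateInProgress()
    sketch.resourceClosed = true;        // cache.closeDiskStores()
    sketch.allowOperation.countDown();   // allowCreates()

    try {
      asyncOperation.get();              // assertThatThrownBy(asyncCreate::get) in the real test
    } catch (ExecutionException expected) {
      System.out.println("failed as expected: " + expected.getCause().getMessage());
    }
  }
}

Because the close happens only after the callback has signalled that the operation is in flight, the sequencing is deterministic; the diff below shows the same idea applied to creates, updates, and destroys against a persistent partitioned region.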
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 014f0ea..3aed22b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2521,6 +2521,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
     }
   }
 
+  @Override
   public void closeDiskStores() {
     Iterator<DiskStoreImpl> it = this.diskStores.values().iterator();
     while (it.hasNext()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index d872037..562aaa7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -344,4 +344,6 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
   InternalQueryService getQueryService();
 
   Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly);
+
+  void closeDiskStores();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 65cb986..1d262e9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1259,6 +1259,11 @@ public class CacheCreation implements InternalCache {
   }
 
   @Override
+  public void closeDiskStores() {
+    throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
+
+  @Override
   public AsyncEventQueue getAsyncEventQueue(String id) {
     for (AsyncEventQueue asyncEventQueue : this.asyncEventQueues) {
       if (asyncEventQueue.getId().equals(id)) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java
deleted file mode 100644
index efb4c2f..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.Declarable;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.util.CacheWriterAdapter;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.internal.cache.DiskRegion;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-/**
- * Tests the basic use cases for PR persistence.
- */
-@Category(DistributedTest.class)
-public class PersistPRKRFDUnitTest extends PersistentPartitionedRegionTestBase {
-
-  private static final int NUM_BUCKETS = 15;
-  private static final int MAX_WAIT = 30 * 1000;
-  static Object lockObject = new Object();
-
-  /**
-   * do a put/modify/destroy while closing disk store
-   *
-   * to turn on debug, add following parameter in local.conf: hydra.VmPrms-extraVMArgs +=
-   * "-Ddisk.KRF_DEBUG=true";
-   */
-  @Test
-  @Category(FlakyTest.class)
-  public void testCloseDiskStoreWhenPut() {
-    final String title = "testCloseDiskStoreWhenPut:";
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-
-    createPR(vm0, 0);
-    createData(vm0, 0, 10, "a");
-    vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        // let the region to hold on the put until diskstore is closed
-        if (!DiskStoreImpl.KRF_DEBUG) {
-          region.getAttributesMutator().setCacheWriter(new MyWriter());
-        }
-      }
-    });
-
-    // create test
-    AsyncInvocation async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async create") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
-        try {
-          region.put(10, "b");
-          fail("Expect CacheClosedException here");
-        } catch (CacheClosedException cce) {
-          System.out.println(title + cce.getMessage());
-          if (DiskStoreImpl.KRF_DEBUG) {
-            assert cce.getMessage().contains("The disk store is closed.");
-          } else {
-            assert cce.getMessage().contains("The disk store is closed");
-          }
-        } finally {
-          expect.remove();
-        }
-      }
-    });
-    vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
-      public void run2() throws CacheException {
-        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
-        Wait.pause(500);
-        gfc.closeDiskStores();
-        synchronized (lockObject) {
-          lockObject.notify();
-        }
-      }
-    });
-    ThreadUtils.join(async1, MAX_WAIT);
-    closeCache(vm0);
-
-    // update
-    createPR(vm0, 0);
-    vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        // let the region to hold on the put until diskstore is closed
-        if (!DiskStoreImpl.KRF_DEBUG) {
-          region.getAttributesMutator().setCacheWriter(new MyWriter());
-        }
-      }
-    });
-    async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async update") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
-        try {
-          region.put(1, "b");
-          fail("Expect CacheClosedException here");
-        } catch (CacheClosedException cce) {
-          System.out.println(title + cce.getMessage());
-          if (DiskStoreImpl.KRF_DEBUG) {
-            assert cce.getMessage().contains("The disk store is closed.");
-          } else {
-            assert cce.getMessage().contains("The disk store is closed");
-          }
-        } finally {
-          expect.remove();
-        }
-      }
-    });
-    vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
-      public void run2() throws CacheException {
-        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
-        Wait.pause(500);
-        gfc.closeDiskStores();
-        synchronized (lockObject) {
-          lockObject.notify();
-        }
-      }
-    });
-    ThreadUtils.join(async1, MAX_WAIT);
-    closeCache(vm0);
-
-    // destroy
-    createPR(vm0, 0);
-    vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        // let the region to hold on the put until diskstore is closed
-        if (!DiskStoreImpl.KRF_DEBUG) {
-          region.getAttributesMutator().setCacheWriter(new MyWriter());
-        }
-      }
-    });
-    async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async destroy") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion(getPartitionedRegionName());
-        IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
-        try {
-          region.destroy(2, "b");
-          fail("Expect CacheClosedException here");
-        } catch (CacheClosedException cce) {
-          System.out.println(title + cce.getMessage());
-          if (DiskStoreImpl.KRF_DEBUG) {
-            assert cce.getMessage().contains("The disk store is closed.");
-          } else {
-            assert cce.getMessage().contains("The disk store is closed");
-          }
-        } finally {
-          expect.remove();
-        }
-      }
-    });
-    vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
-      public void run2() throws CacheException {
-        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
-        Wait.pause(500);
-        gfc.closeDiskStores();
-        synchronized (lockObject) {
-          lockObject.notify();
-        }
-      }
-    });
-    ThreadUtils.join(async1, MAX_WAIT);
-
-    checkData(vm0, 0, 10, "a");
-    checkData(vm0, 10, 11, null);
-    closeCache(vm0);
-  }
-
-  void checkRecoveredFromDisk(VM vm, final int bucketId, final boolean recoveredLocally) {
-    vm.invoke(new SerializableRunnable("check recovered from disk") {
-      @Override
-      public void run() {
-        Cache cache = getCache();
-        PartitionedRegion region = (PartitionedRegion) cache.getRegion(getPartitionedRegionName());
-        DiskRegion disk = region.getRegionAdvisor().getBucket(bucketId).getDiskRegion();
-        if (recoveredLocally) {
-          assertEquals(0, disk.getStats().getRemoteInitializations());
-          assertEquals(1, disk.getStats().getLocalInitializations());
-        } else {
-          assertEquals(1, disk.getStats().getRemoteInitializations());
-          assertEquals(0, disk.getStats().getLocalInitializations());
-        }
-      }
-    });
-  }
-
-  private static class MyWriter extends CacheWriterAdapter implements Declarable {
-    public MyWriter() {}
-
-    public void init(Properties props) {}
-
-    public void beforeCreate(EntryEvent event) {
-      try {
-        synchronized (lockObject) {
-          lockObject.wait();
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    public void beforeUpdate(EntryEvent event) {
-      try {
-        synchronized (lockObject) {
-          lockObject.wait();
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    public void beforeDestroy(EntryEvent event) {
-      try {
-        synchronized (lockObject) {
-          lockObject.wait();
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java
new file mode 100644
index 0000000..b9ad809
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests the basic use cases for PR persistence.
+ */
+@Category(IntegrationTest.class)
+public class PersistPRKRFIntegrationTest {
+  private static final String REGION_NAME = "testRegion";
+  private static final String DISK_STORE_NAME = "testRegionDiskStore";
+  private static final int BUCKETS = 1;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private InternalCache cache;
+  private Region<String, String> testRegion;
+  private BlockingWriter<String, String> blockingWriter;
+
+  @Before
+  public void setup() throws IOException {
+    cache = (InternalCache) new CacheFactory().create();
+    cache.createDiskStoreFactory().setDiskDirs(new File[] {tempFolder.newFolder("diskDir")})
+        .create(DISK_STORE_NAME);
+    PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl();
+    partitionAttributes.setTotalNumBuckets(BUCKETS);
+    testRegion = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
+        .setPartitionAttributes(partitionAttributes).setDiskStoreName(DISK_STORE_NAME)
+        .create(REGION_NAME);
+
+    blockingWriter = new BlockingWriter<>();
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+  }
+
+  @Test
+  public void closeDiskStoreDuringCreate() throws InterruptedException {
+    testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+    Future<Void> asyncCreate = CompletableFuture.runAsync(() -> testRegion.put("newKey", "value"));
+    blockingWriter.awaitCreateInProgress();
+    cache.closeDiskStores();
+    blockingWriter.allowCreates();
+    assertThatThrownBy(asyncCreate::get).hasRootCauseInstanceOf(CacheClosedException.class)
+        .hasMessageContaining("The disk store is closed");
+  }
+
+  @Test
+  public void closeDiskStoreDuringUpdate() throws InterruptedException {
+    testRegion.put("existingKey", "value");
+    testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+    Future<Void> asyncUpdate =
+        CompletableFuture.runAsync(() -> testRegion.put("existingKey", "newValue"));
+    blockingWriter.awaitUpdateInProgress();
+    cache.closeDiskStores();
+    blockingWriter.allowUpdates();
+    assertThatThrownBy(asyncUpdate::get).hasRootCauseInstanceOf(CacheClosedException.class)
+        .hasMessageContaining("The disk store is closed");
+  }
+
+  @Test
+  public void closeDiskStoreDuringDestroy() throws InterruptedException {
+    testRegion.put("existingKey", "value");
+    testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+    Future<Void> asyncDestroy = CompletableFuture.runAsync(() -> testRegion.remove("existingKey"));
+    blockingWriter.awaitDestroyInProgress();
+    cache.closeDiskStores();
+    blockingWriter.allowDestroys();
+    assertThatThrownBy(asyncDestroy::get).hasRootCauseInstanceOf(CacheClosedException.class)
+        .hasMessageContaining("The disk store is closed");
+  }
+
+  private static class BlockingWriter<K, V> extends CacheWriterAdapter<K, V> implements Declarable {
+    private CountDownLatch beforeCreateLatch = new CountDownLatch(1);
+    private CountDownLatch allowCreates = new CountDownLatch(1);
+    private CountDownLatch beforeUpdateLatch = new CountDownLatch(1);
+    private CountDownLatch allowUpdates = new CountDownLatch(1);
+    private CountDownLatch beforeDestroyLatch = new CountDownLatch(1);
+    private CountDownLatch allowDestroys = new CountDownLatch(1);
+
+    @Override
+    public void beforeCreate(EntryEvent event) {
+      try {
+        beforeCreateLatch.countDown();
+        allowCreates.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void beforeDestroy(EntryEvent event) {
+      try {
+        beforeDestroyLatch.countDown();
+        allowDestroys.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void beforeUpdate(EntryEvent event) {
+      try {
+        beforeUpdateLatch.countDown();
+        allowUpdates.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    void allowCreates() {
+      allowCreates.countDown();
+    }
+
+    void awaitCreateInProgress() throws InterruptedException {
+      beforeCreateLatch.await();
+    }
+
+    void allowDestroys() {
+      allowDestroys.countDown();
+    }
+
+    void awaitDestroyInProgress() throws InterruptedException {
+      beforeDestroyLatch.await();
+    }
+
+    void allowUpdates() {
+      allowUpdates.countDown();
+    }
+
+    void awaitUpdateInProgress() throws InterruptedException {
+      beforeUpdateLatch.await();
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
nreich@apache.org.