You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nr...@apache.org on 2018/01/31 22:34:56 UTC
[geode] branch develop updated: GEODE-4390: Replace flaky test with
new tests (#1371)
This is an automated email from the ASF dual-hosted git repository.
nreich pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4a75148 GEODE-4390: Replace flaky test with new tests (#1371)
4a75148 is described below
commit 4a7514883515ed804b6c5e905e11627395036db9
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Wed Jan 31 14:34:52 2018 -0800
GEODE-4390: Replace flaky test with new tests (#1371)
---
.../geode/internal/cache/GemFireCacheImpl.java | 1 +
.../apache/geode/internal/cache/InternalCache.java | 2 +
.../internal/cache/xmlcache/CacheCreation.java | 5 +
.../cache/partitioned/PersistPRKRFDUnitTest.java | 258 ---------------------
.../partitioned/PersistPRKRFIntegrationTest.java | 176 ++++++++++++++
5 files changed, 184 insertions(+), 258 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 014f0ea..3aed22b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2521,6 +2521,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
+ @Override
public void closeDiskStores() {
Iterator<DiskStoreImpl> it = this.diskStores.values().iterator();
while (it.hasNext()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index d872037..562aaa7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -344,4 +344,6 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
InternalQueryService getQueryService();
Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly);
+
+ void closeDiskStores();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 65cb986..1d262e9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1259,6 +1259,11 @@ public class CacheCreation implements InternalCache {
}
@Override
+ public void closeDiskStores() {
+ throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+ }
+
+ @Override
public AsyncEventQueue getAsyncEventQueue(String id) {
for (AsyncEventQueue asyncEventQueue : this.asyncEventQueues) {
if (asyncEventQueue.getId().equals(id)) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java
deleted file mode 100644
index efb4c2f..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFDUnitTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.Declarable;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.util.CacheWriterAdapter;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.internal.cache.DiskRegion;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-/**
- * Tests the basic use cases for PR persistence.
- */
-@Category(DistributedTest.class)
-public class PersistPRKRFDUnitTest extends PersistentPartitionedRegionTestBase {
-
- private static final int NUM_BUCKETS = 15;
- private static final int MAX_WAIT = 30 * 1000;
- static Object lockObject = new Object();
-
- /**
- * do a put/modify/destroy while closing disk store
- *
- * to turn on debug, add following parameter in local.conf: hydra.VmPrms-extraVMArgs +=
- * "-Ddisk.KRF_DEBUG=true";
- */
- @Test
- @Category(FlakyTest.class)
- public void testCloseDiskStoreWhenPut() {
- final String title = "testCloseDiskStoreWhenPut:";
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
-
- createPR(vm0, 0);
- createData(vm0, 0, 10, "a");
- vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- // let the region to hold on the put until diskstore is closed
- if (!DiskStoreImpl.KRF_DEBUG) {
- region.getAttributesMutator().setCacheWriter(new MyWriter());
- }
- }
- });
-
- // create test
- AsyncInvocation async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async create") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
- try {
- region.put(10, "b");
- fail("Expect CacheClosedException here");
- } catch (CacheClosedException cce) {
- System.out.println(title + cce.getMessage());
- if (DiskStoreImpl.KRF_DEBUG) {
- assert cce.getMessage().contains("The disk store is closed.");
- } else {
- assert cce.getMessage().contains("The disk store is closed");
- }
- } finally {
- expect.remove();
- }
- }
- });
- vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
- public void run2() throws CacheException {
- GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
- Wait.pause(500);
- gfc.closeDiskStores();
- synchronized (lockObject) {
- lockObject.notify();
- }
- }
- });
- ThreadUtils.join(async1, MAX_WAIT);
- closeCache(vm0);
-
- // update
- createPR(vm0, 0);
- vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- // let the region to hold on the put until diskstore is closed
- if (!DiskStoreImpl.KRF_DEBUG) {
- region.getAttributesMutator().setCacheWriter(new MyWriter());
- }
- }
- });
- async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async update") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
- try {
- region.put(1, "b");
- fail("Expect CacheClosedException here");
- } catch (CacheClosedException cce) {
- System.out.println(title + cce.getMessage());
- if (DiskStoreImpl.KRF_DEBUG) {
- assert cce.getMessage().contains("The disk store is closed.");
- } else {
- assert cce.getMessage().contains("The disk store is closed");
- }
- } finally {
- expect.remove();
- }
- }
- });
- vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
- public void run2() throws CacheException {
- GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
- Wait.pause(500);
- gfc.closeDiskStores();
- synchronized (lockObject) {
- lockObject.notify();
- }
- }
- });
- ThreadUtils.join(async1, MAX_WAIT);
- closeCache(vm0);
-
- // destroy
- createPR(vm0, 0);
- vm0.invoke(new CacheSerializableRunnable(title + "server add writer") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- // let the region to hold on the put until diskstore is closed
- if (!DiskStoreImpl.KRF_DEBUG) {
- region.getAttributesMutator().setCacheWriter(new MyWriter());
- }
- }
- });
- async1 = vm0.invokeAsync(new CacheSerializableRunnable(title + "async destroy") {
- public void run2() throws CacheException {
- Region region = getRootRegion(getPartitionedRegionName());
- IgnoredException expect = IgnoredException.addIgnoredException("CacheClosedException");
- try {
- region.destroy(2, "b");
- fail("Expect CacheClosedException here");
- } catch (CacheClosedException cce) {
- System.out.println(title + cce.getMessage());
- if (DiskStoreImpl.KRF_DEBUG) {
- assert cce.getMessage().contains("The disk store is closed.");
- } else {
- assert cce.getMessage().contains("The disk store is closed");
- }
- } finally {
- expect.remove();
- }
- }
- });
- vm0.invoke(new CacheSerializableRunnable(title + "close disk store") {
- public void run2() throws CacheException {
- GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
- Wait.pause(500);
- gfc.closeDiskStores();
- synchronized (lockObject) {
- lockObject.notify();
- }
- }
- });
- ThreadUtils.join(async1, MAX_WAIT);
-
- checkData(vm0, 0, 10, "a");
- checkData(vm0, 10, 11, null);
- closeCache(vm0);
- }
-
- void checkRecoveredFromDisk(VM vm, final int bucketId, final boolean recoveredLocally) {
- vm.invoke(new SerializableRunnable("check recovered from disk") {
- @Override
- public void run() {
- Cache cache = getCache();
- PartitionedRegion region = (PartitionedRegion) cache.getRegion(getPartitionedRegionName());
- DiskRegion disk = region.getRegionAdvisor().getBucket(bucketId).getDiskRegion();
- if (recoveredLocally) {
- assertEquals(0, disk.getStats().getRemoteInitializations());
- assertEquals(1, disk.getStats().getLocalInitializations());
- } else {
- assertEquals(1, disk.getStats().getRemoteInitializations());
- assertEquals(0, disk.getStats().getLocalInitializations());
- }
- }
- });
- }
-
- private static class MyWriter extends CacheWriterAdapter implements Declarable {
- public MyWriter() {}
-
- public void init(Properties props) {}
-
- public void beforeCreate(EntryEvent event) {
- try {
- synchronized (lockObject) {
- lockObject.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void beforeUpdate(EntryEvent event) {
- try {
- synchronized (lockObject) {
- lockObject.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void beforeDestroy(EntryEvent event) {
- try {
- synchronized (lockObject) {
- lockObject.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java
new file mode 100644
index 0000000..b9ad809
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistPRKRFIntegrationTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests the basic use cases for PR persistence.
+ */
+@Category(IntegrationTest.class)
+public class PersistPRKRFIntegrationTest {
+ private static final String REGION_NAME = "testRegion";
+ private static final String DISK_STORE_NAME = "testRegionDiskStore";
+ private static final int BUCKETS = 1;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private InternalCache cache;
+ private Region<String, String> testRegion;
+ private BlockingWriter<String, String> blockingWriter;
+
+ @Before
+ public void setup() throws IOException {
+ cache = (InternalCache) new CacheFactory().create();
+ cache.createDiskStoreFactory().setDiskDirs(new File[] {tempFolder.newFolder("diskDir")})
+ .create(DISK_STORE_NAME);
+ PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl();
+ partitionAttributes.setTotalNumBuckets(BUCKETS);
+ testRegion = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
+ .setPartitionAttributes(partitionAttributes).setDiskStoreName(DISK_STORE_NAME)
+ .create(REGION_NAME);
+
+ blockingWriter = new BlockingWriter<>();
+ }
+
+ @After
+ public void tearDown() {
+ cache.close();
+ }
+
+ @Test
+ public void closeDiskStoreDuringCreate() throws InterruptedException {
+ testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+ Future<Void> asyncCreate = CompletableFuture.runAsync(() -> testRegion.put("newKey", "value"));
+ blockingWriter.awaitCreateInProgress();
+ cache.closeDiskStores();
+ blockingWriter.allowCreates();
+ assertThatThrownBy(asyncCreate::get).hasRootCauseInstanceOf(CacheClosedException.class)
+ .hasMessageContaining("The disk store is closed");
+ }
+
+ @Test
+ public void closeDiskStoreDuringUpdate() throws InterruptedException {
+ testRegion.put("existingKey", "value");
+ testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+ Future<Void> asyncUpdate =
+ CompletableFuture.runAsync(() -> testRegion.put("existingKey", "newValue"));
+ blockingWriter.awaitUpdateInProgress();
+ cache.closeDiskStores();
+ blockingWriter.allowUpdates();
+ assertThatThrownBy(asyncUpdate::get).hasRootCauseInstanceOf(CacheClosedException.class)
+ .hasMessageContaining("The disk store is closed");
+ }
+
+ @Test
+ public void closeDiskStoreDuringDestroy() throws InterruptedException {
+ testRegion.put("existingKey", "value");
+ testRegion.getAttributesMutator().setCacheWriter(blockingWriter);
+ Future<Void> asyncDestroy = CompletableFuture.runAsync(() -> testRegion.remove("existingKey"));
+ blockingWriter.awaitDestroyInProgress();
+ cache.closeDiskStores();
+ blockingWriter.allowDestroys();
+ assertThatThrownBy(asyncDestroy::get).hasRootCauseInstanceOf(CacheClosedException.class)
+ .hasMessageContaining("The disk store is closed");
+ }
+
+ private static class BlockingWriter<K, V> extends CacheWriterAdapter<K, V> implements Declarable {
+ private CountDownLatch beforeCreateLatch = new CountDownLatch(1);
+ private CountDownLatch allowCreates = new CountDownLatch(1);
+ private CountDownLatch beforeUpdateLatch = new CountDownLatch(1);
+ private CountDownLatch allowUpdates = new CountDownLatch(1);
+ private CountDownLatch beforeDestroyLatch = new CountDownLatch(1);
+ private CountDownLatch allowDestroys = new CountDownLatch(1);
+
+ @Override
+ public void beforeCreate(EntryEvent event) {
+ try {
+ beforeCreateLatch.countDown();
+ allowCreates.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeDestroy(EntryEvent event) {
+ try {
+ beforeDestroyLatch.countDown();
+ allowDestroys.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeUpdate(EntryEvent event) {
+ try {
+ beforeUpdateLatch.countDown();
+ allowUpdates.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ void allowCreates() {
+ allowCreates.countDown();
+ }
+
+ void awaitCreateInProgress() throws InterruptedException {
+ beforeCreateLatch.await();
+ }
+
+ void allowDestroys() {
+ allowDestroys.countDown();
+ }
+
+ void awaitDestroyInProgress() throws InterruptedException {
+ beforeDestroyLatch.await();
+ }
+
+ void allowUpdates() {
+ allowUpdates.countDown();
+ }
+
+ void awaitUpdateInProgress() throws InterruptedException {
+ beforeUpdateLatch.await();
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
nreich@apache.org.