You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/30 17:42:01 UTC
[09/43] geode git commit: GEODE-2993: Rethrow CacheClosedException
from AbstractGatewaySender.distribute()
GEODE-2993: Rethrow CacheClosedException from AbstractGatewaySender.distribute()
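- Rethrow the CancelException caught in distribute() (the superclass of CacheClosedException) after logging it at debug level, instead of swallowing it.
- Add a test that closes the cache while an event is being enqueued in an async event queue (AEQ).
- Add cleanup of the disk dirs created by the test.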
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fe0a106
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fe0a106
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fe0a106
Branch: refs/heads/feature/GEODE-2632-17
Commit: 0fe0a1061065f07d4b734d7055f56ad1635f1a2a
Parents: c1ab3ff
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
Authored: Thu May 25 15:31:16 2017 -0700
Committer: Lynn Hughes-Godfrey <lh...@pivotal.io>
Committed: Thu May 25 15:31:16 2017 -0700
----------------------------------------------------------------------
.../cache/wan/AbstractGatewaySender.java | 1 +
.../cache/wan/AsyncEventQueueTestBase.java | 2 +
.../asyncqueue/AsyncEventListenerDUnitTest.java | 102 +++++++++++++++++++
3 files changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7ed9b51..c38d547 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
ev.enqueueEvent(operation, clonedEvent, substituteValue);
} catch (CancelException e) {
logger.debug("caught cancel exception", e);
+ throw e;
} catch (RegionDestroyedException e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3,
http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 6fe7ee9..dc7a218 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -98,6 +98,7 @@ import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
@@ -1555,6 +1556,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
public static void cleanupVM() throws IOException { // per-VM teardown: close the cache, then clean up disk dirs
closeCache();
+ JUnit4CacheTestCase.cleanDiskDirs(); // remove the disk dirs the test created
}
public static void closeCache() throws IOException {
http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 3dd0550..795af36 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,11 +30,18 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
@@ -42,11 +50,14 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
+import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
@@ -1674,6 +1685,66 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm2, "ln"));
}
+ /**
+  * Verifies that a cache close triggered while an event is being enqueued in the async event
+  * queue reaches the caller as a {@link PartitionOfflineException}.
+  *
+  * <p>vm1 and vm2 create the parallel, persistent queue "ln" (with a filter that closes the cache
+  * on a removeAll) and the persistent partitioned region. vm3 creates the same region with local
+  * max memory 0 and performs the puts and the removeAll.
+  */
+ @Test
+ public void testCacheClosedBeforeAEQWrite() {
+   Integer lnPort =
+       (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));
+
+   vm1.invoke(createCacheRunnable(lnPort));
+   vm2.invoke(createCacheRunnable(lnPort));
+   vm3.invoke(createCacheRunnable(lnPort));
+
+   vm1.invoke(() -> addAEQWithCacheCloseFilter());
+   vm2.invoke(() -> addAEQWithCacheCloseFilter());
+
+   vm1.invoke(() -> createPersistentPartitionRegion());
+   vm2.invoke(() -> createPersistentPartitionRegion());
+   vm3.invoke(() -> {
+     AttributesFactory fact = new AttributesFactory();
+
+     PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+     pfact.setTotalNumBuckets(16);
+     // Local max memory 0: this member is configured with no local storage for the region.
+     pfact.setLocalMaxMemory(0);
+     fact.setPartitionAttributes(pfact.create());
+     fact.setOffHeap(isOffHeap());
+     cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+         .create(getTestMethodName() + "_PR");
+   });
+
+   vm3.invoke(() -> {
+     Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+     r.put(1, 1);
+     r.put(2, 2);
+     // This will trigger the gateway event filter to close the cache
+     try {
+       r.removeAll(Collections.singleton(1));
+       fail("Should have received a partition offline exception");
+     } catch (PartitionOfflineException expected) {
+       // expected: the cache was closed while the removeAll was being enqueued
+     }
+   });
+ }
+
+ /**
+  * Creates the partitioned region {@code <testMethodName>_PR} with 16 buckets and the
+  * {@code PERSISTENT_PARTITION} data policy, attached to the async event queue "ln".
+  */
+ private void createPersistentPartitionRegion() {
+   AttributesFactory fact = new AttributesFactory();
+
+   PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+   pfact.setTotalNumBuckets(16);
+   fact.setPartitionAttributes(pfact.create());
+   fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+   fact.setOffHeap(isOffHeap());
+   // The created region is looked up by name where needed, so the return value is not kept.
+   cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+       .create(getTestMethodName() + "_PR");
+ }
+
+ /**
+  * Creates the parallel, persistent async event queue "ln" with a {@link CloseCacheGatewayFilter}
+  * installed and a {@link MyAsyncEventListener} as its listener.
+  */
+ private void addAEQWithCacheCloseFilter() {
+   AsyncEventQueueFactory queueFactory = cache.createAsyncEventQueueFactory();
+   queueFactory = queueFactory.addGatewayEventFilter(new CloseCacheGatewayFilter());
+   queueFactory = queueFactory.setPersistent(true);
+   queueFactory = queueFactory.setParallel(true);
+   queueFactory.create("ln", new MyAsyncEventListener());
+ }
+
private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
return vm.invoke(() -> {
final BucketMovingAsyncEventListener listener =
@@ -1690,6 +1761,37 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
});
}
+ /**
+  * Gateway event filter used to simulate a cache close during enqueue: for a removeAll event it
+  * closes the cache from a separate thread, waits one second, and then throws
+  * {@link CacheClosedException}. Every other event is accepted.
+  */
+ private final class CloseCacheGatewayFilter implements GatewayEventFilter {
+   /**
+    * @return {@code true} for every event that is not a removeAll
+    * @throws CacheClosedException for a removeAll event, after starting the cache close
+    */
+   @Override
+   public boolean beforeEnqueue(final GatewayQueueEvent event) {
+     if (event.getOperation().isRemoveAll()) {
+       // Close on another thread; presumably so the close overlaps with this in-flight enqueue.
+       new Thread(() -> cache.close()).start();
+       try {
+         Thread.sleep(1000);
+       } catch (InterruptedException e) {
+         // Keep the interrupt visible to the caller; the exception below is thrown either way.
+         Thread.currentThread().interrupt();
+       }
+       throw new CacheClosedException();
+     }
+     return true;
+   }
+
+   /** Always returns {@code false}. */
+   @Override
+   public boolean beforeTransmit(final GatewayQueueEvent event) {
+     return false;
+   }
+
+   /** No-op. */
+   @Override
+   public void afterAcknowledgement(final GatewayQueueEvent event) {
+
+   }
+
+   /** No-op. */
+   @Override
+   public void close() {
+
+   }
+ }
+
private static class BucketMovingAsyncEventListener implements AsyncEventListener {
private final DistributedMember destination;
private boolean moved;