You are viewing a plain text version of this content; the canonical hyperlinked version is available in the mailing list archive.
Posted to commits@geode.apache.org by on...@apache.org on 2019/08/19 14:43:13 UTC
[geode] branch release/1.10.0 updated: GEODE-7079: Prevent NPE
During Queue Conflation (#3911)
This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.10.0 by this push:
new 399a56e GEODE-7079: Prevent NPE During Queue Conflation (#3911)
399a56e is described below
commit 399a56e9bb4f0732c72e61cef7d7ba4ec91887d9
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Wed Aug 14 21:38:31 2019 -0300
GEODE-7079: Prevent NPE During Queue Conflation (#3911)
* GEODE-7079: Prevent NPE During Queue Conflation
- Added tests.
- Fixed minor warnings.
- Use the cached region name when doing conflation instead of the actual region so the processor doesn't need to wait for the actual region to be fully initialized.
Co-authored-by: Benjamin Ross <br...@pivotal.io>
(cherry picked from commit 6f4bbbd96bcecdb82cf7753ce1dae9fa6baebf9b)
---
.../AsyncEventListenerDistributedTest.java | 195 +++++++++++++------
.../wan/AbstractGatewaySenderEventProcessor.java | 4 +-
...rallelGatewaySenderEventProcessorJUnitTest.java | 35 ++++
.../wan/serial/SerialWANConflationDUnitTest.java | 216 +++++++++++++++++++--
4 files changed, 374 insertions(+), 76 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
index e335081..09cab0e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.cache.wan.asyncqueue;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
import static org.apache.geode.test.dunit.VM.getVM;
@@ -26,6 +27,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +59,7 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
@@ -127,9 +130,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Test // serial, ReplicateRegion
public void testSerialAsyncEventQueueSize() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -146,9 +149,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
- vm0.invoke(() -> waitForDispatcherToPause());
- vm1.invoke(() -> waitForDispatcherToPause());
- vm2.invoke(() -> waitForDispatcherToPause());
+ vm0.invoke(this::waitForDispatcherToPause);
+ vm1.invoke(this::waitForDispatcherToPause);
+ vm2.invoke(this::waitForDispatcherToPause);
vm0.invoke(() -> doPuts(replicateRegionName, 1000));
@@ -159,9 +162,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Test // serial, ReplicateRegion
public void testReplicatedSerialAsyncEventQueue() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -186,9 +189,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Test // serial, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
@@ -205,9 +208,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
- vm0.invoke(() -> waitForDispatcherToPause());
- vm1.invoke(() -> waitForDispatcherToPause());
- vm2.invoke(() -> waitForDispatcherToPause());
+ vm0.invoke(this::waitForDispatcherToPause);
+ vm1.invoke(this::waitForDispatcherToPause);
+ vm2.invoke(this::waitForDispatcherToPause);
Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
@@ -252,16 +255,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Test // serial, persistent, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
@@ -283,7 +289,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
- createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
@@ -298,7 +305,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
- createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
// primary sender
@@ -315,16 +323,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Ignore("TODO: Disabled for 52351")
@Test // serial, persistent, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> {
createReplicateRegion(replicateRegionName, asyncEventQueueId);
@@ -334,11 +345,11 @@ public class AsyncEventListenerDistributedTest implements Serializable {
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
- vm1.invoke(() -> waitForSenderToBecomePrimary());
+ vm1.invoke(this::waitForSenderToBecomePrimary);
vm1.invoke(() -> doPuts(replicateRegionName, 2000));
- vm1.invoke(() -> waitForRegionQueuesToEmpty());
- vm2.invoke(() -> waitForRegionQueuesToEmpty());
+ vm1.invoke(this::waitForRegionQueuesToEmpty);
+ vm2.invoke(this::waitForRegionQueuesToEmpty);
int vm1size = vm1.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
int vm2size = vm2.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
@@ -347,11 +358,65 @@ public class AsyncEventListenerDistributedTest implements Serializable {
assertThat(vm1size + vm2size).isGreaterThanOrEqualTo(2000);
}
+ @Test
+ // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+ // and the processor started dispatching events before the actual region was available.
+ public void replicatedRegionWithPersistentSerialAsyncEventQueueAndConflationEnabledShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()
+ throws IOException {
+ // Custom Log File to manually search for exceptions.
+ File customLogFile = temporaryFolder.newFile("memberLog.log");
+ Properties dsProperties = getDistributedSystemProperties();
+ dsProperties.setProperty(ConfigurationProperties.LOG_FILE, customLogFile.getAbsolutePath());
+
+ // Create Region, AsyncEventQueue and Insert Some Entries.
+ vm0.invoke(() -> {
+ createCache();
+ // Large batch time interval and low batch size so no events are processed before the restart.
+ createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 10,
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, 120000);
+ createReplicateRegion(replicateRegionName, asyncEventQueueId);
+ doPuts(replicateRegionName, 5);
+ waitForAsyncEventQueueSize(5);
+ });
+
+ vm0.invoke(() -> {
+ // Restart the cache.
+ getCache().close();
+ cacheRule.createCache(dsProperties);
+
+ // Recover the queue from disk, reduce thresholds so processing starts right away.
+ SpyAsyncEventListener spyAsyncEventListener = new SpyAsyncEventListener();
+ createPersistentAsyncEventQueue(asyncEventQueueId, spyAsyncEventListener, true, 5,
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL);
+ waitForSenderToBecomePrimary();
+
+ // Wait for the processors to start.
+ await().until(() -> {
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ return threads
+ .stream()
+ .filter(t -> t.getName().contains("Processor for GatewaySender_AsyncEventQueue"))
+ .allMatch(Thread::isAlive);
+ });
+
+ // Create the region, processing will continue and no NPE should be thrown anymore.
+ createReplicateRegion(replicateRegionName, asyncEventQueueId);
+ waitForRegionQueuesToEmpty();
+ assertThat(spyAsyncEventListener.getEventsMap().size()).isEqualTo(5);
+ });
+
+ Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
+ .as("Dispatcher shouldn't have thrown any errors while processing batches")
+ .doesNotContain("An Exception occurred. The dispatcher will continue.")
+ .doesNotContain("java.lang.NullPointerException"));
+ }
+
@Test // serial, PartitionedRegion
public void testPartitionedSerialAsyncEventQueue() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -377,9 +442,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
@Test // serial, conflation, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
@@ -396,9 +461,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
- vm0.invoke(() -> waitForDispatcherToPause());
- vm1.invoke(() -> waitForDispatcherToPause());
- vm2.invoke(() -> waitForDispatcherToPause());
+ vm0.invoke(this::waitForDispatcherToPause);
+ vm1.invoke(this::waitForDispatcherToPause);
+ vm2.invoke(this::waitForDispatcherToPause);
Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
@@ -451,16 +516,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
*/
@Test // persistent, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled() {
- vm0.invoke(() -> createCache());
- vm1.invoke(() -> createCache());
- vm2.invoke(() -> createCache());
+ vm0.invoke(this::createCache);
+ vm1.invoke(this::createCache);
+ vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
- false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+ false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
@@ -483,7 +551,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
- createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
@@ -497,7 +566,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
- createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+ createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+ DEFAULT_BATCH_TIME_INTERVAL);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
// primary sender
@@ -600,11 +670,12 @@ public class AsyncEventListenerDistributedTest implements Serializable {
String diskStoreName,
boolean isDiskSynchronous,
int dispatcherThreads,
- int maximumQueueMemory) {
+ int maximumQueueMemory,
+ int batchTimeInterval) {
+
assertThat(asyncEventQueueId).isNotEmpty();
assertThat(asyncEventListener).isNotNull();
assertThat(diskStoreName).isNotEmpty();
-
createDiskStore(diskStoreName, asyncEventQueueId);
AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory();
@@ -614,6 +685,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
asyncEventQueueFactory.setDiskSynchronous(isDiskSynchronous);
asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory);
+ asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval);
asyncEventQueueFactory.setParallel(false);
asyncEventQueueFactory.setPersistent(true);
@@ -623,17 +695,18 @@ public class AsyncEventListenerDistributedTest implements Serializable {
private void addClosingCacheListener(String regionName, int closeAfterCreateKey) {
assertThat(regionName).isNotEmpty();
- Region region = getCache().getRegion(regionName);
+ Region<Integer, Integer> region = getCache().getRegion(regionName);
assertNotNull(region);
- CacheListenerAdapter cacheListener = new CacheListenerAdapter() {
- @Override
- public void afterCreate(EntryEvent event) {
- if ((Integer) event.getKey() == closeAfterCreateKey) {
- getCache().close();
- }
- }
- };
+ CacheListenerAdapter<Integer, Integer> cacheListener =
+ new CacheListenerAdapter<Integer, Integer>() {
+ @Override
+ public void afterCreate(EntryEvent event) {
+ if ((Integer) event.getKey() == closeAfterCreateKey) {
+ getCache().close();
+ }
+ }
+ };
region.getAttributesMutator().addCacheListener(cacheListener);
}
@@ -676,6 +749,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
return totalSize;
}
+ @SuppressWarnings("unchecked")
private void waitForAsyncEventListenerWithEventsMapSize(int expectedSize) {
await().untilAsserted(
() -> assertThat(getSpyAsyncEventListener().getEventsMap()).hasSize(expectedSize));
@@ -745,6 +819,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
}
@Override
+ @SuppressWarnings("unchecked")
public boolean processEvents(List<AsyncEvent> events) {
for (AsyncEvent<K, V> event : events) {
eventsMap.put(event.getKey(), event.getDeserializedValue());
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 3d5405e..698bc7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -783,7 +783,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
if (gsEvent.shouldBeConflated()) {
// The event should be conflated. Create the conflation key
// (comprised of the event's region, key and the operation).
- ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
+ ConflationKey key = new ConflationKey(gsEvent.getRegionPath(),
gsEvent.getKeyToConflate(), gsEvent.getOperation());
// Get the entry at that key
@@ -799,7 +799,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
} else {
// The event should not be conflated (create or destroy). Add it to
// the map.
- ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
+ ConflationKey key = new ConflationKey(gsEvent.getRegionPath(),
gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getShadowKey());
conflatedEventsMap.put(key, gsEvent);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 7b0cfb2..e6abad5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -15,7 +15,10 @@
package org.apache.geode.internal.cache.wan.parallel;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -106,6 +109,38 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
assertThat(gsei2.getShadowKey()).isEqualTo(lastUpdateShadowKey);
}
+ // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+ // and the processor started dispatching events before the actual region was available.
+ @Test
+ public void verifyBatchConflationWithNullEventRegionDoesNowThrowException()
+ throws Exception {
+ AbstractGatewaySenderEventProcessor processor =
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+ List<GatewaySenderEventImpl> events = new ArrayList<GatewaySenderEventImpl>();
+
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+ when(lr.getCache()).thenReturn(this.cache);
+
+ // Create two events for the same key, so that conflation will be needed. Mock the getRegion()
+ // value to return as null so we will hit the NPE if
+ // it is referenced.
+ GatewaySenderEventImpl gsei1 =
+ spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+ "Object_13964", "Object_13964_1", 100, 27709));
+ doReturn(null).when(gsei1).getRegion();
+
+ GatewaySenderEventImpl gsei2 =
+ spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_13964", "Object_13964_2", 101, 27822));
+ doReturn(null).when(gsei2).getRegion();
+
+ events.add(gsei1);
+ events.add(gsei2);
+ assertThatCode(() -> processor.conflate(events)).doesNotThrowAnyException();
+ }
+
@Test
public void validateBatchConflationWithBatchContainingDuplicateNonConflatableEvents()
throws Exception {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index a3f2f30..df3a19d 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -14,23 +14,149 @@
*/
package org.apache.geode.internal.cache.wan.serial;
+import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.junit.Rule;
import org.junit.Test;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
public class SerialWANConflationDUnitTest extends WANTestBase {
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+ private void createCacheWithLogFile(Integer locPort, String logFile) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ String logLevel = System.getProperty(LOG_LEVEL, "info");
+ props.setProperty(LOG_LEVEL, logLevel);
+ props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ props.setProperty(LOG_FILE, logFile);
+
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ }
+
+ private File createDirectory(String name) {
+ assertThat(name).isNotEmpty();
+
+ File directory = new File(temporaryFolder.getRoot(), name);
+ if (!directory.exists()) {
+ try {
+ return temporaryFolder.newFolder(name);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ return directory;
+ }
+
+ private GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
+ boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
+ boolean isPersistent, GatewayEventFilter filter, int numDispatchers,
+ GatewaySender.OrderPolicy policy, int socketBufferSize, int batchTimeInterval) {
+ InternalGatewaySenderFactory gateway =
+ (InternalGatewaySenderFactory) cache.createGatewaySenderFactory();
+ gateway.setParallel(isParallel);
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.setDispatcherThreads(numDispatchers);
+ gateway.setOrderPolicy(policy);
+ gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
+ gateway.setSocketBufferSize(socketBufferSize);
+ gateway.setBatchTimeInterval(batchTimeInterval);
+
+ if (filter != null) {
+ eventFilter = filter;
+ gateway.addGatewayEventFilter(filter);
+ }
+
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+ } else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+ gateway.setDiskStoreName(store.getName());
+ }
+
+ return gateway;
+ }
+
+ private void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+ int batchTimeInterval) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ File persistentDirectory = createDirectory(dsName + "_disk_" + VM.getCurrentVMNum());
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] {persistentDirectory};
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
+ batchSize, isConflation, isPersistent, filter, numDispatcherThreadsForTheRun,
+ GatewaySender.DEFAULT_ORDER_POLICY,
+ GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE, batchTimeInterval);
+ gateway.create(dsName, remoteDsId);
+
+ } finally {
+ exln.remove();
+ }
+ }
+
+ private void waitForEventQueueSize(int expectedQueueSize) {
+ await().untilAsserted(() -> {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ Optional<GatewaySender> sender =
+ senders.stream().filter(s -> s.getId().equals("ln")).findFirst();
+ assertThat(sender.isPresent()).isTrue();
+ Set<RegionQueue> queues = ((AbstractGatewaySender) sender.get()).getQueues();
+ int totalEvents = queues.stream().mapToInt(RegionQueue::size).sum();
+ assertThat(totalEvents).isEqualTo(expectedQueueSize);
+ });
+ }
+
@Test
- public void testSerialPropagationPartitionRegionBatchConflation() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+ public void testSerialPropagationPartitionRegionBatchConflation() {
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
@@ -59,7 +185,7 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
vm7.invoke(() -> pauseSender("ln"));
- final Map keyValues = new HashMap();
+ final Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 1; i <= 10; i++) {
for (int j = 1; j <= 10; j++) {
@@ -92,9 +218,9 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
}
@Test
- public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() {
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
@@ -123,7 +249,7 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
vm7.invoke(() -> pauseSender("ln"));
- final Map keyValues = new HashMap();
+ final Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 1; i <= 10; i++) {
for (int j = 1; j <= 10; j++) {
@@ -134,8 +260,8 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
ArrayList<Integer> v4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
- assertTrue("After conflation during enqueue, there should be only 20 events",
- v4List.get(0) == 20);
+ assertEquals("After conflation during enqueue, there should be only 20 events", 20,
+ (int) v4List.get(0));
vm4.invoke(() -> resumeSender("ln"));
vm5.invoke(() -> resumeSender("ln"));
@@ -150,13 +276,75 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
ArrayList<Integer> v7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- assertTrue("No events in secondary queue stats since it's serial sender",
- (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0);
- assertTrue("Total queued events should be 100",
- (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100);
+ assertEquals("No events in secondary queue stats since it's serial sender", 0,
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
+ assertEquals("Total queued events should be 100", 100,
+ (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)));
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
}
+ @Test
+ // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+ // and the processor started dispatching events before the actual region was available.
+ public void persistentSerialGatewayWithConflationShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()
+ throws IOException {
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createReplicatedRegion(getTestMethodName(), null, Scope.DISTRIBUTED_ACK,
+ DataPolicy.PERSISTENT_REPLICATE, isOffHeap()));
+ createReceiverInVMs(vm2);
+
+ // Create Region, associate gateway and insert some entries.
+ vm4.invoke(() -> {
+ createCache(lnPort);
+ createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
+ isOffHeap());
+
+ // Large batch time interval and low batch size so no events are processed before the restart.
+ createSender("ln", 2, false, 100, 10, true, true, null, 120000);
+
+ Region<Integer, Integer> region = cache.getRegion(getTestMethodName());
+ for (int i = 0; i < 5; i++) {
+ region.put(i, i);
+ }
+ waitForEventQueueSize(5);
+ });
+ vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
+
+ // Custom Log File to manually search for exceptions.
+ File customLogFile = temporaryFolder.newFile("memberLog.log");
+
+ vm4.invoke(() -> {
+ // Restart the cache.
+ cache.close();
+ createCacheWithLogFile(lnPort, customLogFile.getAbsolutePath());
+
+ // Recover the queue from disk, reduce batch thresholds so processing starts right away.
+ createSender("ln", 2, false, 100, 5, true, true, null, DEFAULT_BATCH_TIME_INTERVAL);
+ waitForSenderToBecomePrimary("ln");
+
+ // Wait for the processors to start.
+ await().until(() -> {
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ return threads
+ .stream()
+ .filter(t -> t.getName().contains("Processor for GatewaySender_ln"))
+ .allMatch(Thread::isAlive);
+ });
+
+ // Create the region, processing will continue and no NPE should be thrown anymore.
+ createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
+ isOffHeap());
+ });
+ vm2.invoke(() -> validateRegionSize(getTestMethodName(), 5));
+
+ Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
+ .as("Dispatchers shouldn't have thrown any errors while processing batches")
+ .doesNotContain("An Exception occurred. The dispatcher will continue.")
+ .doesNotContain("java.lang.NullPointerException"));
+ }
}