Posted to commits@geode.apache.org by on...@apache.org on 2019/08/19 14:43:13 UTC

[geode] branch release/1.10.0 updated: GEODE-7079: Prevent NPE During Queue Conflation (#3911)

This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.10.0 by this push:
     new 399a56e  GEODE-7079: Prevent NPE During Queue Conflation (#3911)
399a56e is described below

commit 399a56e9bb4f0732c72e61cef7d7ba4ec91887d9
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Wed Aug 14 21:38:31 2019 -0300

    GEODE-7079: Prevent NPE During Queue Conflation (#3911)
    
    * GEODE-7079: Prevent NPE During Queue Conflation
    
    - Added tests.
    - Fixed minor warnings.
    - Use the cached region name when doing conflation instead of the actual region so the processor doesn't need to wait for the actual region to be fully initialized.
    
    Co-authored-by: Benjamin Ross <br...@pivotal.io>
    (cherry picked from commit 6f4bbbd96bcecdb82cf7753ce1dae9fa6baebf9b)
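
For context, the core of the fix is visible in the AbstractGatewaySenderEventProcessor hunk further down: the ConflationKey is now built from the event's cached region path (gsEvent.getRegionPath()) instead of dereferencing the region object (gsEvent.getRegion().getFullPath()), which can still be null while a disk-recovered queue is already dispatching events. The stand-alone sketch below is an illustration only, using hypothetical stand-in types (QueuedEvent, ConflationSketch) rather than the actual Geode classes, to show why keying conflation on the cached path side-steps the NullPointerException:

    // Illustration only, not part of the patch. A minimal stand-in for the queued
    // event: the region path string is cached on the event, while the Region object
    // may still be null right after the queue is recovered from disk.
    import java.util.HashMap;
    import java.util.Map;

    public class ConflationSketch {

      static class QueuedEvent {
        final String cachedRegionPath; // always available, even before the region initializes
        final Object region;           // may be null after disk recovery
        final Object key;
        final Object value;

        QueuedEvent(String cachedRegionPath, Object region, Object key, Object value) {
          this.cachedRegionPath = cachedRegionPath;
          this.region = region;
          this.key = key;
          this.value = value;
        }
      }

      public static void main(String[] args) {
        // Two events for the same key on a region that is not yet available (region == null).
        QueuedEvent older = new QueuedEvent("/dataStoreRegion", null, "k1", "v1");
        QueuedEvent newer = new QueuedEvent("/dataStoreRegion", null, "k1", "v2");

        Map<String, QueuedEvent> conflated = new HashMap<>();
        for (QueuedEvent e : new QueuedEvent[] {older, newer}) {
          // Before the fix the key was derived from the region itself (a null dereference here);
          // using the cached path never touches the region, so no NPE can occur.
          String conflationKey = e.cachedRegionPath + "|" + e.key;
          conflated.put(conflationKey, e); // the later event for the same key replaces the earlier one
        }

        System.out.println(conflated.size() + " event(s) after conflation"); // prints 1
      }
    }
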
---
 .../AsyncEventListenerDistributedTest.java         | 195 +++++++++++++------
 .../wan/AbstractGatewaySenderEventProcessor.java   |   4 +-
 ...rallelGatewaySenderEventProcessorJUnitTest.java |  35 ++++
 .../wan/serial/SerialWANConflationDUnitTest.java   | 216 +++++++++++++++++++--
 4 files changed, 374 insertions(+), 76 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
index e335081..09cab0e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.cache.wan.asyncqueue;
 
 import static org.apache.geode.cache.RegionShortcut.PARTITION;
 import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
 import static org.apache.geode.test.dunit.VM.getVM;
@@ -26,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +59,7 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.InternalGatewaySender;
@@ -127,9 +130,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
   @Test // serial, ReplicateRegion
   public void testSerialAsyncEventQueueSize() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
         100, dispatcherThreadCount, 100));
@@ -146,9 +149,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     vm1.invoke(() -> getInternalGatewaySender().pause());
     vm2.invoke(() -> getInternalGatewaySender().pause());
 
-    vm0.invoke(() -> waitForDispatcherToPause());
-    vm1.invoke(() -> waitForDispatcherToPause());
-    vm2.invoke(() -> waitForDispatcherToPause());
+    vm0.invoke(this::waitForDispatcherToPause);
+    vm1.invoke(this::waitForDispatcherToPause);
+    vm2.invoke(this::waitForDispatcherToPause);
 
     vm0.invoke(() -> doPuts(replicateRegionName, 1000));
 
@@ -159,9 +162,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
   @Test // serial, ReplicateRegion
   public void testReplicatedSerialAsyncEventQueue() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
         100, dispatcherThreadCount, 100));
@@ -186,9 +189,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
   @Test // serial, conflation, ReplicateRegion
   public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
         100, dispatcherThreadCount, 100));
@@ -205,9 +208,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     vm1.invoke(() -> getInternalGatewaySender().pause());
     vm2.invoke(() -> getInternalGatewaySender().pause());
 
-    vm0.invoke(() -> waitForDispatcherToPause());
-    vm1.invoke(() -> waitForDispatcherToPause());
-    vm2.invoke(() -> waitForDispatcherToPause());
+    vm0.invoke(this::waitForDispatcherToPause);
+    vm1.invoke(this::waitForDispatcherToPause);
+    vm2.invoke(this::waitForDispatcherToPause);
 
     Map<Integer, Integer> keyValues = new HashMap<>();
     for (int i = 0; i < 1000; i++) {
@@ -252,16 +255,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
   @Test // serial, persistent, conflation, ReplicateRegion
   public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
 
     vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
     vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
@@ -283,7 +289,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
       createCache();
 
       createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
-          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+          DEFAULT_BATCH_TIME_INTERVAL);
 
       createReplicateRegion(replicateRegionName, asyncEventQueueId);
 
@@ -298,7 +305,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
       createCache();
       createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
-          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+          DEFAULT_BATCH_TIME_INTERVAL);
       createReplicateRegion(replicateRegionName, asyncEventQueueId);
 
       // primary sender
@@ -315,16 +323,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
   @Ignore("TODO: Disabled for 52351")
   @Test // serial, persistent, ReplicateRegion
   public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
 
     vm0.invoke(() -> {
       createReplicateRegion(replicateRegionName, asyncEventQueueId);
@@ -334,11 +345,11 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
     vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
 
-    vm1.invoke(() -> waitForSenderToBecomePrimary());
+    vm1.invoke(this::waitForSenderToBecomePrimary);
     vm1.invoke(() -> doPuts(replicateRegionName, 2000));
 
-    vm1.invoke(() -> waitForRegionQueuesToEmpty());
-    vm2.invoke(() -> waitForRegionQueuesToEmpty());
+    vm1.invoke(this::waitForRegionQueuesToEmpty);
+    vm2.invoke(this::waitForRegionQueuesToEmpty);
 
     int vm1size = vm1.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
     int vm2size = vm2.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
@@ -347,11 +358,65 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     assertThat(vm1size + vm2size).isGreaterThanOrEqualTo(2000);
   }
 
+  @Test
+  // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+  // and the processor started dispatching events before the actual region was available.
+  public void replicatedRegionWithPersistentSerialAsyncEventQueueAndConflationEnabledShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()
+      throws IOException {
+    // Custom Log File to manually search for exceptions.
+    File customLogFile = temporaryFolder.newFile("memberLog.log");
+    Properties dsProperties = getDistributedSystemProperties();
+    dsProperties.setProperty(ConfigurationProperties.LOG_FILE, customLogFile.getAbsolutePath());
+
+    // Create Region, AsyncEventQueue and Insert Some Entries.
+    vm0.invoke(() -> {
+      createCache();
+      // Large batch time interval and low batch size so no events are processed before the restart.
+      createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 10,
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, 120000);
+      createReplicateRegion(replicateRegionName, asyncEventQueueId);
+      doPuts(replicateRegionName, 5);
+      waitForAsyncEventQueueSize(5);
+    });
+
+    vm0.invoke(() -> {
+      // Restart the cache.
+      getCache().close();
+      cacheRule.createCache(dsProperties);
+
+      // Recover the queue from disk, reduce thresholds so processing starts right away.
+      SpyAsyncEventListener spyAsyncEventListener = new SpyAsyncEventListener();
+      createPersistentAsyncEventQueue(asyncEventQueueId, spyAsyncEventListener, true, 5,
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+          DEFAULT_BATCH_TIME_INTERVAL);
+      waitForSenderToBecomePrimary();
+
+      // Wait for the processors to start.
+      await().until(() -> {
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+        return threads
+            .stream()
+            .filter(t -> t.getName().contains("Processor for GatewaySender_AsyncEventQueue"))
+            .allMatch(Thread::isAlive);
+      });
+
+      // Create the region, processing will continue and no NPE should be thrown anymore.
+      createReplicateRegion(replicateRegionName, asyncEventQueueId);
+      waitForRegionQueuesToEmpty();
+      assertThat(spyAsyncEventListener.getEventsMap().size()).isEqualTo(5);
+    });
+
+    Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
+        .as("Dispatcher shouldn't have thrown any errors while processing batches")
+        .doesNotContain("An Exception occurred. The dispatcher will continue.")
+        .doesNotContain("java.lang.NullPointerException"));
+  }
+
   @Test // serial, PartitionedRegion
   public void testPartitionedSerialAsyncEventQueue() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
         100, dispatcherThreadCount, 100));
@@ -377,9 +442,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
   @Test // serial, conflation, PartitionedRegion
   public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
         100, dispatcherThreadCount, 100));
@@ -396,9 +461,9 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     vm1.invoke(() -> getInternalGatewaySender().pause());
     vm2.invoke(() -> getInternalGatewaySender().pause());
 
-    vm0.invoke(() -> waitForDispatcherToPause());
-    vm1.invoke(() -> waitForDispatcherToPause());
-    vm2.invoke(() -> waitForDispatcherToPause());
+    vm0.invoke(this::waitForDispatcherToPause);
+    vm1.invoke(this::waitForDispatcherToPause);
+    vm2.invoke(this::waitForDispatcherToPause);
 
     Map<Integer, Integer> keyValues = new HashMap<>();
     for (int i = 0; i < 1000; i++) {
@@ -451,16 +516,19 @@ public class AsyncEventListenerDistributedTest implements Serializable {
    */
   @Test // persistent, PartitionedRegion
   public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled() {
-    vm0.invoke(() -> createCache());
-    vm1.invoke(() -> createCache());
-    vm2.invoke(() -> createCache());
+    vm0.invoke(this::createCache);
+    vm1.invoke(this::createCache);
+    vm2.invoke(this::createCache);
 
     vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
     vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
-        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
+        false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
+        DEFAULT_BATCH_TIME_INTERVAL));
 
     vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
     vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
@@ -483,7 +551,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
       createCache();
 
       createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
-          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+          DEFAULT_BATCH_TIME_INTERVAL);
 
       createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
 
@@ -497,7 +566,8 @@ public class AsyncEventListenerDistributedTest implements Serializable {
 
       createCache();
       createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
-          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
+          createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
+          DEFAULT_BATCH_TIME_INTERVAL);
       createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
 
       // primary sender
@@ -600,11 +670,12 @@ public class AsyncEventListenerDistributedTest implements Serializable {
       String diskStoreName,
       boolean isDiskSynchronous,
       int dispatcherThreads,
-      int maximumQueueMemory) {
+      int maximumQueueMemory,
+      int batchTimeInterval) {
+
     assertThat(asyncEventQueueId).isNotEmpty();
     assertThat(asyncEventListener).isNotNull();
     assertThat(diskStoreName).isNotEmpty();
-
     createDiskStore(diskStoreName, asyncEventQueueId);
 
     AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory();
@@ -614,6 +685,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     asyncEventQueueFactory.setDiskSynchronous(isDiskSynchronous);
     asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
     asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory);
+    asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval);
     asyncEventQueueFactory.setParallel(false);
     asyncEventQueueFactory.setPersistent(true);
 
@@ -623,17 +695,18 @@ public class AsyncEventListenerDistributedTest implements Serializable {
   private void addClosingCacheListener(String regionName, int closeAfterCreateKey) {
     assertThat(regionName).isNotEmpty();
 
-    Region region = getCache().getRegion(regionName);
+    Region<Integer, Integer> region = getCache().getRegion(regionName);
     assertNotNull(region);
 
-    CacheListenerAdapter cacheListener = new CacheListenerAdapter() {
-      @Override
-      public void afterCreate(EntryEvent event) {
-        if ((Integer) event.getKey() == closeAfterCreateKey) {
-          getCache().close();
-        }
-      }
-    };
+    CacheListenerAdapter<Integer, Integer> cacheListener =
+        new CacheListenerAdapter<Integer, Integer>() {
+          @Override
+          public void afterCreate(EntryEvent event) {
+            if ((Integer) event.getKey() == closeAfterCreateKey) {
+              getCache().close();
+            }
+          }
+        };
 
     region.getAttributesMutator().addCacheListener(cacheListener);
   }
@@ -676,6 +749,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     return totalSize;
   }
 
+  @SuppressWarnings("unchecked")
   private void waitForAsyncEventListenerWithEventsMapSize(int expectedSize) {
     await().untilAsserted(
         () -> assertThat(getSpyAsyncEventListener().getEventsMap()).hasSize(expectedSize));
@@ -745,6 +819,7 @@ public class AsyncEventListenerDistributedTest implements Serializable {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean processEvents(List<AsyncEvent> events) {
       for (AsyncEvent<K, V> event : events) {
         eventsMap.put(event.getKey(), event.getDeserializedValue());
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 3d5405e..698bc7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -783,7 +783,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
         if (gsEvent.shouldBeConflated()) {
           // The event should be conflated. Create the conflation key
           // (comprised of the event's region, key and the operation).
-          ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
+          ConflationKey key = new ConflationKey(gsEvent.getRegionPath(),
               gsEvent.getKeyToConflate(), gsEvent.getOperation());
 
           // Get the entry at that key
@@ -799,7 +799,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
         } else {
           // The event should not be conflated (create or destroy). Add it to
           // the map.
-          ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
+          ConflationKey key = new ConflationKey(gsEvent.getRegionPath(),
               gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getShadowKey());
           conflatedEventsMap.put(key, gsEvent);
         }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 7b0cfb2..e6abad5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -15,7 +15,10 @@
 package org.apache.geode.internal.cache.wan.parallel;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -106,6 +109,38 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     assertThat(gsei2.getShadowKey()).isEqualTo(lastUpdateShadowKey);
   }
 
+  // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+  // and the processor started dispatching events before the actual region was available.
+  @Test
+  public void verifyBatchConflationWithNullEventRegionDoesNowThrowException()
+      throws Exception {
+    AbstractGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    List<GatewaySenderEventImpl> events = new ArrayList<GatewaySenderEventImpl>();
+
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    when(lr.getCache()).thenReturn(this.cache);
+
+    // Create two events for the same key, so that conflation will be needed. Mock the getRegion()
+    // value to return as null so we will hit the NPE if
+    // it is referenced.
+    GatewaySenderEventImpl gsei1 =
+        spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+            "Object_13964", "Object_13964_1", 100, 27709));
+    doReturn(null).when(gsei1).getRegion();
+
+    GatewaySenderEventImpl gsei2 =
+        spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+            "Object_13964", "Object_13964_2", 101, 27822));
+    doReturn(null).when(gsei2).getRegion();
+
+    events.add(gsei1);
+    events.add(gsei2);
+    assertThatCode(() -> processor.conflate(events)).doesNotThrowAnyException();
+  }
+
   @Test
   public void validateBatchConflationWithBatchContainingDuplicateNonConflatableEvents()
       throws Exception {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index a3f2f30..df3a19d 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -14,23 +14,149 @@
  */
 package org.apache.geode.internal.cache.wan.serial;
 
+import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
 import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 
 
 public class SerialWANConflationDUnitTest extends WANTestBase {
 
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  private void createCacheWithLogFile(Integer locPort, String logFile) {
+    WANTestBase test = new WANTestBase();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    String logLevel = System.getProperty(LOG_LEVEL, "info");
+    props.setProperty(LOG_LEVEL, logLevel);
+    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    props.setProperty(LOG_FILE, logFile);
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+
+  private File createDirectory(String name) {
+    assertThat(name).isNotEmpty();
+
+    File directory = new File(temporaryFolder.getRoot(), name);
+    if (!directory.exists()) {
+      try {
+        return temporaryFolder.newFolder(name);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    return directory;
+  }
+
+  private GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
+      boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
+      boolean isPersistent, GatewayEventFilter filter, int numDispatchers,
+      GatewaySender.OrderPolicy policy, int socketBufferSize, int batchTimeInterval) {
+    InternalGatewaySenderFactory gateway =
+        (InternalGatewaySenderFactory) cache.createGatewaySenderFactory();
+    gateway.setParallel(isParallel);
+    gateway.setMaximumQueueMemory(maxMemory);
+    gateway.setBatchSize(batchSize);
+    gateway.setBatchConflationEnabled(isConflation);
+    gateway.setDispatcherThreads(numDispatchers);
+    gateway.setOrderPolicy(policy);
+    gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
+    gateway.setSocketBufferSize(socketBufferSize);
+    gateway.setBatchTimeInterval(batchTimeInterval);
+
+    if (filter != null) {
+      eventFilter = filter;
+      gateway.addGatewayEventFilter(filter);
+    }
+
+    if (isPersistent) {
+      gateway.setPersistenceEnabled(true);
+      gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+    } else {
+      DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+      gateway.setDiskStoreName(store.getName());
+    }
+
+    return gateway;
+  }
+
+  private void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+      int batchTimeInterval) {
+    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      File persistentDirectory = createDirectory(dsName + "_disk_" + VM.getCurrentVMNum());
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      File[] dirs1 = new File[] {persistentDirectory};
+      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
+          batchSize, isConflation, isPersistent, filter, numDispatcherThreadsForTheRun,
+          GatewaySender.DEFAULT_ORDER_POLICY,
+          GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE, batchTimeInterval);
+      gateway.create(dsName, remoteDsId);
+
+    } finally {
+      exln.remove();
+    }
+  }
+
+  private void waitForEventQueueSize(int expectedQueueSize) {
+    await().untilAsserted(() -> {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      Optional<GatewaySender> sender =
+          senders.stream().filter(s -> s.getId().equals("ln")).findFirst();
+      assertThat(sender.isPresent()).isTrue();
+      Set<RegionQueue> queues = ((AbstractGatewaySender) sender.get()).getQueues();
+      int totalEvents = queues.stream().mapToInt(RegionQueue::size).sum();
+      assertThat(totalEvents).isEqualTo(expectedQueueSize);
+    });
+  }
+
   @Test
-  public void testSerialPropagationPartitionRegionBatchConflation() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+  public void testSerialPropagationPartitionRegionBatchConflation() {
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
 
@@ -59,7 +185,7 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
     vm7.invoke(() -> pauseSender("ln"));
 
 
-    final Map keyValues = new HashMap();
+    final Map<Integer, Integer> keyValues = new HashMap<>();
 
     for (int i = 1; i <= 10; i++) {
       for (int j = 1; j <= 10; j++) {
@@ -92,9 +218,9 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+  public void testSerialPropagationPartitionRegionConflationDuringEnqueue() {
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
 
@@ -123,7 +249,7 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
     vm7.invoke(() -> pauseSender("ln"));
 
 
-    final Map keyValues = new HashMap();
+    final Map<Integer, Integer> keyValues = new HashMap<>();
 
     for (int i = 1; i <= 10; i++) {
       for (int j = 1; j <= 10; j++) {
@@ -134,8 +260,8 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
 
     ArrayList<Integer> v4List =
         (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
-    assertTrue("After conflation during enqueue, there should be only 20 events",
-        v4List.get(0) == 20);
+    assertEquals("After conflation during enqueue, there should be only 20 events", 20,
+        (int) v4List.get(0));
 
     vm4.invoke(() -> resumeSender("ln"));
     vm5.invoke(() -> resumeSender("ln"));
@@ -150,13 +276,75 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
     ArrayList<Integer> v7List =
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
 
-    assertTrue("No events in secondary queue stats since it's serial sender",
-        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0);
-    assertTrue("Total queued events should be 100",
-        (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100);
+    assertEquals("No events in secondary queue stats since it's serial sender", 0,
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
+    assertEquals("Total queued events should be 100", 100,
+        (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)));
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
   }
 
+  @Test
+  // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
+  // and the processor started dispatching events before the actual region was available.
+  public void persistentSerialGatewayWithConflationShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()
+      throws IOException {
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> createReplicatedRegion(getTestMethodName(), null, Scope.DISTRIBUTED_ACK,
+        DataPolicy.PERSISTENT_REPLICATE, isOffHeap()));
+    createReceiverInVMs(vm2);
+
+    // Create Region, associate gateway and insert some entries.
+    vm4.invoke(() -> {
+      createCache(lnPort);
+      createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
+          isOffHeap());
+
+      // Large batch time interval and low batch size so no events are processed before the restart.
+      createSender("ln", 2, false, 100, 10, true, true, null, 120000);
+
+      Region<Integer, Integer> region = cache.getRegion(getTestMethodName());
+      for (int i = 0; i < 5; i++) {
+        region.put(i, i);
+      }
+      waitForEventQueueSize(5);
+    });
+    vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
+
+    // Custom Log File to manually search for exceptions.
+    File customLogFile = temporaryFolder.newFile("memberLog.log");
+
+    vm4.invoke(() -> {
+      // Restart the cache.
+      cache.close();
+      createCacheWithLogFile(lnPort, customLogFile.getAbsolutePath());
+
+      // Recover the queue from disk, reduce batch thresholds so processing starts right away.
+      createSender("ln", 2, false, 100, 5, true, true, null, DEFAULT_BATCH_TIME_INTERVAL);
+      waitForSenderToBecomePrimary("ln");
+
+      // Wait for the processors to start.
+      await().until(() -> {
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+        return threads
+            .stream()
+            .filter(t -> t.getName().contains("Processor for GatewaySender_ln"))
+            .allMatch(Thread::isAlive);
+      });
+
+      // Create the region, processing will continue and no NPE should be thrown anymore.
+      createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
+          isOffHeap());
+    });
+    vm2.invoke(() -> validateRegionSize(getTestMethodName(), 5));
+
+    Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
+        .as("Dispatchers shouldn't have thrown any errors while processing batches")
+        .doesNotContain("An Exception occurred. The dispatcher will continue.")
+        .doesNotContain("java.lang.NullPointerException"));
+  }
 }