You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/09/25 18:00:16 UTC

[geode] branch develop updated: GEODE-3678: Added support to synchronize parallel gateway sender queues

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new af4012f  GEODE-3678: Added support to synchronize parallel gateway sender queues
af4012f is described below

commit af4012f5f934c7e8f21025c528d8ba18e3aad794
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu Sep 21 18:25:30 2017 -0700

    GEODE-3678: Added support to synchronize parallel gateway sender queues
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |   8 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   5 +
 .../geode/internal/DataSerializableFixedID.java    |   2 +
 .../geode/internal/cache/GemFireCacheImpl.java     |  29 ++
 .../internal/cache/InitialImageOperation.java      |  20 ++
 .../apache/geode/internal/cache/InternalCache.java |   4 +
 .../cache/RegionEntrySynchronizationListener.java  |  25 ++
 .../internal/cache/wan/AbstractGatewaySender.java  |  38 +++
 .../wan/AbstractGatewaySenderEventProcessor.java   |   2 +
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 ...waySenderQueueEntrySynchronizationListener.java |  35 +++
 ...aySenderQueueEntrySynchronizationOperation.java | 314 +++++++++++++++++++++
 .../internal/cache/wan/GatewaySenderStats.java     |  29 +-
 ...currentParallelGatewaySenderEventProcessor.java |   8 +
 .../ParallelGatewaySenderEventProcessor.java       |  22 +-
 ...oncurrentSerialGatewaySenderEventProcessor.java |   7 +
 .../serial/SerialGatewaySenderEventProcessor.java  |   6 +
 .../internal/cache/xmlcache/CacheCreation.java     |   8 +
 .../cache/xmlcache/ClientCacheCreation.java        |  10 +
 .../xmlcache/ParallelGatewaySenderCreation.java    |   9 +
 .../xmlcache/SerialGatewaySenderCreation.java      |   9 +
 .../geode/internal/i18n/LocalizedStrings.java      |  14 +
 ...yncEventQueueEntrySynchronizationDUnitTest.java | 183 ++++++++++++
 .../cache/wan/AsyncEventQueueTestBase.java         |  48 ++--
 .../cache/wan/WaitingAsyncEventListener.java       |  63 +++++
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +
 .../geode/internal/cache/wan/WANTestBase.java      |   2 +-
 27 files changed, 875 insertions(+), 35 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 4c5f937..4eb9d0d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -94,7 +94,11 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntGauge(LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
                 "operations"),
             f.createLongCounter(LOAD_BALANCE_TIME, "Total time spent load balancing this sender",
-                "nanoseconds"),});
+                "nanoseconds"),
+            f.createIntCounter(SYNCHRONIZATION_EVENTS_ENQUEUED,
+                "Number of synchronization events added to the event queue.", "operations"),
+            f.createIntCounter(SYNCHRONIZATION_EVENTS_PROVIDED,
+                "Number of synchronization events provided to other members.", "operations"),});
 
     // Initialize id fields
     eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
@@ -123,6 +127,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
     loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
     loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+    synchronizationEventsEnqueuedId = type.nameToId(SYNCHRONIZATION_EVENTS_ENQUEUED);
+    synchronizationEventsProvidedId = type.nameToId(SYNCHRONIZATION_EVENTS_PROVIDED);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 5b0d86b..31887ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -397,6 +397,7 @@ import org.apache.geode.internal.cache.versions.VMVersionTag;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation;
 import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage;
 import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.BatchRemovalReplyMessage;
 import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
@@ -926,6 +927,10 @@ public class DSFIDFactory implements DataSerializableFixedID {
     registerDSFID(PR_DESTROY_ON_DATA_STORE_MESSAGE, DestroyRegionOnDataStoreMessage.class);
     registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST, ShutdownAllGatewayHubsRequest.class);
     registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
+    registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE,
+        GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationMessage.class);
+    registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
+        GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index 18f382b..b4fbac7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -812,6 +812,8 @@ public interface DataSerializableFixedID extends SerializationVersions {
   public static final short DESTROY_LUCENE_INDEX_MESSAGE = 2178;
   public static final short LUCENE_PAGE_RESULTS = 2179;
   public static final short LUCENE_RESULT_STRUCT = 2180;
+  public static final short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
+  public static final short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
 
   // NOTE, codes > 65535 will take 4 bytes to serialize
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index abc62f9..c5b5d01 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -80,6 +80,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
+import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener;
 import org.apache.geode.internal.security.SecurityServiceFactory;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
@@ -589,6 +590,9 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   private final SecurityService securityService;
 
+  private final Set<RegionEntrySynchronizationListener> synchronizationListeners =
+      new ConcurrentHashSet<>();
+
   static {
     // this works around jdk bug 6427854, reported in ticket #44434
     String propertyName = "sun.nio.ch.bugLevel";
@@ -943,6 +947,8 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
       SystemFailure.signalCacheCreate();
 
       this.diskMonitor = new DiskStoreMonitor();
+
+      addRegionEntrySynchronizationListener(new GatewaySenderQueueEntrySynchronizationListener());
     } // synchronized
   }
 
@@ -5232,4 +5238,27 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
   public CqService getCqService() {
     return this.cqService;
   }
+
+  public void addRegionEntrySynchronizationListener(RegionEntrySynchronizationListener listener) {
+    this.synchronizationListeners.add(listener);
+  }
+
+  public void removeRegionEntrySynchronizationListener(
+      RegionEntrySynchronizationListener listener) {
+    this.synchronizationListeners.remove(listener);
+  }
+
+  public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+      InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize) {
+    for (RegionEntrySynchronizationListener listener : this.synchronizationListeners) {
+      try {
+        listener.afterSynchronization(sender, region, entriesToSynchronize);
+      } catch (Throwable t) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.GemFireCacheImpl_CAUGHT_EXCEPTION_SYNCHRONIZING_EVENTS,
+            new Object[] {sender, region.getFullPath(), entriesToSynchronize}), t);
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index b48fdc5..38a76de 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -784,6 +784,10 @@ public class InitialImageOperation {
         keys = new HashSet();
       }
       final ByteArrayDataInput in = new ByteArrayDataInput();
+      List<Entry> entriesToSynchronize = null;
+      if (this.isSynchronizing) {
+        entriesToSynchronize = new ArrayList<>();
+      }
       for (int i = 0; i < entryCount; i++) {
         // stream is null-terminated
         if (internalDuringApplyDelta != null && !internalDuringApplyDelta.isRunning
@@ -884,6 +888,9 @@ public class InitialImageOperation {
                   if (record) {
                     this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered,
                         true, tag, sender, this.isSynchronizing);
+                    if (this.isSynchronizing) {
+                      entriesToSynchronize.add(entry);
+                    }
                   }
                 } catch (RegionDestroyedException e) {
                   return false;
@@ -927,6 +934,9 @@ public class InitialImageOperation {
             }
             this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered, false,
                 tag, sender, this.isSynchronizing);
+            if (this.isSynchronizing) {
+              entriesToSynchronize.add(entry);
+            }
           } catch (RegionDestroyedException e) {
             return false;
           } catch (CancelException e) {
@@ -935,6 +945,12 @@ public class InitialImageOperation {
           didIIP = true;
         }
       }
+      if (this.isSynchronizing && !entriesToSynchronize.isEmpty()) {
+        LocalRegion owner = ((AbstractRegionMap) this.entries)._getOwner();
+        LocalRegion region = owner instanceof BucketRegion ? owner.getPartitionedRegion() : owner;
+        owner.getCache().invokeRegionEntrySynchronizationListenersAfterSynchronization(sender,
+            region, entriesToSynchronize);
+      }
       if (keys != null) {
         if (isDebugEnabled) {
           logger.debug("processed these initial image keys: {}", keys);
@@ -2929,6 +2945,10 @@ public class InitialImageOperation {
       this.entryBits = EntryBits.setTombstone(this.entryBits, true);
     }
 
+    public Object getKey() {
+      return key;
+    }
+
     public VersionTag getVersionTag() {
       return versionTag;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 544406f..66bc44a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -316,4 +316,8 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
   boolean hasPersistentRegion();
 
   void shutDownAll();
+
+  void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+      InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java
new file mode 100644
index 0000000..aad20c4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.List;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public interface RegionEntrySynchronizationListener {
+
+  void afterSynchronization(InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 9b3b61f..fbf8c71 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1350,4 +1350,42 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
       this.event.release();
     }
   }
+
+  protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+    GatewayQueueEvent event = null;
+    for (RegionQueue queue : getQueues()) {
+      Region region = queue.getRegion();
+      for (Iterator i = region.values().iterator(); i.hasNext();) {
+        GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) i.next();
+        if (gsei.getKey().equals(key) && gsei.getVersionTimeStamp() == timestamp) {
+          event = gsei;
+          logger.info(LocalizedMessage.create(
+              LocalizedStrings.AbstractGatewaySender_PROVIDING_SYNCHRONIZATION_EVENT,
+              new Object[] {this, key, timestamp, event}));
+          this.statistics.incSynchronizationEventsProvided();
+          break;
+        }
+      }
+    }
+    return event;
+  }
+
+  protected void putSynchronizationEvent(GatewayQueueEvent event) {
+    if (this.eventProcessor != null) {
+      this.lifeCycleLock.readLock().lock();
+      try {
+        logger.info(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_ENQUEUEING_SYNCHRONIZATION_EVENT,
+            new Object[] {this, event}));
+        this.eventProcessor.enqueueEvent(event);
+        this.statistics.incSynchronizationEventsEnqueued();
+      } catch (Throwable t) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ENQUEUEING_SYNCHRONIZATION_EVENT,
+            new Object[] {this, event}), t);
+      } finally {
+        this.lifeCycleLock.readLock().unlock();
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0c93755..52df2c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1291,6 +1291,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(userRegion);
   }
 
+  protected abstract void enqueueEvent(GatewayQueueEvent event);
+
   protected class SenderStopperCallable implements Callable<Boolean> {
     private final AbstractGatewaySenderEventProcessor p;
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index cacd326..ec89ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -781,7 +781,7 @@ public class GatewaySenderEventImpl
         .append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
         .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
-        .append("]");
+        .append(";bucketId=").append(this.bucketId).append("]");
     return buffer.toString();
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java
new file mode 100644
index 0000000..2c898d2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.util.List;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntrySynchronizationListener;
+
+public class GatewaySenderQueueEntrySynchronizationListener
+    implements RegionEntrySynchronizationListener {
+
+  @Override
+  public void afterSynchronization(InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize) {
+    if (region.getAllGatewaySenderIds().size() > 0) {
+      new GatewaySenderQueueEntrySynchronizationOperation(sender, region, entriesToSynchronize)
+          .synchronizeEntries();
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
new file mode 100644
index 0000000..1c398a9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+
+public class GatewaySenderQueueEntrySynchronizationOperation {
+
+  private InternalDistributedMember recipient;
+
+  private LocalRegion region;
+
+  private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;
+
+  private static final Logger logger = LogService.getLogger();
+
+  protected GatewaySenderQueueEntrySynchronizationOperation(InternalDistributedMember recipient,
+      LocalRegion region, List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
+    this.recipient = recipient;
+    this.region = region;
+    initializeEntriesToSynchronize(giiEntriesToSynchronize);
+  }
+
+  protected void synchronizeEntries() {
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: Requesting synchronization from member={}; regionPath={}; entriesToSynchronize={}",
+          getClass().getSimpleName(), this.recipient, this.region.getFullPath(),
+          this.entriesToSynchronize);
+    }
+    // Create and send message
+    DM dm = this.region.getDistributionManager();
+    GatewaySenderQueueEntrySynchronizationReplyProcessor processor =
+        new GatewaySenderQueueEntrySynchronizationReplyProcessor(dm, this.recipient, this);
+    GatewaySenderQueueEntrySynchronizationMessage message =
+        new GatewaySenderQueueEntrySynchronizationMessage(this.recipient,
+            processor.getProcessorId(), this);
+    dm.putOutgoing(message);
+
+    // Wait for replies
+    try {
+      processor.waitForReplies();
+    } catch (ReplyException e) {
+      e.handleAsUnexpected();
+    } catch (InterruptedException e) {
+      dm.getCancelCriterion().checkCancelInProgress(e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  protected GemFireCacheImpl getCache() {
+    return (GemFireCacheImpl) CacheFactory.getAnyInstance();
+  }
+
+  private void initializeEntriesToSynchronize(
+      List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
+    this.entriesToSynchronize = new ArrayList<>();
+    for (InitialImageOperation.Entry entry : giiEntriesToSynchronize) {
+      this.entriesToSynchronize.add(
+          new GatewaySenderQueueEntrySynchronizationEntry(entry.getKey(), entry.getVersionTag()));
+    }
+  }
+
+  public static class GatewaySenderQueueEntrySynchronizationReplyProcessor
+      extends ReplyProcessor21 {
+
+    private GatewaySenderQueueEntrySynchronizationOperation operation;
+
+    public GatewaySenderQueueEntrySynchronizationReplyProcessor(DM dm,
+        InternalDistributedMember recipient,
+        GatewaySenderQueueEntrySynchronizationOperation operation) {
+      super(dm, recipient);
+      this.operation = operation;
+    }
+
+    @Override
+    public void process(DistributionMessage msg) {
+      try {
+        if (msg instanceof ReplyMessage) {
+          ReplyMessage reply = (ReplyMessage) msg;
+          if (reply.getException() == null) {
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "{}: Processing reply from member={}; regionPath={}; key={}; entriesToSynchronize={}",
+                  getClass().getSimpleName(), reply.getSender(),
+                  this.operation.region.getFullPath(), this.operation.entriesToSynchronize,
+                  reply.getReturnValue());
+            }
+            List<Map<String, GatewayQueueEvent>> events =
+                (List<Map<String, GatewayQueueEvent>>) reply.getReturnValue();
+            for (int i = 0; i < events.size(); i++) {
+              Map<String, GatewayQueueEvent> eventsForOneEntry = events.get(i);
+              if (events.isEmpty()) {
+                GatewaySenderQueueEntrySynchronizationEntry entry =
+                    this.operation.entriesToSynchronize.get(i);
+                logger.info(LocalizedMessage.create(
+                    LocalizedStrings.GatewaySenderQueueEntrySynchronizationReplyProcessor_REPLY_IS_EMPTY,
+                    new Object[] {reply.getSender(), this.operation.region.getFullPath(), entry.key,
+                        entry.entryVersion}));
+              } else {
+                putSynchronizationEvents(eventsForOneEntry);
+              }
+            }
+          }
+        }
+      } finally {
+        super.process(msg);
+      }
+    }
+
+    private void putSynchronizationEvents(Map<String, GatewayQueueEvent> senderIdsAndEvents) {
+      for (Map.Entry<String, GatewayQueueEvent> senderIdAndEvent : senderIdsAndEvents.entrySet()) {
+        AbstractGatewaySender sender =
+            (AbstractGatewaySender) getCache().getGatewaySender(senderIdAndEvent.getKey());
+        sender.putSynchronizationEvent(senderIdAndEvent.getValue());
+      }
+    }
+
+    private Cache getCache() {
+      return CacheFactory.getAnyInstance();
+    }
+  }
+
+  public static class GatewaySenderQueueEntrySynchronizationMessage
+      extends PooledDistributionMessage implements MessageWithReply {
+
+    private int processorId;
+
+    private String regionPath;
+
+    private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;
+
+    /* For serialization */
+    public GatewaySenderQueueEntrySynchronizationMessage() {}
+
+    protected GatewaySenderQueueEntrySynchronizationMessage(InternalDistributedMember recipient,
+        int processorId, GatewaySenderQueueEntrySynchronizationOperation operation) {
+      super();
+      setRecipient(recipient);
+      this.processorId = processorId;
+      this.regionPath = operation.region.getFullPath();
+      this.entriesToSynchronize = operation.entriesToSynchronize;
+    }
+
+    @Override
+    protected void process(DistributionManager dm) {
+      Object result = null;
+      ReplyException replyException = null;
+      try {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}",
+              getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize);
+        }
+        result = getSynchronizationEvents();
+      } catch (Throwable t) {
+        replyException = new ReplyException(t);
+      } finally {
+        ReplyMessage replyMsg = new ReplyMessage();
+        replyMsg.setRecipient(getSender());
+        replyMsg.setProcessorId(this.processorId);
+        if (replyException == null) {
+          replyMsg.setReturnValue(result);
+        } else {
+          replyMsg.setException(replyException);
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: Sending synchronization reply returnValue={}; exception={}",
+              getClass().getSimpleName(), replyMsg.getReturnValue(), replyMsg.getException());
+        }
+        dm.putOutgoing(replyMsg);
+      }
+    }
+
+    private Object getSynchronizationEvents() {
+      List<Map<String, GatewayQueueEvent>> results = new ArrayList<>();
+      // Get the region
+      GemFireCacheImpl gfci = (GemFireCacheImpl) getCache();
+      LocalRegion region = (LocalRegion) gfci.getRegion(this.regionPath);
+
+      // Add the appropriate GatewaySenderEventImpl from each GatewaySender for each entry
+      Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds();
+      for (GatewaySender sender : gfci.getAllGatewaySenders()) {
+        if (allGatewaySenderIds.contains(sender.getId())) {
+          for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) {
+            Map<String, GatewayQueueEvent> resultForOneEntry = new HashMap<>();
+            GatewayQueueEvent event = ((AbstractGatewaySender) sender)
+                .getSynchronizationEvent(entry.key, entry.entryVersion.getVersionTimeStamp());
+            if (event != null) {
+              resultForOneEntry.put(sender.getId(), event);
+            }
+            results.add(resultForOneEntry);
+          }
+        }
+      }
+
+      return results;
+    }
+
+    private Cache getCache() {
+      return CacheFactory.getAnyInstance();
+    }
+
+    @Override
+    public int getDSFID() {
+      return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      out.writeInt(this.processorId);
+      DataSerializer.writeString(this.regionPath, out);
+      DataSerializer.writeArrayList((ArrayList) this.entriesToSynchronize, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.processorId = in.readInt();
+      this.regionPath = DataSerializer.readString(in);
+      this.entriesToSynchronize = DataSerializer.readArrayList(in);
+    }
+  }
+
+  public static class GatewaySenderQueueEntrySynchronizationEntry
+      implements DataSerializableFixedID {
+
+    private Object key;
+
+    private VersionTag entryVersion;
+
+    /* For serialization */
+    public GatewaySenderQueueEntrySynchronizationEntry() {}
+
+    public GatewaySenderQueueEntrySynchronizationEntry(Object key, VersionTag entryVersion) {
+      this.key = key;
+      this.entryVersion = entryVersion;
+    }
+
+    @Override
+    public int getDSFID() {
+      return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY;
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      return null;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeObject(this.key, out);
+      DataSerializer.writeObject(this.entryVersion, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      this.key = DataSerializer.readObject(in);
+      this.entryVersion = DataSerializer.readObject(in);
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append(getClass().getSimpleName()).append("[").append("key=")
+          .append(this.key).append("; entryVersion=").append(this.entryVersion).append("]")
+          .toString();
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 83720db..228e7a9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -89,6 +89,9 @@ public class GatewaySenderStats {
   protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
   protected static final String LOAD_BALANCE_TIME = "loadBalanceTime";
 
+  protected static final String SYNCHRONIZATION_EVENTS_ENQUEUED = "synchronizationEventsEnqueued";
+  protected static final String SYNCHRONIZATION_EVENTS_PROVIDED = "synchronizationEventsProvided";
+
   /** Id of the events queued statistic */
   protected static int eventsReceivedId;
   /** Id of the events queued statistic */
@@ -140,6 +143,10 @@ public class GatewaySenderStats {
   protected static int loadBalancesInProgressId;
   /** Id of load balance time */
   protected static int loadBalanceTimeId;
+  /** Id of synchronization events enqueued */
+  protected static int synchronizationEventsEnqueuedId;
+  /** Id of synchronization events provided */
+  protected static int synchronizationEventsProvidedId;
 
   /**
    * Static initializer to create and initialize the <code>StatisticsType</code>
@@ -213,7 +220,11 @@ public class GatewaySenderStats {
             f.createIntGauge(LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
                 "operations"),
             f.createLongCounter(LOAD_BALANCE_TIME, "Total time spent load balancing this sender",
-                "nanoseconds"),});
+                "nanoseconds"),
+            f.createIntCounter(SYNCHRONIZATION_EVENTS_ENQUEUED,
+                "Number of synchronization events added to the event queue.", "operations"),
+            f.createIntCounter(SYNCHRONIZATION_EVENTS_PROVIDED,
+                "Number of synchronization events provided to other members.", "operations"),});
 
     // Initialize id fields
     eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
@@ -243,6 +254,8 @@ public class GatewaySenderStats {
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
     loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
     loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+    synchronizationEventsEnqueuedId = type.nameToId(SYNCHRONIZATION_EVENTS_ENQUEUED);
+    synchronizationEventsProvidedId = type.nameToId(SYNCHRONIZATION_EVENTS_PROVIDED);
   }
 
   ////////////////////// Instance Fields //////////////////////
@@ -741,6 +754,20 @@ public class GatewaySenderStats {
     stats.incLong(loadBalanceTimeId, delta);
   }
 
+  /**
+   * Increments the number of synchronization events enqueued.
+   */
+  public void incSynchronizationEventsEnqueued() {
+    this.stats.incInt(synchronizationEventsEnqueuedId, 1);
+  }
+
+  /**
+   * Increments the number of synchronization events provided.
+   */
+  public void incSynchronizationEventsProvided() {
+    this.stats.incInt(synchronizationEventsProvidedId, 1);
+  }
+
   public Statistics getStats() {
     return stats;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index ed6df0b..70a2cbb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -34,6 +34,7 @@ import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.LocalRegion;
@@ -41,6 +42,7 @@ import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -325,4 +327,10 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   public void initializeEventDispatcher() {
     // no op for AsyncEventProcessor
   }
+
+  @Override
+  protected void enqueueEvent(GatewayQueueEvent event) {
+    int pId = ((GatewaySenderEventImpl) event).getBucketId() % this.nDispatcher;
+    this.processors[pId].enqueueEvent(event);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index e74270f..ff8a231 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -112,15 +113,20 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
     }
 
     // TODO: Looks like for PDX region bucket id is set to -1.
-    boolean queuedEvent = false;
-    try {
-      EventID eventID = ((EntryEventImpl) event).getEventId();
+    EventID eventID = ((EntryEventImpl) event).getEventId();
 
-      // while merging 42004, kept substituteValue as it is(it is barry's
-      // change 42466). bucketID is merged with eventID.getBucketID
-      gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true,
-          eventID.getBucketID());
+    // while merging 42004, kept substituteValue as it is(it is barry's
+    // change 42466). bucketID is merged with eventID.getBucketID
+    gatewayQueueEvent =
+        new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID());
+
+    enqueueEvent(gatewayQueueEvent);
+  }
 
+  @Override
+  protected void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) {
+    boolean queuedEvent = false;
+    try {
       if (getSender().beforeEnqueue(gatewayQueueEvent)) {
         long start = getSender().getStatistics().startTime();
         try {
@@ -138,7 +144,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
     } finally {
       if (!queuedEvent) {
         // it was not queued for some reason
-        gatewayQueueEvent.release();
+        ((GatewaySenderEventImpl) gatewayQueueEvent).release();
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 713713f..2f05d56 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -37,6 +37,7 @@ import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
@@ -386,4 +387,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   }
 
+  @Override
+  protected void enqueueEvent(GatewayQueueEvent event) {
+    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+      serialProcessor.enqueueEvent(event);
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 150b5ac..2f5da89 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -37,6 +37,7 @@ import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.Assert;
@@ -859,4 +860,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
 
   }
 
+  @Override
+  protected void enqueueEvent(GatewayQueueEvent event) {
+    // @TODO This API hasn't been implemented yet
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 833fcaa..9c80ce1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -88,6 +88,7 @@ import org.apache.geode.internal.cache.DiskStoreFactoryImpl;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.DiskStoreMonitor;
 import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
 import org.apache.geode.internal.cache.ExpirationScheduler;
 import org.apache.geode.internal.cache.FilterProfile;
@@ -2214,4 +2215,11 @@ public class CacheCreation implements InternalCache {
   public void shutDownAll() {
     throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
   }
+
+  @Override
+  public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+      InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize) {
+    throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
index 667ae0c..fec0e7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -43,8 +44,11 @@ import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
@@ -283,4 +287,10 @@ public class ClientCacheCreation extends CacheCreation implements ClientCache {
     return Collections.emptySet();
   }
 
+  @Override
+  public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+      InternalDistributedMember sender, LocalRegion region,
+      List<InitialImageOperation.Entry> entriesToSynchronize) {
+    throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 3cd60c7..791c3f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
 import java.util.List;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionAdvisee;
@@ -90,4 +91,12 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
 
   @Override
   protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+  protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+    throw new UnsupportedOperationException();
+  }
+
+  protected void putSynchronizationEvent(GatewayQueueEvent event) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 9aff712..9fcf06a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
 import java.util.List;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionAdvisee;
@@ -87,4 +88,12 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
 
   @Override
   protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+  protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+    throw new UnsupportedOperationException();
+  }
+
+  protected void putSynchronizationEvent(GatewayQueueEvent event) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 3a32db8..fd77c3e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7697,6 +7697,20 @@ public class LocalizedStrings {
   public static final StringId LuceneServiceImpl_REGION_0_CANNOT_BE_DESTROYED = new StringId(6660,
       "Region {0} cannot be destroyed because it defines Lucene index(es) [{1}]. Destroy all Lucene indexes before destroying the region.");
 
+  public static final StringId AbstractGatewaySender_CAUGHT_EXCEPTION_ENQUEUEING_SYNCHRONIZATION_EVENT =
+      new StringId(6661,
+          "{0}: Caught the following exception attempting to enqueue synchronization event={1}:");
+  public static final StringId GemFireCacheImpl_CAUGHT_EXCEPTION_SYNCHRONIZING_EVENTS =
+      new StringId(6662,
+          "Caught the following exception attempting to synchronize events from member={0}; regionPath={1}; entriesToSynchronize={2}:");
+  public static final StringId GatewaySenderQueueEntrySynchronizationReplyProcessor_REPLY_IS_EMPTY =
+      new StringId(6663,
+          "Synchronization event reply from member={0}; regionPath={1}; key={2}; entryVersion={3} is empty");
+  public static final StringId AbstractGatewaySender_PROVIDING_SYNCHRONIZATION_EVENT =
+      new StringId(6664, "{0}: Providing synchronization event for key={1}; timestamp={2}: {3}");
+  public static final StringId AbstractGatewaySender_ENQUEUEING_SYNCHRONIZATION_EVENT =
+      new StringId(6665, "{0}: Enqueueing synchronization event: {1}");
+
   /** Testing strings, messageId 90000-99999 **/
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java
new file mode 100644
index 0000000..ae14c5f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static junitparams.JUnitParamsRunner.$;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
+import org.apache.geode.internal.cache.versions.VMVersionTag;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class AsyncEventQueueEntrySynchronizationDUnitTest extends AsyncEventQueueTestBase {
+
+  private static Object[] getRegionShortcuts() {
+    return $(new Object[] {RegionShortcut.PARTITION_REDUNDANT, false, true},
+        new Object[] {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, true, true});
+  }
+
+  @Test
+  @Parameters(method = "getRegionShortcuts")
+  public void testParallelAsyncEventQueueSynchronization(RegionShortcut regionShortcut,
+      boolean isPersistentAeq, boolean isParallelAeq) throws Exception {
+    // Start locator
+    Integer locatorPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+
+    // Start three members
+    vm1.invoke(() -> createCache(locatorPort));
+    vm2.invoke(() -> createCache(locatorPort));
+    vm3.invoke(() -> createCache(locatorPort));
+
+    // Create parallel AsyncEventQueue
+    String aeqId = "db";
+    vm1.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+        null, true, new WaitingAsyncEventListener()));
+    vm2.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+        null, true, new WaitingAsyncEventListener()));
+    vm3.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+        null, true, new WaitingAsyncEventListener()));
+
+    // Create PartitionedRegion with redundant-copies=2
+    String regionName = getTestMethodName() + "_PR";
+    vm1.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+    vm2.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+    vm3.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+
+    // Create primary bucket in member 1, secondary buckets in members 2 and 3
+    Object key = "0";
+    vm1.invoke(() -> createBucket(regionName, key));
+    vm2.invoke(() -> createBucket(regionName, key));
+    vm3.invoke(() -> createBucket(regionName, key));
+    vm1.invoke(() -> assertPrimaryBucket(regionName, key));
+
+    // Fake a replication event in member 2
+    InternalDistributedMember idmMember1 = vm1.invoke(() -> getMember());
+    VersionSource vsMember1 = vm1.invoke(() -> getVersionMember(regionName));
+    vm2.invoke(() -> doFakeUpdate(idmMember1, vsMember1, regionName, key));
+
+    // Verify only member 2's queue contains an event
+    vm1.invoke(() -> checkAsyncEventQueueSize(aeqId, 0, true));
+    vm2.invoke(() -> checkAsyncEventQueueSize(aeqId, 1, true));
+    vm3.invoke(() -> checkAsyncEventQueueSize(aeqId, 0, true));
+
+    // Crash member 1
+    DistributedTestUtils.crashDistributedSystem(vm1);
+
+    // Wait for member 3's queue to contain 1 event (which means it has been synchronized)
+    vm3.invoke(() -> waitForAsyncEventQueueSize(aeqId, 1, true));
+
+    // Start processing events (mainly to avoid an InterruptedException suspect string)
+    vm2.invoke(() -> startProcessingAsyncEvents(aeqId));
+    vm3.invoke(() -> startProcessingAsyncEvents(aeqId));
+  }
+
+  private void createPartitionedRegion(String regionName, RegionShortcut regionShortcut,
+      String aeqId, int redundantCopies) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(redundantCopies);
+
+    cache.createRegionFactory(regionShortcut).addAsyncEventQueueId(aeqId)
+        .setPartitionAttributes(paf.create()).create(regionName);
+  }
+
+  private void createBucket(String regionName, Object key) throws Exception {
+    PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+    int bucketId = PartitionedRegionHelper.getHashKey(pr, null, key, null, null);
+    pr.getRedundancyProvider().createBackupBucketOnMember(bucketId, getMember(), false, false, null,
+        true);
+  }
+
+  private void assertPrimaryBucket(String regionName, Object key) throws Exception {
+    PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+    int bucketId = PartitionedRegionHelper.getHashKey(pr, null, key, null, null);
+    assertThat(pr.getRegionAdvisor().isPrimaryForBucket(bucketId)).isTrue();
+  }
+
+  private InternalDistributedMember getMember() {
+    return (InternalDistributedMember) cache.getDistributedSystem().getDistributedMember();
+  }
+
+  private VersionSource getVersionMember(String regionName) {
+    PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+    return pr.getVersionMember();
+  }
+
+  private void doFakeUpdate(InternalDistributedMember fromMember, VersionSource versionSource,
+      String regionName, Object key) {
+    // Get the BucketRegion for the regionName and key
+    PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+    BucketRegion br = pr.getBucketRegion(key);
+
+    // Create VersionTag
+    long timestamp = System.currentTimeMillis();
+    VersionTag tag = new VMVersionTag();
+    tag.setMemberID(versionSource);
+    tag.setRegionVersion(1);
+    tag.setEntryVersion(1);
+    tag.setVersionTimeStamp(timestamp);
+    tag.setIsRemoteForTesting();
+
+    EntryEventImpl event =
+        EntryEventImpl.create(br, Operation.CREATE, key, true, fromMember, true, false);
+    event.setNewValue(new VMCachedDeserializable("0", 0));
+    event.setTailKey(161l);
+    event.setVersionTag(tag);
+    event.setEventId(new EventID(cache.getDistributedSystem()));
+
+    // Put event into region
+    br.getRegionMap().basicPut(event, timestamp, true, false, null, false, false);
+  }
+
+  private void startProcessingAsyncEvents(String aeqId) {
+    // Get the async event listener
+    WaitingAsyncEventListener listener = getWaitingAsyncEventListener(aeqId);
+
+    // Start processing waiting events
+    listener.startProcessingEvents();
+  }
+
+  private WaitingAsyncEventListener getWaitingAsyncEventListener(String aeqId) {
+    // Get the async event queue
+    AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId);
+    assertThat(aeq).isNotNull();
+
+    // Get and return the async event listener
+    AsyncEventListener aeqListener = aeq.getAsyncEventListener();
+    assertThat(aeqListener).isInstanceOf(WaitingAsyncEventListener.class);
+    return (WaitingAsyncEventListener) aeqListener;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index dc7a218..6ba8c66 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
 import org.junit.experimental.categories.Category;
@@ -119,7 +120,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
 
   protected static AsyncEventListener eventListener1;
 
-  private static final long MAX_WAIT = 10000;
+  private static final long MAX_WAIT = 60000;
 
   protected static GatewayEventFilter eventFilter;
 
@@ -477,21 +478,21 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
-    AsyncEventQueue theAsyncEventQueue = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theAsyncEventQueue = asyncChannel;
-      }
-    }
+    checkAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
+  }
 
-    GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();
+  public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries,
+      boolean localSize) {
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(asyncQueueId);
+    GatewaySender sender = aeq.getSender();
 
     if (sender.isParallel()) {
       Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
-      assertEquals(numQueueEntries,
-          queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
+      Region queueRegion = queues.toArray(new RegionQueue[queues.size()])[0].getRegion();
+      if (localSize) {
+        queueRegion = PartitionRegionHelper.getLocalData(queueRegion);
+      }
+      assertEquals(numQueueEntries, queueRegion.size());
     } else {
       Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
       int size = 0;
@@ -513,19 +514,21 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
    */
   public static void waitForAsyncEventQueueSize(String asyncQueueId, final int numQueueEntries)
       throws Exception {
-    AsyncEventQueue theAsyncEventQueue = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theAsyncEventQueue = asyncChannel;
-      }
-    }
+    waitForAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
+  }
 
-    GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();
+  public static void waitForAsyncEventQueueSize(String asyncQueueId, final int numQueueEntries,
+      boolean localSize) throws Exception {
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(asyncQueueId);
+    GatewaySender sender = aeq.getSender();
 
     if (sender.isParallel()) {
       final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+      Region queueRegion = queues.toArray(new RegionQueue[queues.size()])[0].getRegion();
+      if (localSize) {
+        queueRegion = PartitionRegionHelper.getLocalData(queueRegion);
+      }
+      final Region fQueueRegion = queueRegion;
 
       Wait.waitForCriterion(new WaitCriterion() {
 
@@ -534,8 +537,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
         }
 
         public boolean done() {
-          boolean done = numQueueEntries == queues.toArray(new RegionQueue[queues.size()])[0]
-              .getRegion().size();
+          boolean done = numQueueEntries == fQueueRegion.size();
           return done;
         }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java
new file mode 100644
index 0000000..f616f67
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+public class WaitingAsyncEventListener implements AsyncEventListener {
+
+  private final CountDownLatch latch = new CountDownLatch(1);
+
+  private final AtomicInteger numberOfEvents = new AtomicInteger();
+
+  public boolean processEvents(List<AsyncEvent> events) {
+    try {
+      waitToStartProcessingEvents();
+    } catch (InterruptedException e) {
+      throw new RuntimeException("WaitingAsyncEventListener processEvents was interrupted");
+    }
+    for (AsyncEvent event : events) {
+      process(event);
+    }
+    return true;
+  }
+
+  private void process(AsyncEvent event) {
+    incrementTotalEvents();
+  }
+
+  private void waitToStartProcessingEvents() throws InterruptedException {
+    this.latch.await();
+  }
+
+  public void startProcessingEvents() {
+    this.latch.countDown();
+  }
+
+  private int incrementTotalEvents() {
+    return this.numberOfEvents.incrementAndGet();
+  }
+
+  public int getTotalEvents() {
+    return this.numberOfEvents.get();
+  }
+
+  public void close() {}
+}
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 1820393..c0cf5ee 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1992,6 +1992,14 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
 fromData,183,2bb9007301003d1c10119f00032a04b5002b2a2bb900740100b500282a2bb900740100b500291c1011a200232bc1007599001c2bb80076b20077a60012bb0078592bc00075b20079b7007a4c2a2bb8007bc0007cb5002a2a2bb8007db500102a2bb9007e0100b5002e2a2bb6007f2a2bb80080b500302a2bb8007bc00020b500212a2bb900810100b500132a2bb900820100b500172a2bb900740100b500092a2bb900820100b80004b500052a2bb900820100b5001bb1
 toData,133,2ab600272b1011b9006802002b2ab40028b9006902002b2ab40029b9006902002ab4002a2bb8006a2ab400102bb8006b2b2ab4002eb9006c02002a2bb6006d2ab6002f2bb8006e2ab400212bb8006a2b2ab40013b9006f02002b2ab40017b9007003002b2ab40009b9006902002b2ab40005b60071b9007003002b2ab60072b900700300b1
 
+org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
+fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1
+toData,17,2ab400022bb800052ab400012bb80005b1
+
+org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationMessage,2
+fromData,32,2a2bb7003d2a2bb9003e0100b500032a2bb8003fb500062a2bb80040b50008b1
+toData,35,2a2bb700392b2ab40003b9003a02002ab400062bb8003b2ab40008c0001e2bb8003cb1
+
 org/apache/geode/internal/cache/wan/parallel/ParallelQueueBatchRemovalMessage,2
 fromData,17,2a2bb7003b2a2bb8003cc0003db50004b1
 toData,14,2a2bb700392ab400042bb8003ab1
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 58d017c..8f51d71 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3230,7 +3230,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
           .getAllLocalPrimaryBucketRegions();
       for (final BucketRegion bucket : buckets) {
-        Awaitility.await().atMost(180, TimeUnit.SECONDS).until(() -> {
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
           assertEquals("Expected bucket entries for bucket: " + bucket.getId()
               + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
               + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].