You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/05/09 23:38:09 UTC

[geode] branch feature/GEODE-5087 updated (8f292eb -> 9dcec30)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-5087
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit 8f292eb  GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender
     new 9dcec30  GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (8f292eb)
            \
             N -- N -- N   refs/heads/feature/GEODE-5087 (9dcec30)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../wan/serial/SerialGatewaySenderEventProcessor.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.

[geode] 01/01: GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5087
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9dcec3010c454460e00f2792bd4c5a0d2e943ee8
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu May 3 17:15:33 2018 -0700

    GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender
---
 .../internal/ParallelAsyncEventQueueImpl.java      |  2 +-
 .../internal/SerialAsyncEventQueueImpl.java        |  2 +-
 .../internal/cache/wan/AbstractGatewaySender.java  | 68 +++++++++++++---------
 .../wan/AbstractGatewaySenderEventProcessor.java   | 33 +----------
 ...currentParallelGatewaySenderEventProcessor.java | 32 ++++++++++
 .../ParallelGatewaySenderEventProcessor.java       |  6 ++
 .../cache/wan/serial/BatchDestroyOperation.java    | 25 +++++++-
 ...oncurrentSerialGatewaySenderEventProcessor.java | 64 ++++++++++++--------
 .../serial/SerialGatewaySenderEventProcessor.java  | 47 +++++++++++++--
 .../xmlcache/ParallelAsyncEventQueueCreation.java  |  2 +-
 .../xmlcache/ParallelGatewaySenderCreation.java    |  2 +-
 .../xmlcache/SerialAsyncEventQueueCreation.java    |  2 +-
 .../xmlcache/SerialGatewaySenderCreation.java      |  2 +-
 .../wan/parallel/ParallelGatewaySenderImpl.java    |  2 +-
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |  6 +-
 .../SerialGatewaySenderOperationsDUnitTest.java    |  2 -
 16 files changed, 196 insertions(+), 101 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 538b65a..8e2e4e4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -168,7 +168,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 9e0239d..400126d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -225,7 +225,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
    * internal.cache.EntryEventImpl)
    */
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 1027582..3b55989 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -173,6 +173,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
   protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents =
       new ConcurrentLinkedQueue<>();
+
+  protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
+      new ConcurrentLinkedQueue<>();
   /**
    * The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds.
    */
@@ -844,40 +847,43 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
-    // If this gateway is not running, return
-    if (!isRunning()) {
-      if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender queue:" + event);
-      }
-      if (this.eventProcessor != null) {
-        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
-      }
-      return;
-    }
-
-    final GatewaySenderStats stats = getStatistics();
-    stats.incEventsReceived();
-
-    if (!checkForDistribution(event, stats)) {
-      stats.incEventsNotQueued();
-      return;
-    }
-
-    // this filter is defined by Asif which exist in old wan too. new wan has
-    // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
-    // not considering this filter
-    if (!this.filter.enqueueEvent(event)) {
-      stats.incEventsFiltered();
-      return;
-    }
     // released by this method or transfers ownership to TmpQueueEvent
     @Released
     EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
     boolean freeClonedEvent = true;
     try {
 
-      Region region = event.getRegion();
+      // If this gateway is not running, return
+      if (!isRunning()) {
+        if (this.isPrimary()) {
+          tmpDroppedEvents.add(clonedEvent);
+          if (isDebugEnabled) {
+            logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent);
+          }
+        }
+        if (isDebugEnabled) {
+          logger.debug("Returning back without putting into the gateway sender queue:" + event);
+        }
+        return;
+      }
+
+      final GatewaySenderStats stats = getStatistics();
+      stats.incEventsReceived();
+
+      if (!checkForDistribution(event, stats)) {
+        stats.incEventsNotQueued();
+        return;
+      }
 
+      // this filter is defined by Asif which exist in old wan too. new wan has
+      // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
+      // not considering this filter
+      if (!this.filter.enqueueEvent(event)) {
+        stats.incEventsFiltered();
+        return;
+      }
+
+      // start to distribute
       setModifiedEventId(clonedEvent);
       Object callbackArg = clonedEvent.getRawCallbackArgument();
 
@@ -1024,6 +1030,12 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
    */
   public void enqueueTempEvents() {
     if (this.eventProcessor != null) {// Fix for defect #47308
+      // process tmpDroppedEvents
+      EntryEventImpl droppedEvent = null;
+      while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
+      }
+
       TmpQueueEvent nextEvent = null;
       final GatewaySenderStats stats = getStatistics();
       try {
@@ -1224,7 +1236,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return region;
   }
 
-  protected abstract void setModifiedEventId(EntryEventImpl clonedEvent);
+  public abstract void setModifiedEventId(EntryEventImpl clonedEvent);
 
   public static class DefaultGatewayEventFilter
       implements org.apache.geode.internal.cache.GatewayEventFilter {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 2ce06c6..89fa586 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -279,36 +277,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return this.queue.size();
   }
 
-  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
-    if (queue == null) {
-      return;
-    }
-    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
-      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
-      if (prQ == null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
-              + " is not created yet.");
-        }
-        return;
-      }
-      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
-      long shadowKey = event.getTailKey();
-
-      ParallelGatewaySenderQueue pgsq =
-          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
-      boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-      if (isPrimary) {
-        pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
-        this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
-        if (logger.isDebugEnabled()) {
-          logger.debug("register dropped event for primary queue. BucketId is " + bucketId
-              + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
-        }
-      }
-    }
-  }
+  protected abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent);
 
   /**
    * @return the sender
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 54b7034..6b8cce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -32,12 +32,15 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -138,6 +141,35 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    if (queue == null) {
+      return;
+    }
+    ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+    PartitionedRegion prQ = cpgsq.getRegion(droppedEvent.getRegion().getFullPath());
+    if (prQ == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath()
+            + " is not created yet.");
+      }
+      return;
+    }
+    int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) droppedEvent);
+    long shadowKey = droppedEvent.getTailKey();
+
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+    boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+    if (isPrimary) {
+      pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+      this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
+      if (logger.isDebugEnabled()) {
+        logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+            + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+      }
+    }
+  }
+
+  @Override
   public void run() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 5715a35..77811c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -149,6 +149,12 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
     }
   }
 
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    logger.info("ParallelGatewaySenderEventProcessor should not process dropped event {}",
+        droppedEvent);
+  }
+
   public void clear(PartitionedRegion pr, int bucketId) {
     ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index debb005..0744561 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -102,7 +102,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
         }
 
         // Optimized way
-        for (long k = (Long) this.key; k <= this.tailKey; k++) {
+        for (long k = (Long) this.key; k <= this.tailKey && this.tailKey != -1; k++) {
           try {
             for (GatewayEventFilter filter : rgn.getSerialGatewaySender()
                 .getGatewayEventFilters()) {
@@ -124,6 +124,29 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
             }
           }
         }
+
+        // destroy dropped event from unprocessedKeys
+        if (this.tailKey == -1) {
+          SerialGatewaySenderEventProcessor ep = null;
+          int index = ((Long) this.key).intValue();
+          if (index == -1) {
+            // this is SerialGatewaySenderEventProcessor
+            ep = (SerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+                .getEventProcessor();
+          } else {
+            ConcurrentSerialGatewaySenderEventProcessor csgep =
+                (ConcurrentSerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+                    .getEventProcessor();
+            ep = csgep.processors.get(index);
+          }
+          boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId());
+          if (removed) {
+            if (isDebugEnabled) {
+              logger.debug("Removed a dropped event {} from unprocessedEvents.",
+                  (EntryEventImpl) event);
+            }
+          }
+        }
         this.appliedOperation = true;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ec01fd9..8ec6ce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -109,6 +109,35 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   }
 
+  public void setModifiedEventId(EntryEventImpl clonedEvent, int index) {
+    EventID originalEventId = clonedEvent.getEventId();
+    if (logger.isDebugEnabled()) {
+      logger.debug("The original EventId is {}", originalEventId);
+    }
+    // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
+    // generating threadId by the algorithm explained above used to clash with
+    // fakeThreadId generated by putAll
+    // below is new way to generate threadId so that it doesn't clash with
+    // any.
+    long newThreadId =
+        ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(),
+            0 /*
+               * gateway sender event id index has already been applied in
+               * SerialGatewaySenderImpl.setModifiedEventId
+               */);
+    EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
+        originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}"
+              + ":index=" + this.sender.getEventIdIndex(),
+          this, clonedEvent.getKey(), index, originalEventId,
+          ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
   public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
       int index) throws CacheException, IOException {
     // Get the appropriate gateway
@@ -121,30 +150,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       @Released
       EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event);
       try {
-        EventID originalEventId = clonedEvent.getEventId();
-        if (logger.isDebugEnabled()) {
-          logger.debug("The original EventId is {}", originalEventId);
-        }
-        // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
-        // generating threadId by the algorithm explained above used to clash with
-        // fakeThreadId generated by putAll
-        // below is new way to generate threadId so that it doesn't clash with
-        // any.
-        long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(index,
-            originalEventId.getThreadID(),
-            0 /*
-               * gateway sender event id index has already been applied in
-               * SerialGatewaySenderImpl.setModifiedEventId
-               */);
-        EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
-            originalEventId.getSequenceID());
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
-              this, event.getKey(), index, originalEventId, originalEventId.getThreadID(),
-              newEventId, newThreadId);
-        }
-        clonedEvent.setEventId(newEventId);
+        setModifiedEventId(clonedEvent, index);
         serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue);
       } finally {
         clonedEvent.release();
@@ -375,6 +381,16 @@ public class ConcurrentSerialGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    // modified event again for concurrent SGSEP
+    int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % this.processors.size());
+    setModifiedEventId(droppedEvent, index);
+
+    this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent, index);
+  }
+
+  @Override
   protected void enqueueEvent(GatewayQueueEvent event) {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
       serialProcessor.enqueueEvent(event);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 3fa4d6a..39609c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -610,7 +611,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       }
       my_executor.execute(new Runnable() {
         public void run() {
-          basicHandlePrimaryDestroy(gatewayEvent);
+          basicHandlePrimaryDestroy(gatewayEvent.getEventId());
         }
       });
     }
@@ -620,23 +621,25 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
    * Just remove the event from the unprocessed events map if it is present. This method added to
    * fix bug 37603
    */
-  protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) {
+  protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
     if (this.sender.isPrimary()) {
       // no need to do anything if we have become the primary
-      return;
+      return false;
     }
     GatewaySenderStats statistics = this.sender.getStatistics();
     // Get the event from the map
     synchronized (unprocessedEventsLock) {
       if (this.unprocessedEvents == null)
-        return;
+        return false;
       // now we can safely use the unprocessedEvents field
-      EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
+      EventWrapper ew = this.unprocessedEvents.remove(eventId);
       if (ew != null) {
         ew.event.release();
         statistics.incUnprocessedEventsRemovedByPrimary();
+        return true;
       }
     }
+    return false;
   }
 
   protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
@@ -865,4 +868,38 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
     // @TODO This API hasn't been implemented yet
     throw new UnsupportedOperationException();
   }
+
+  public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl dropEvent, int index) {
+    EntryEventImpl destroyEvent =
+        EntryEventImpl.create((LocalRegion) this.queue.getRegion(), Operation.DESTROY, (long) index,
+            null/* newValue */, null, false, sender.getCache().getMyId());
+    destroyEvent.setEventId(dropEvent.getEventId());
+    destroyEvent.disallowOffHeapValues();
+    destroyEvent.setTailKey(-1L);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}",
+          destroyEvent);
+    }
+
+    try {
+      BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent);
+      op.distribute();
+      if (logger.isDebugEnabled()) {
+        logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent);
+      }
+    } catch (Exception ignore) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Exception in sending dropped event could be ignored in order not to interrupt sender starting",
+            ignore);
+      }
+    }
+  }
+
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index 6f8efa8..4686b67 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -87,5 +87,5 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 5b025b5..257ee75 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -90,7 +90,7 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
     throw new UnsupportedOperationException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index ce71c54..cd06661 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -86,5 +86,5 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 80c04de..b0766ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -87,7 +87,7 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
     throw new UnsupportedOperationException();
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index d023704..f565426 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -167,7 +167,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d964253..ecca896 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -211,7 +211,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
@@ -226,7 +226,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+          this, clonedEvent.getKey(), originalEventId,
+          ThreadIdentifier.toDisplayString(originalThreadId), newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
     }
     clonedEvent.setEventId(newEventId);
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index caa357e..4993f24 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({DistributedTest.class, WanTest.class})
@@ -266,7 +265,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
   }
 
-  @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
   @Test
   public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.