You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/10 17:51:14 UTC

[geode] branch develop updated: GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e42ebec  GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)
e42ebec is described below

commit e42ebec32518c014ab6bd3ffe6d1ee3cab813762
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Apr 10 10:51:10 2018 -0700

    GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  5 ++
 .../geode/internal/cache/EntryEventImpl.java       |  3 +
 .../internal/cache/PartitionedRegionDataStore.java |  6 +-
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 ++++-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 50 +++++++++++++-
 .../internal/cache/wan/GatewaySenderStats.java     | 16 +++++
 .../ConcurrentParallelGatewaySenderQueue.java      |  9 +++
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 20 +++++-
 .../geode/internal/cache/wan/WANTestBase.java      | 78 +++++++++++++++++++---
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 38 +++++++++--
 .../SerialGatewaySenderOperationsDUnitTest.java    | 49 +++++++++++++-
 11 files changed, 263 insertions(+), 26 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..b8259a3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
                 "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
                 "operations"),
+            f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
+            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+                "Number of events not added to primary queue due to sender yet runing.", "events"),
             f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
                 "Number of events conflated from batches.", "operations"),
             f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -122,6 +125,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+    notQueuedEventsAtYetRunningPrimarySenderId =
+        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index c91d236..664d054 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -2158,6 +2158,9 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     if (this.getInhibitDistribution()) {
       buf.append(";inhibitDistribution");
     }
+    if (this.tailKey != -1) {
+      buf.append(";tailKey=" + tailKey);
+    }
     buf.append("]");
     return buf.toString();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index ef8eb99..d468ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     return sizeOfLocalPrimaries;
   }
 
-  public int getSizeOfLocalBuckets(boolean includeSecondary) {
+  public int getSizeOfLocalBuckets() {
     int sizeOfLocal = 0;
-    Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
-    for (BucketRegion br : primaryBuckets) {
+    Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions();
+    for (BucketRegion br : allLocalBuckets) {
       sizeOfLocal += br.size();
     }
     return sizeOfLocal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..034d810 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     // If this gateway is not running, return
     if (!isRunning()) {
       if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender queue");
+        logger.debug("Returning back without putting into the gateway sender queue:" + event);
+      }
+      if (this.eventProcessor != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
       }
       return;
     }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
         // The sender may have stopped, after we have checked the status in the beginning.
         if (!isRunning()) {
           if (isDebugEnabled) {
-            logger.debug("Returning back without putting into the gateway sender queue");
+            logger.debug("Returning back without putting into the gateway sender queue:" + event);
+          }
+          if (this.eventProcessor != null) {
+            this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
           }
           return;
         }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+    AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+    return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
     this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..eea7480 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,6 +33,7 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -49,6 +50,7 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -261,15 +263,57 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     }
 
     // This should be local size instead of pr size
-    if (this.queue instanceof ParallelGatewaySenderQueue) {
-      return ((ParallelGatewaySenderQueue) queue).localSize();
-    }
     if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
       return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
     }
     return this.queue.size();
   }
 
+  public int eventSecondaryQueueSize() {
+    if (queue == null) {
+      return 0;
+    }
+
+    // if parallel, get both primary and secondary queues' size, then substract primary queue's size
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+          - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+      return size;
+    }
+    return this.queue.size();
+  }
+
+  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
+    if (queue == null) {
+      return;
+    }
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
+      if (prQ == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
+              + " is not created yet.");
+        }
+        return;
+      }
+      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
+      long shadowKey = event.getTailKey();
+
+      ParallelGatewaySenderQueue pgsq =
+          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+      boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+      if (isPrimary) {
+        pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+        this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
+        if (logger.isDebugEnabled()) {
+          logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+              + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+        }
+      }
+    }
+  }
+
   /**
    * @return the sender
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..adaf928 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -84,6 +84,8 @@ public class GatewaySenderStats {
 
   protected static final String EVENTS_FILTERED = "eventsFiltered";
   protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent";
+  protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER =
+      "notQueuedEventAtYetRunningPrimarySender";
 
   protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted";
   protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
@@ -135,6 +137,8 @@ public class GatewaySenderStats {
   protected static int eventsFilteredId;
   /** Id of not queued events */
   protected static int notQueuedEventsId;
+  /** Id of not queued events due to the primary sender is yet running */
+  protected static int notQueuedEventsAtYetRunningPrimarySenderId;
   /** Id of events conflated in batch */
   protected static int eventsConflatedFromBatchesId;
   /** Id of load balances completed */
@@ -213,6 +217,8 @@ public class GatewaySenderStats {
             f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
                 "Current number of entries in the conflation indexes map.", "events"),
             f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
+            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+                "Number of events not added to primary queue due to sender yet runing.", "events"),
             f.createIntCounter(EVENTS_FILTERED,
                 "Number of events filtered through GatewayEventFilter.", "events"),
             f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed",
@@ -249,6 +255,8 @@ public class GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+    notQueuedEventsAtYetRunningPrimarySenderId =
+        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
@@ -599,6 +607,14 @@ public class GatewaySenderStats {
     return this.stats.getInt(notQueuedEventsId);
   }
 
+  public void incEventsNotQueuedAtYetRunningPrimarySender() {
+    this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1);
+  }
+
+  public int getEventsNotQueuedAtYetRunningPrimarySender() {
+    return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId);
+  }
+
   public void incEventsFiltered() {
     this.stats.incInt(eventsFilteredId, 1);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 4fc940c..e556910 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
     return this.processors[0].getQueue().size();
   }
 
+  public String displayContent() {
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue());
+    return pgsq.displayContent();
+  }
+
   public int localSize() {
     return localSize(false);
   }
@@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
     return processors[index];
   }
 
+  public RegionQueue getQueueByBucket(int bucketId) {
+    return getPGSProcessor(bucketId).getQueue();
+  }
+
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
     return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 3aa8534..89880fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   // This method may need synchronization in case it is used by
   // ConcurrentParallelGatewaySender
-  protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+  public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
     StoppableReentrantLock lock = buckToDispatchLock;
     if (lock != null) {
       lock.lock();
@@ -1405,12 +1405,28 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return localSize(false);
   }
 
+  public String displayContent() {
+    int size = 0;
+    StringBuffer sb = new StringBuffer();
+    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+      if (prQ != null && prQ.getDataStore() != null) {
+        Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
+        for (BucketRegion br : allLocalBuckets) {
+          if (br.size() > 0) {
+            sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
+          }
+        }
+      }
+    }
+    return sb.toString();
+  }
+
   public int localSize(boolean includeSecondary) {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         if (includeSecondary) {
-          size += prQ.getDataStore().getSizeOfLocalBuckets(true);
+          size += prQ.getDataStore().getSizeOfLocalBuckets();
         } else {
           size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
         }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 226595b..3799083 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase {
     }
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    String logLevel = System.getProperty(LOG_LEVEL, "info");
+    props.setProperty(LOG_LEVEL, logLevel);
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
@@ -1155,6 +1157,21 @@ public class WANTestBase extends DistributedTestCase {
     return stats;
   }
 
+  public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
+    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    ArrayList<Integer> stats = new ArrayList<Integer>();
+    int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender();
+    if (eventNotQueued > 0) {
+      logger.info(
+          "Found " + eventNotQueued + " not queued events due to primary sender is yet running");
+    }
+    stats.add(eventNotQueued);
+    stats.add(statistics.getEventsNotQueued());
+    stats.add(statistics.getEventsNotQueuedConflated());
+    return stats;
+  }
+
   public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -2746,11 +2763,21 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
         .until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
     assertEquals(queueSize, sender.getEventQueueSize());
   }
 
+  public static void validateSecondaryQueueSizeStat(String id, final int queueSize) {
+    final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
+    Awaitility.await().atMost(120, TimeUnit.SECONDS)
+        .until(() -> assertEquals(
+            "Expected unprocessedEventMap is drained but actual is "
+                + sender.getStatistics().getUnprocessedEventMapSize(),
+            queueSize, sender.getStatistics().getUnprocessedEventMapSize()));
+    assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize());
+  }
+
   /**
    * This method is specifically written for pause and stop operations. This method validates that
    * the region size remains same for at least minimum number of verification attempts and also it
@@ -3053,6 +3080,31 @@ public class WANTestBase extends DistributedTestCase {
     });
   }
 
+  public static Integer getSecondaryQueueContentSize(final String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+    int size = abstractSender.getEventSecondaryQueueSize();
+    return size;
+  }
+
+  public static String displayQueueContent(final RegionQueue queue) {
+    if (queue instanceof ParallelGatewaySenderQueue) {
+      ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
+      return pgsq.displayContent();
+    } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+      return pgsq.displayContent();
+    }
+    return null;
+  }
+
   public static Integer getQueueContentSize(final String senderId) {
     return getQueueContentSize(senderId, false);
   }
@@ -3135,14 +3187,22 @@ public class WANTestBase extends DistributedTestCase {
           ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
       Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
           .getAllLocalPrimaryBucketRegions();
-      for (final BucketRegion bucket : buckets) {
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
-          assertEquals("Expected bucket entries for bucket: " + bucket.getId()
-              + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
-              + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
-              bucket.keySet().size());
-        });
-      } // for loop ends
+      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+      RegionQueue queue = abstractSender.getEventProcessor().queue;
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        assertEquals("Expected events in all primary queues are drained but actual is "
+            + abstractSender.getEventQueueSize() + ". Queue content is: "
+            + displayQueueContent(queue), 0, abstractSender.getEventQueueSize());
+      });
+      assertEquals("Expected events in all primary queues after drain is 0", 0,
+          abstractSender.getEventQueueSize());
+      Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+        assertEquals("Expected events in all secondary queues are drained but actual is "
+            + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
+            + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
+      });
+      assertEquals("Except events in all secondary queues after drain is 0", 0,
+          abstractSender.getEventSecondaryQueueSize());
     } finally {
       exp.remove();
       exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index eaef4f9..f5b98b7 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -20,6 +20,8 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.ArrayList;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -407,18 +409,42 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
 
     // SECOND RUN: start async puts on region
-    AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
-
-    // when puts are happening by another thread, start the senders
-    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
-    async.join();
+    ArrayList<Integer> vm4List = null;
+    ArrayList<Integer> vm5List = null;
+    ArrayList<Integer> vm6List = null;
+    ArrayList<Integer> vm7List = null;
+    boolean foundDroppedAtYetStartedPrimarySender = false;
+    int count = 0;
+
+    do {
+      stopSenders();
+      AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
+
+      // when puts are happening by another thread, start the senders
+      startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+      async.join();
+      vm4List =
+          (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm5List =
+          (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm6List =
+          (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm7List =
+          (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
+        foundDroppedAtYetStartedPrimarySender = true;
+      }
+      count++;
+    } while (foundDroppedAtYetStartedPrimarySender == false && count < 5);
+    assertThat(foundDroppedAtYetStartedPrimarySender);
 
     // verify all the buckets on all the sender nodes are drained
     validateParallelSenderQueueAllBucketsDrained();
 
     // verify that the queue size ultimately becomes zero. That means all the events propagate to
     // remote site.
+
     vm4.invoke(() -> validateQueueContents("ln", 0));
   }
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index ee43b83..8df5650 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -269,6 +269,53 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(lnPort);
+
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+
+    createSenderRegions();
+
+    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+
+    // do a lot of puts while senders are restarting
+    AsyncInvocation async = vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000));
+
+    startSenderInVMsAsync("ln", vm4, vm5);
+    async.join();
+
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+    vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
+    vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
+  }
+
+  @Test
   public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -298,7 +345,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
 
-    // Do some puts while restarting a sender
+    // Do some puts from both vm4 and vm5 while restarting a sender
     AsyncInvocation asyncPuts =
         vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300));
 

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.