You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/10 21:09:44 UTC

[geode] branch develop updated: Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 26f976b  Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"
26f976b is described below

commit 26f976b12637b64f34d0c58833e094fce999956e
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Apr 10 14:08:49 2018 -0700

    Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"
    
    This reverts commit e42ebec32518c014ab6bd3ffe6d1ee3cab813762.
    Due to acceptance test failed after merge. Need to double check the merge.
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  5 --
 .../geode/internal/cache/EntryEventImpl.java       |  3 -
 .../internal/cache/PartitionedRegionDataStore.java |  6 +-
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +----
 .../wan/AbstractGatewaySenderEventProcessor.java   | 50 +-------------
 .../internal/cache/wan/GatewaySenderStats.java     | 16 -----
 .../ConcurrentParallelGatewaySenderQueue.java      |  9 ---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 20 +-----
 .../geode/internal/cache/wan/WANTestBase.java      | 78 +++-------------------
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 38 ++---------
 .../SerialGatewaySenderOperationsDUnitTest.java    | 49 +-------------
 11 files changed, 26 insertions(+), 263 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index b8259a3..dee2c92 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,9 +47,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
                 "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
                 "operations"),
-            f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
-            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
-                "Number of events not added to primary queue due to sender yet runing.", "events"),
             f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
                 "Number of events conflated from batches.", "operations"),
             f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -125,8 +122,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
-    notQueuedEventsAtYetRunningPrimarySenderId =
-        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 664d054..c91d236 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -2158,9 +2158,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     if (this.getInhibitDistribution()) {
       buf.append(";inhibitDistribution");
     }
-    if (this.tailKey != -1) {
-      buf.append(";tailKey=" + tailKey);
-    }
     buf.append("]");
     return buf.toString();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d468ef4..ef8eb99 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     return sizeOfLocalPrimaries;
   }
 
-  public int getSizeOfLocalBuckets() {
+  public int getSizeOfLocalBuckets(boolean includeSecondary) {
     int sizeOfLocal = 0;
-    Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions();
-    for (BucketRegion br : allLocalBuckets) {
+    Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
+    for (BucketRegion br : primaryBuckets) {
       sizeOfLocal += br.size();
     }
     return sizeOfLocal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 034d810..a134e1e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,10 +849,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     // If this gateway is not running, return
     if (!isRunning()) {
       if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender queue:" + event);
-      }
-      if (this.eventProcessor != null) {
-        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
+        logger.debug("Returning back without putting into the gateway sender queue");
       }
       return;
     }
@@ -965,10 +962,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
         // The sender may have stopped, after we have checked the status in the beginning.
         if (!isRunning()) {
           if (isDebugEnabled) {
-            logger.debug("Returning back without putting into the gateway sender queue:" + event);
-          }
-          if (this.eventProcessor != null) {
-            this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
+            logger.debug("Returning back without putting into the gateway sender queue");
           }
           return;
         }
@@ -1257,11 +1251,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
-  public int getEventSecondaryQueueSize() {
-    AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
-    return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
-  }
-
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
     this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index eea7480..9309e43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -263,57 +261,15 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     }
 
     // This should be local size instead of pr size
-    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
-    }
-    return this.queue.size();
-  }
-
-  public int eventSecondaryQueueSize() {
-    if (queue == null) {
-      return 0;
+    if (this.queue instanceof ParallelGatewaySenderQueue) {
+      return ((ParallelGatewaySenderQueue) queue).localSize();
     }
-
-    // if parallel, get both primary and secondary queues' size, then substract primary queue's size
     if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
-          - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
-      return size;
+      return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
     }
     return this.queue.size();
   }
 
-  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
-    if (queue == null) {
-      return;
-    }
-    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
-      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
-      if (prQ == null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
-              + " is not created yet.");
-        }
-        return;
-      }
-      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
-      long shadowKey = event.getTailKey();
-
-      ParallelGatewaySenderQueue pgsq =
-          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
-      boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-      if (isPrimary) {
-        pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
-        this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
-        if (logger.isDebugEnabled()) {
-          logger.debug("register dropped event for primary queue. BucketId is " + bucketId
-              + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
-        }
-      }
-    }
-  }
-
   /**
    * @return the sender
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index adaf928..c7fd370 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -84,8 +84,6 @@ public class GatewaySenderStats {
 
   protected static final String EVENTS_FILTERED = "eventsFiltered";
   protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent";
-  protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER =
-      "notQueuedEventAtYetRunningPrimarySender";
 
   protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted";
   protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
@@ -137,8 +135,6 @@ public class GatewaySenderStats {
   protected static int eventsFilteredId;
   /** Id of not queued events */
   protected static int notQueuedEventsId;
-  /** Id of not queued events due to the primary sender is yet running */
-  protected static int notQueuedEventsAtYetRunningPrimarySenderId;
   /** Id of events conflated in batch */
   protected static int eventsConflatedFromBatchesId;
   /** Id of load balances completed */
@@ -217,8 +213,6 @@ public class GatewaySenderStats {
             f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
                 "Current number of entries in the conflation indexes map.", "events"),
             f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
-            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
-                "Number of events not added to primary queue due to sender yet runing.", "events"),
             f.createIntCounter(EVENTS_FILTERED,
                 "Number of events filtered through GatewayEventFilter.", "events"),
             f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed",
@@ -255,8 +249,6 @@ public class GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
-    notQueuedEventsAtYetRunningPrimarySenderId =
-        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
@@ -607,14 +599,6 @@ public class GatewaySenderStats {
     return this.stats.getInt(notQueuedEventsId);
   }
 
-  public void incEventsNotQueuedAtYetRunningPrimarySender() {
-    this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1);
-  }
-
-  public int getEventsNotQueuedAtYetRunningPrimarySender() {
-    return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId);
-  }
-
   public void incEventsFiltered() {
     this.stats.incInt(eventsFilteredId, 1);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index e556910..4fc940c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,11 +121,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
     return this.processors[0].getQueue().size();
   }
 
-  public String displayContent() {
-    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue());
-    return pgsq.displayContent();
-  }
-
   public int localSize() {
     return localSize(false);
   }
@@ -195,10 +190,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
     return processors[index];
   }
 
-  public RegionQueue getQueueByBucket(int bucketId) {
-    return getPGSProcessor(bucketId).getQueue();
-  }
-
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
     return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 89880fc..3aa8534 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   // This method may need synchronization in case it is used by
   // ConcurrentParallelGatewaySender
-  public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+  protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
     StoppableReentrantLock lock = buckToDispatchLock;
     if (lock != null) {
       lock.lock();
@@ -1405,28 +1405,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return localSize(false);
   }
 
-  public String displayContent() {
-    int size = 0;
-    StringBuffer sb = new StringBuffer();
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
-      if (prQ != null && prQ.getDataStore() != null) {
-        Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
-        for (BucketRegion br : allLocalBuckets) {
-          if (br.size() > 0) {
-            sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
-          }
-        }
-      }
-    }
-    return sb.toString();
-  }
-
   public int localSize(boolean includeSecondary) {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         if (includeSecondary) {
-          size += prQ.getDataStore().getSizeOfLocalBuckets();
+          size += prQ.getDataStore().getSizeOfLocalBuckets(true);
         } else {
           size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
         }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 3799083..226595b 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,8 +933,6 @@ public class WANTestBase extends DistributedTestCase {
     }
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
-    String logLevel = System.getProperty(LOG_LEVEL, "info");
-    props.setProperty(LOG_LEVEL, logLevel);
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
@@ -1157,21 +1155,6 @@ public class WANTestBase extends DistributedTestCase {
     return stats;
   }
 
-  public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
-    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
-    GatewaySenderStats statistics = sender.getStatistics();
-    ArrayList<Integer> stats = new ArrayList<Integer>();
-    int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender();
-    if (eventNotQueued > 0) {
-      logger.info(
-          "Found " + eventNotQueued + " not queued events due to primary sender is yet running");
-    }
-    stats.add(eventNotQueued);
-    stats.add(statistics.getEventsNotQueued());
-    stats.add(statistics.getEventsNotQueuedConflated());
-    return stats;
-  }
-
   public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -2763,21 +2746,11 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
         .until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
     assertEquals(queueSize, sender.getEventQueueSize());
   }
 
-  public static void validateSecondaryQueueSizeStat(String id, final int queueSize) {
-    final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-    Awaitility.await().atMost(120, TimeUnit.SECONDS)
-        .until(() -> assertEquals(
-            "Expected unprocessedEventMap is drained but actual is "
-                + sender.getStatistics().getUnprocessedEventMapSize(),
-            queueSize, sender.getStatistics().getUnprocessedEventMapSize()));
-    assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize());
-  }
-
   /**
    * This method is specifically written for pause and stop operations. This method validates that
    * the region size remains same for at least minimum number of verification attempts and also it
@@ -3080,31 +3053,6 @@ public class WANTestBase extends DistributedTestCase {
     });
   }
 
-  public static Integer getSecondaryQueueContentSize(final String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
-    int size = abstractSender.getEventSecondaryQueueSize();
-    return size;
-  }
-
-  public static String displayQueueContent(final RegionQueue queue) {
-    if (queue instanceof ParallelGatewaySenderQueue) {
-      ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
-      return pgsq.displayContent();
-    } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue;
-      return pgsq.displayContent();
-    }
-    return null;
-  }
-
   public static Integer getQueueContentSize(final String senderId) {
     return getQueueContentSize(senderId, false);
   }
@@ -3187,22 +3135,14 @@ public class WANTestBase extends DistributedTestCase {
           ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
       Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
           .getAllLocalPrimaryBucketRegions();
-      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
-      RegionQueue queue = abstractSender.getEventProcessor().queue;
-      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
-        assertEquals("Expected events in all primary queues are drained but actual is "
-            + abstractSender.getEventQueueSize() + ". Queue content is: "
-            + displayQueueContent(queue), 0, abstractSender.getEventQueueSize());
-      });
-      assertEquals("Expected events in all primary queues after drain is 0", 0,
-          abstractSender.getEventQueueSize());
-      Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
-        assertEquals("Expected events in all secondary queues are drained but actual is "
-            + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
-            + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
-      });
-      assertEquals("Except events in all secondary queues after drain is 0", 0,
-          abstractSender.getEventSecondaryQueueSize());
+      for (final BucketRegion bucket : buckets) {
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+          assertEquals("Expected bucket entries for bucket: " + bucket.getId()
+              + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
+              + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
+              bucket.keySet().size());
+        });
+      } // for loop ends
     } finally {
       exp.remove();
       exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index f5b98b7..eaef4f9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -20,8 +20,6 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.ArrayList;
-
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -409,42 +407,18 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
 
     // SECOND RUN: start async puts on region
-    ArrayList<Integer> vm4List = null;
-    ArrayList<Integer> vm5List = null;
-    ArrayList<Integer> vm6List = null;
-    ArrayList<Integer> vm7List = null;
-    boolean foundDroppedAtYetStartedPrimarySender = false;
-    int count = 0;
-
-    do {
-      stopSenders();
-      AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
-
-      // when puts are happening by another thread, start the senders
-      startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
-      async.join();
-      vm4List =
-          (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      vm5List =
-          (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      vm6List =
-          (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      vm7List =
-          (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
-        foundDroppedAtYetStartedPrimarySender = true;
-      }
-      count++;
-    } while (foundDroppedAtYetStartedPrimarySender == false && count < 5);
-    assertThat(foundDroppedAtYetStartedPrimarySender);
+    AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
+
+    // when puts are happening by another thread, start the senders
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    async.join();
 
     // verify all the buckets on all the sender nodes are drained
     validateParallelSenderQueueAllBucketsDrained();
 
     // verify that the queue size ultimately becomes zero. That means all the events propagate to
     // remote site.
-
     vm4.invoke(() -> validateQueueContents("ln", 0));
   }
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 8df5650..ee43b83 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -269,53 +269,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createSenderCaches(lnPort);
-
-    createSenderVM4();
-    createSenderVM5();
-
-    createReceiverRegions();
-
-    createSenderRegions();
-
-    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
-
-    startSenderInVMs("ln", vm4, vm5);
-
-    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
-
-    vm4.invoke(() -> WANTestBase.stopSender("ln"));
-    vm5.invoke(() -> WANTestBase.stopSender("ln"));
-
-    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
-    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
-
-    vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
-    vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
-
-    // do a lot of puts while senders are restarting
-    AsyncInvocation async = vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000));
-
-    startSenderInVMsAsync("ln", vm4, vm5);
-    async.join();
-
-    vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
-    vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
-    vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
-    vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
-  }
-
-  @Test
   public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -345,7 +298,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
 
-    // Do some puts from both vm4 and vm5 while restarting a sender
+    // Do some puts while restarting a sender
     AsyncInvocation asyncPuts =
         vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300));
 

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.