You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2021/01/06 19:41:22 UTC

[geode] branch support/1.12 updated: GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new ec2d79e  GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
ec2d79e is described below

commit ec2d79ebfd0b85fc90203c3c114c4b3eec37602f
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Jan 6 09:56:54 2021 -0800

    GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
    
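    Reverts commit 3ed37a754d789bb52cf190db23088e819955fd58: removes the fair
    ReentrantReadWriteLock from SerialGatewaySenderQueue and restores
    synchronization on the queue instance (synchronized methods and blocks).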
    
    (cherry picked from commit ab16f68c7c3b121af00c3aca64a92d9809cb6019)
---
 .../cache/wan/serial/SerialGatewaySenderQueue.java | 124 ++++++++-------------
 1 file changed, 49 insertions(+), 75 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 0e19ebb..437455b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
 
@@ -141,13 +140,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   private boolean isDiskSynchronous;
 
   /**
-   * The writeLock of this concurrent lock is used to protect access to the queue.
-   * It is implemented as a fair lock to ensure FIFO ordering of queueing attempts.
-   * Otherwise threads can be unfairly delayed.
-   */
-  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
-  /**
    * The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
    * updating of entries in the queue for conflation.
    */
@@ -219,23 +211,18 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   }
 
   @Override
-  public boolean put(Object event) throws CacheException {
-    lock.writeLock().lock();
-    try {
-      GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
-      final Region r = eventImpl.getRegion();
-      final boolean isPDXRegion =
-          (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
-      final boolean isWbcl =
-          this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
-      if (!(isPDXRegion && isWbcl)) {
-        putAndGetKey(event);
-        return true;
-      }
-      return false;
-    } finally {
-      lock.writeLock().unlock();
+  public synchronized boolean put(Object event) throws CacheException {
+    GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
+    final Region r = eventImpl.getRegion();
+    final boolean isPDXRegion =
+        (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
+    final boolean isWbcl =
+        this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
+    if (!(isPDXRegion && isWbcl)) {
+      putAndGetKey(event);
+      return true;
     }
+    return false;
   }
 
   private long putAndGetKey(Object object) throws CacheException {
@@ -259,6 +246,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     return key.longValue();
   }
 
+
   @Override
   public AsyncEvent take() throws CacheException {
     // Unsupported since we have no callers.
@@ -280,49 +268,44 @@ public class SerialGatewaySenderQueue implements RegionQueue {
    * have peeked. If the entry was not peeked, this method will silently return.
    */
   @Override
-  public void remove() throws CacheException {
-    lock.writeLock().lock();
+  public synchronized void remove() throws CacheException {
+    if (this.peekedIds.isEmpty()) {
+      return;
+    }
+    Long key = this.peekedIds.remove();
     try {
-      if (this.peekedIds.isEmpty()) {
-        return;
-      }
-      Long key = this.peekedIds.remove();
-      try {
-        // Increment the head key
-        updateHeadKey(key.longValue());
-        removeIndex(key);
-        // Remove the entry at that key with a callback arg signifying it is
-        // a WAN queue so that AbstractRegionEntry.destroy can get the value
-        // even if it has been evicted to disk. In the normal case, the
-        // AbstractRegionEntry.destroy only gets the value in the VM.
-        this.region.localDestroy(key, WAN_QUEUE_TOKEN);
-        this.stats.decQueueSize();
-
-      } catch (EntryNotFoundException ok) {
-        // this is acceptable because the conflation can remove entries
-        // out from underneath us.
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
-              this, key);
-        }
-      }
-
-      boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
-      this.lastDispatchedKey = key;
-      if (wasEmpty) {
-        synchronized (this) {
-          notifyAll();
-        }
-      }
+      // Increment the head key
+      updateHeadKey(key.longValue());
+      removeIndex(key);
+      // Remove the entry at that key with a callback arg signifying it is
+      // a WAN queue so that AbstractRegionEntry.destroy can get the value
+      // even if it has been evicted to disk. In the normal case, the
+      // AbstractRegionEntry.destroy only gets the value in the VM.
+      this.region.localDestroy(key, WAN_QUEUE_TOKEN);
+      this.stats.decQueueSize();
 
+    } catch (EntryNotFoundException ok) {
+      // this is acceptable because the conflation can remove entries
+      // out from underneath us.
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
-            this, key, this.lastDispatchedKey, this.lastDestroyedKey);
+            "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
+            this, key);
+      }
+    }
+
+    boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
+    this.lastDispatchedKey = key;
+    if (wasEmpty) {
+      synchronized (this) {
+        notifyAll();
       }
-    } finally {
-      lock.writeLock().unlock();
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
+          this, key, this.lastDispatchedKey, this.lastDestroyedKey);
     }
   }
 
@@ -463,8 +446,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       Object key = object.getKeyToConflate();
       Long previousIndex;
 
-      lock.writeLock().lock();
-      try {
+      synchronized (this) {
         Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
         if (latestIndexesForRegion == null) {
           latestIndexesForRegion = new HashMap<Object, Long>();
@@ -472,8 +454,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
         }
 
         previousIndex = latestIndexesForRegion.put(key, tailKey);
-      } finally {
-        lock.writeLock().unlock();
       }
 
       if (isDebugEnabled) {
@@ -554,7 +534,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   }
 
   /*
-   * this must be invoked with lock.writeLock() held
+   * this must be invoked under synchronization
    */
   private void removeIndex(Long qkey) {
     // Determine whether conflation is enabled for this queue and object
@@ -742,8 +722,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     if (tailKey.get() != -1) {
       return;
     }
-    lock.writeLock().lock();
-    try {
+    synchronized (this) {
       long largestKey = -1;
       long largestKeyLessThanHalfMax = -1;
       long smallestKey = -1;
@@ -791,8 +770,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
         logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey,
             this.headKey);
       }
-    } finally {
-      lock.writeLock().unlock();
     }
   }
 
@@ -1031,8 +1008,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
             }
 
             long temp;
-            lock.writeLock().lock();
-            try {
+            synchronized (SerialGatewaySenderQueue.this) {
               temp = lastDispatchedKey;
               boolean wasEmpty = temp == lastDestroyedKey;
               while (lastDispatchedKey == lastDestroyedKey) {
@@ -1041,8 +1017,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
               }
               if (wasEmpty)
                 continue;
-            } finally {
-              lock.writeLock().unlock();
             }
             // release not needed since disallowOffHeapValues called
             EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,