You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2021/01/06 19:41:22 UTC
[geode] branch support/1.12 updated: GEODE-5922: concurrency
problems in SerialGatewaySenderQueue (#5870)
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new ec2d79e GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
ec2d79e is described below
commit ec2d79ebfd0b85fc90203c3c114c4b3eec37602f
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Jan 6 09:56:54 2021 -0800
GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
reverting 3ed37a754d789bb52cf190db23088e819955fd58
(cherry picked from commit ab16f68c7c3b121af00c3aca64a92d9809cb6019)
---
.../cache/wan/serial/SerialGatewaySenderQueue.java | 124 ++++++++-------------
1 file changed, 49 insertions(+), 75 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 0e19ebb..437455b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
@@ -141,13 +140,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
private boolean isDiskSynchronous;
/**
- * The writeLock of this concurrent lock is used to protect access to the queue.
- * It is implemented as a fair lock to ensure FIFO ordering of queueing attempts.
- * Otherwise threads can be unfairly delayed.
- */
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
- /**
* The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
* updating of entries in the queue for conflation.
*/
@@ -219,23 +211,18 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
@Override
- public boolean put(Object event) throws CacheException {
- lock.writeLock().lock();
- try {
- GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
- final Region r = eventImpl.getRegion();
- final boolean isPDXRegion =
- (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
- final boolean isWbcl =
- this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
- if (!(isPDXRegion && isWbcl)) {
- putAndGetKey(event);
- return true;
- }
- return false;
- } finally {
- lock.writeLock().unlock();
+ public synchronized boolean put(Object event) throws CacheException {
+ GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
+ final Region r = eventImpl.getRegion();
+ final boolean isPDXRegion =
+ (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
+ final boolean isWbcl =
+ this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
+ if (!(isPDXRegion && isWbcl)) {
+ putAndGetKey(event);
+ return true;
}
+ return false;
}
private long putAndGetKey(Object object) throws CacheException {
@@ -259,6 +246,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
return key.longValue();
}
+
@Override
public AsyncEvent take() throws CacheException {
// Unsupported since we have no callers.
@@ -280,49 +268,44 @@ public class SerialGatewaySenderQueue implements RegionQueue {
* have peeked. If the entry was not peeked, this method will silently return.
*/
@Override
- public void remove() throws CacheException {
- lock.writeLock().lock();
+ public synchronized void remove() throws CacheException {
+ if (this.peekedIds.isEmpty()) {
+ return;
+ }
+ Long key = this.peekedIds.remove();
try {
- if (this.peekedIds.isEmpty()) {
- return;
- }
- Long key = this.peekedIds.remove();
- try {
- // Increment the head key
- updateHeadKey(key.longValue());
- removeIndex(key);
- // Remove the entry at that key with a callback arg signifying it is
- // a WAN queue so that AbstractRegionEntry.destroy can get the value
- // even if it has been evicted to disk. In the normal case, the
- // AbstractRegionEntry.destroy only gets the value in the VM.
- this.region.localDestroy(key, WAN_QUEUE_TOKEN);
- this.stats.decQueueSize();
-
- } catch (EntryNotFoundException ok) {
- // this is acceptable because the conflation can remove entries
- // out from underneath us.
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
- this, key);
- }
- }
-
- boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
- this.lastDispatchedKey = key;
- if (wasEmpty) {
- synchronized (this) {
- notifyAll();
- }
- }
+ // Increment the head key
+ updateHeadKey(key.longValue());
+ removeIndex(key);
+ // Remove the entry at that key with a callback arg signifying it is
+ // a WAN queue so that AbstractRegionEntry.destroy can get the value
+ // even if it has been evicted to disk. In the normal case, the
+ // AbstractRegionEntry.destroy only gets the value in the VM.
+ this.region.localDestroy(key, WAN_QUEUE_TOKEN);
+ this.stats.decQueueSize();
+ } catch (EntryNotFoundException ok) {
+ // this is acceptable because the conflation can remove entries
+ // out from underneath us.
if (logger.isDebugEnabled()) {
logger.debug(
- "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
- this, key, this.lastDispatchedKey, this.lastDestroyedKey);
+ "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
+ this, key);
+ }
+ }
+
+ boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
+ this.lastDispatchedKey = key;
+ if (wasEmpty) {
+ synchronized (this) {
+ notifyAll();
}
- } finally {
- lock.writeLock().unlock();
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
+ this, key, this.lastDispatchedKey, this.lastDestroyedKey);
}
}
@@ -463,8 +446,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
Object key = object.getKeyToConflate();
Long previousIndex;
- lock.writeLock().lock();
- try {
+ synchronized (this) {
Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
if (latestIndexesForRegion == null) {
latestIndexesForRegion = new HashMap<Object, Long>();
@@ -472,8 +454,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
previousIndex = latestIndexesForRegion.put(key, tailKey);
- } finally {
- lock.writeLock().unlock();
}
if (isDebugEnabled) {
@@ -554,7 +534,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
/*
- * this must be invoked with lock.writeLock() held
+ * this must be invoked under synchronization
*/
private void removeIndex(Long qkey) {
// Determine whether conflation is enabled for this queue and object
@@ -742,8 +722,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
if (tailKey.get() != -1) {
return;
}
- lock.writeLock().lock();
- try {
+ synchronized (this) {
long largestKey = -1;
long largestKeyLessThanHalfMax = -1;
long smallestKey = -1;
@@ -791,8 +770,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey,
this.headKey);
}
- } finally {
- lock.writeLock().unlock();
}
}
@@ -1031,8 +1008,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
long temp;
- lock.writeLock().lock();
- try {
+ synchronized (SerialGatewaySenderQueue.this) {
temp = lastDispatchedKey;
boolean wasEmpty = temp == lastDestroyedKey;
while (lastDispatchedKey == lastDestroyedKey) {
@@ -1041,8 +1017,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
if (wasEmpty)
continue;
- } finally {
- lock.writeLock().unlock();
}
// release not needed since disallowOffHeapValues called
EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,