You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 16:58:15 UTC
[2/2] geode git commit: Fix BlockingHARegionJUnitTest
Fix BlockingHARegionJUnitTest
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9c18bcc3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9c18bcc3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9c18bcc3
Branch: refs/heads/feature/GEODE-2632-18
Commit: 9c18bcc3ffa0eaea55ddc1b0d921470730b8b797
Parents: 283215f
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 31 09:57:42 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 31 09:57:42 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/ha/HARegionQueue.java | 155 +--
.../cache/tier/sockets/CacheClientNotifier.java | 1037 +++++++++++-------
2 files changed, 682 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/9c18bcc3/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 66e34b9..f75a912 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -2057,21 +2057,6 @@ public class HARegionQueue implements RegionQueue {
* a single peek thread.
*/
private static class BlockingHARegionQueue extends HARegionQueue {
-
- private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
- DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
-
- private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
-
- /**
- * System property name for indicating how much frequently the "Queue full" message should be
- * logged.
- */
- private static final String MAX_QUEUE_LOG_FREQUENCY =
- DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
-
- private static final long DEFAULT_LOG_FREQUENCY = 1000;
-
/**
* Guards the Put permits
*/
@@ -2094,25 +2079,14 @@ public class HARegionQueue implements RegionQueue {
*/
private final Object permitMon = new Object();
- /**
- * Lock on which the take & remove threads block awaiting data from put operations
- */
+ // Lock on which the take & remove threads block awaiting data from put
+ // operations
private final StoppableReentrantLock lock;
/**
* Condition object on which peek & take threads will block
*/
- final StoppableCondition blockCond;
-
- /**
- * System property value denoting the time in milliseconds. Any thread putting an event into a
- * subscription queue, which is full, will wait this much time for the queue to make space.
- * It'll then enqueue the event possibly causing the queue to grow beyond its capacity/max-size.
- * See #51400.
- */
- private final int enqueueEventWaitTime;
-
- private final long logFrequency;
+ protected final StoppableCondition blockCond;
/**
* @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can
@@ -2123,42 +2097,16 @@ public class HARegionQueue implements RegionQueue {
HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-
super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary);
this.capacity = hrqa.getBlockingQueueCapacity();
this.putPermits = this.capacity;
this.lock = new StoppableReentrantLock(this.region.getCancelCriterion());
- this.blockCond = this.lock.newCondition();
+ this.blockCond = lock.newCondition();
super.putGIIDataInRegion();
-
- if (getClass() == BlockingHARegionQueue.class) {
- this.initialized.set(true);
+ if (this.getClass() == BlockingHARegionQueue.class) {
+ initialized.set(true);
}
-
- this.enqueueEventWaitTime = calcEnqueueEventWaitTime();
- this.logFrequency = calcLogFrequency();
- }
-
- private static int calcEnqueueEventWaitTime() {
- int value = Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
- if (value < 0) {
- value = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
- }
- return value;
- }
-
- private static long calcLogFrequency() {
- long value;
- try {
- value = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
- if (value <= 0) {
- value = DEFAULT_LOG_FREQUENCY;
- }
- } catch (NumberFormatException ignore) {
- value = DEFAULT_LOG_FREQUENCY;
- }
- return value;
}
@Override
@@ -2186,55 +2134,56 @@ public class HARegionQueue implements RegionQueue {
* in the HARegionQueue.
*/
@Override
- @SuppressWarnings("TLW_TWO_LOCK_WAIT")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
void checkQueueSizeConstraint() throws InterruptedException {
- if (!(this.haContainer instanceof HAContainerMap && isPrimary())) {
- // Fix for bug 39413
- return;
- }
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
-
- synchronized (this.putGuard) {
- if (this.putPermits <= 0) {
- synchronized (this.permitMon) {
- if (reconcilePutPermits() <= 0) {
- if (this.region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
- this.isClientSlowReciever = true;
- } else {
- try {
- if ((this.maxQueueSizeHitCount % this.logFrequency) == 0) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL,
- new Object[] {this.region.getName()}));
- this.maxQueueSizeHitCount = 0;
- }
- ++this.maxQueueSizeHitCount;
- this.region.checkReadiness(); // fix for bug 37581
- // TODO: wait called while holding two locks
- this.permitMon.wait(this.enqueueEventWaitTime);
- this.region.checkReadiness(); // fix for bug 37581
- // Fix for #51400. Allow the queue to grow beyond its
- // capacity/maxQueueSize, if it is taking a long time to
- // drain the queue, either due to a slower client or the
- // deadlock scenario mentioned in the ticket.
- reconcilePutPermits();
- if (this.maxQueueSizeHitCount % this.logFrequency == 1) {
- logger.info(LocalizedMessage
- .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
+ if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ synchronized (this.putGuard) {
+ if (putPermits <= 0) {
+ synchronized (this.permitMon) {
+ if (reconcilePutPermits() <= 0) {
+ if (region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
+ isClientSlowReciever = true;
+ } else {
+ try {
+ long logFrequency = CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
+ CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+ if (ccn != null) { // check needed for junit tests
+ logFrequency = ccn.getLogFrequency();
+ }
+ if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL,
+ new Object[] {region.getName()}));
+ this.maxQueueSizeHitCount = 0;
+ }
+ ++this.maxQueueSizeHitCount;
+ this.region.checkReadiness(); // fix for bug 37581
+ // TODO: wait called while holding two locks
+ this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
+ this.region.checkReadiness(); // fix for bug 37581
+ // Fix for #51400. Allow the queue to grow beyond its
+ // capacity/maxQueueSize, if it is taking a long time to
+ // drain the queue, either due to a slower client or the
+ // deadlock scenario mentioned in the ticket.
+ reconcilePutPermits();
+ if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
+ logger.info(LocalizedMessage
+ .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
+ }
+ } catch (InterruptedException ex) {
+ // TODO: The line below is meaningless. Comment it out later
+ this.permitMon.notifyAll();
+ throw ex;
}
- } catch (InterruptedException ex) {
- // TODO: The line below is meaningless. Comment it out later
- this.permitMon.notifyAll();
- throw ex;
}
}
- }
- } // synchronized (this.permitMon)
- } // if (putPermits <= 0)
- --this.putPermits;
- } // synchronized (this.putGuard)
+ } // synchronized (this.permitMon)
+ } // if (putPermits <= 0)
+ --putPermits;
+ } // synchronized (this.putGuard)
+ }
}
/**