You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 16:58:15 UTC

[2/2] geode git commit: Fix BlockingHARegionJUnitTest

Fix BlockingHARegionJUnitTest


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9c18bcc3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9c18bcc3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9c18bcc3

Branch: refs/heads/feature/GEODE-2632-18
Commit: 9c18bcc3ffa0eaea55ddc1b0d921470730b8b797
Parents: 283215f
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 31 09:57:42 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 31 09:57:42 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/ha/HARegionQueue.java  |  155 +--
 .../cache/tier/sockets/CacheClientNotifier.java | 1037 +++++++++++-------
 2 files changed, 682 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/9c18bcc3/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 66e34b9..f75a912 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -2057,21 +2057,6 @@ public class HARegionQueue implements RegionQueue {
    * a single peek thread.
    */
   private static class BlockingHARegionQueue extends HARegionQueue {
-
-    private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
-        DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
-
-    private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
-
-    /**
-     * System property name for indicating how much frequently the "Queue full" message should be
-     * logged.
-     */
-    private static final String MAX_QUEUE_LOG_FREQUENCY =
-        DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
-
-    private static final long DEFAULT_LOG_FREQUENCY = 1000;
-
     /**
      * Guards the Put permits
      */
@@ -2094,25 +2079,14 @@ public class HARegionQueue implements RegionQueue {
      */
     private final Object permitMon = new Object();
 
-    /**
-     * Lock on which the take & remove threads block awaiting data from put operations
-     */
+    // Lock on which the take & remove threads block awaiting data from put
+    // operations
     private final StoppableReentrantLock lock;
 
     /**
      * Condition object on which peek & take threads will block
      */
-    final StoppableCondition blockCond;
-
-    /**
-     * System property value denoting the time in milliseconds. Any thread putting an event into a
-     * subscription queue, which is full, will wait this much time for the queue to make space.
-     * It'll then enqueue the event possibly causing the queue to grow beyond its capacity/max-size.
-     * See #51400.
-     */
-    private final int enqueueEventWaitTime;
-
-    private final long logFrequency;
+    protected final StoppableCondition blockCond;
 
     /**
      * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can
@@ -2123,42 +2097,16 @@ public class HARegionQueue implements RegionQueue {
         HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
         final byte clientConflation, boolean isPrimary)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-
       super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary);
       this.capacity = hrqa.getBlockingQueueCapacity();
       this.putPermits = this.capacity;
       this.lock = new StoppableReentrantLock(this.region.getCancelCriterion());
-      this.blockCond = this.lock.newCondition();
+      this.blockCond = lock.newCondition();
 
       super.putGIIDataInRegion();
-
-      if (getClass() == BlockingHARegionQueue.class) {
-        this.initialized.set(true);
+      if (this.getClass() == BlockingHARegionQueue.class) {
+        initialized.set(true);
       }
-
-      this.enqueueEventWaitTime = calcEnqueueEventWaitTime();
-      this.logFrequency = calcLogFrequency();
-    }
-
-    private static int calcEnqueueEventWaitTime() {
-      int value = Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
-      if (value < 0) {
-        value = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
-      }
-      return value;
-    }
-
-    private static long calcLogFrequency() {
-      long value;
-      try {
-        value = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
-        if (value <= 0) {
-          value = DEFAULT_LOG_FREQUENCY;
-        }
-      } catch (NumberFormatException ignore) {
-        value = DEFAULT_LOG_FREQUENCY;
-      }
-      return value;
     }
 
     @Override
@@ -2186,55 +2134,56 @@ public class HARegionQueue implements RegionQueue {
      * in the HARegionQueue.
      */
     @Override
-    @SuppressWarnings("TLW_TWO_LOCK_WAIT")
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
     void checkQueueSizeConstraint() throws InterruptedException {
-      if (!(this.haContainer instanceof HAContainerMap && isPrimary())) {
-        // Fix for bug 39413
-        return;
-      }
-      if (Thread.interrupted()) {
-        throw new InterruptedException();
-      }
-
-      synchronized (this.putGuard) {
-        if (this.putPermits <= 0) {
-          synchronized (this.permitMon) {
-            if (reconcilePutPermits() <= 0) {
-              if (this.region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
-                this.isClientSlowReciever = true;
-              } else {
-                try {
-                  if ((this.maxQueueSizeHitCount % this.logFrequency) == 0) {
-                    logger.warn(LocalizedMessage.create(
-                        LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL,
-                        new Object[] {this.region.getName()}));
-                    this.maxQueueSizeHitCount = 0;
-                  }
-                  ++this.maxQueueSizeHitCount;
-                  this.region.checkReadiness(); // fix for bug 37581
-                  // TODO: wait called while holding two locks
-                  this.permitMon.wait(this.enqueueEventWaitTime);
-                  this.region.checkReadiness(); // fix for bug 37581
-                  // Fix for #51400. Allow the queue to grow beyond its
-                  // capacity/maxQueueSize, if it is taking a long time to
-                  // drain the queue, either due to a slower client or the
-                  // deadlock scenario mentioned in the ticket.
-                  reconcilePutPermits();
-                  if (this.maxQueueSizeHitCount % this.logFrequency == 1) {
-                    logger.info(LocalizedMessage
-                        .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
+      if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
+        if (Thread.interrupted())
+          throw new InterruptedException();
+        synchronized (this.putGuard) {
+          if (putPermits <= 0) {
+            synchronized (this.permitMon) {
+              if (reconcilePutPermits() <= 0) {
+                if (region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
+                  isClientSlowReciever = true;
+                } else {
+                  try {
+                    long logFrequency = CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
+                    CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+                    if (ccn != null) { // check needed for junit tests
+                      logFrequency = ccn.getLogFrequency();
+                    }
+                    if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
+                      logger.warn(LocalizedMessage.create(
+                          LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL,
+                          new Object[] {region.getName()}));
+                      this.maxQueueSizeHitCount = 0;
+                    }
+                    ++this.maxQueueSizeHitCount;
+                    this.region.checkReadiness(); // fix for bug 37581
+                    // TODO: wait called while holding two locks
+                    this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
+                    this.region.checkReadiness(); // fix for bug 37581
+                    // Fix for #51400. Allow the queue to grow beyond its
+                    // capacity/maxQueueSize, if it is taking a long time to
+                    // drain the queue, either due to a slower client or the
+                    // deadlock scenario mentioned in the ticket.
+                    reconcilePutPermits();
+                    if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
+                      logger.info(LocalizedMessage
+                          .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
+                    }
+                  } catch (InterruptedException ex) {
+                    // TODO: The line below is meaningless. Comment it out later
+                    this.permitMon.notifyAll();
+                    throw ex;
                   }
-                } catch (InterruptedException ex) {
-                  // TODO: The line below is meaningless. Comment it out later
-                  this.permitMon.notifyAll();
-                  throw ex;
                 }
               }
-            }
-          } // synchronized (this.permitMon)
-        } // if (putPermits <= 0)
-        --this.putPermits;
-      } // synchronized (this.putGuard)
+            } // synchronized (this.permitMon)
+          } // if (putPermits <= 0)
+          --putPermits;
+        } // synchronized (this.putGuard)
+      }
     }
 
     /**