You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/11/25 22:35:46 UTC

[geode] branch feature/GEODE-7435 created (now 2b0afb9)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch feature/GEODE-7435
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 2b0afb9  GEODE-7435 - Improve performance of GMSMembershipManager.handleOrDeferMessage

This branch includes the following new commits:

     new 2b0afb9  GEODE-7435 - Improve performance of GMSMembershipManager.handleOrDeferMessage

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7435 - Improve performance of GMSMembershipManager.handleOrDeferMessage

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7435
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 2b0afb93b9d0ba5c9ff41a3225b980c028c1986c
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Nov 25 14:35:05 2019 -0800

    GEODE-7435 - Improve performance of GMSMembershipManager.handleOrDeferMessage
    
    Modified the method to avoid synchronization unless we're in an initialization phase.
    While initializing we synchronize and queue messages unless we're told the
    queue has been drained.
---
 .../internal/membership/gms/GMSMembership.java     | 63 ++++++++++++----------
 1 file changed, 34 insertions(+), 29 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
index 510c420..7572918 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
@@ -69,7 +69,6 @@ import org.apache.geode.distributed.internal.membership.gms.api.MessageListener;
 import org.apache.geode.distributed.internal.membership.gms.api.QuorumChecker;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.GMSMessage;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
-import org.apache.geode.internal.cache.partitioned.PartitionMessageWithDirectReply;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.tcp.ConnectionException;
@@ -342,6 +341,12 @@ public class GMSMembership implements Membership {
   private volatile boolean processingEvents = false;
 
   /**
+   * Set to true under startupLock when processingEvents has been set to true
+   * and startup messages have been removed from the queue and dispatched
+   */
+  private boolean startupMessagesDrained = false;
+
+  /**
    * This is the latest viewId installed
    */
   private long latestViewId = -1;
@@ -730,9 +735,11 @@ public class GMSMembership implements Membership {
    * @param member the member
    */
   protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) {
-    synchronized (startupLock) {
-      if (!processingEvents) {
-        startupMessages.add(new StartupEvent(member));
+    if (!processingEvents) {
+      synchronized (startupLock) {
+        if (!startupMessagesDrained) {
+          startupMessages.add(new StartupEvent(member));
+        }
         return;
       }
     }
@@ -888,19 +895,14 @@ public class GMSMembership implements Membership {
    * @param msg the message to process
    */
   protected void handleOrDeferMessage(DistributionMessage msg) {
-    synchronized (startupLock) {
-      if (beingSick || playingDead) {
-        // cache operations are blocked in a "sick" member
-        if (msg.containsRegionContentChange() || msg instanceof PartitionMessageWithDirectReply) {
-          startupMessages.add(new StartupEvent(msg));
-          return;
-        }
-      }
-      if (!processingEvents) {
-        startupMessages.add(new StartupEvent(msg));
-        return;
-      }
-    }
+    // if (!processingEvents) {
+    // synchronized(startupLock) {
+    // if (!startupMessagesDrained) {
+    // startupMessages.add(new StartupEvent(msg));
+    // return;
+    // }
+    // }
+    // }
     dispatchMessage(msg);
   }
 
@@ -1013,10 +1015,12 @@ public class GMSMembership implements Membership {
     }
     latestViewWriteLock.lock();
     try {
-      synchronized (startupLock) {
-        if (!processingEvents) {
-          startupMessages.add(new StartupEvent(viewArg));
-          return;
+      if (!processingEvents) {
+        synchronized (startupLock) {
+          if (!startupMessagesDrained) {
+            startupMessages.add(new StartupEvent(viewArg));
+            return;
+          }
         }
       }
       // view processing can take a while, so we use a separate thread
@@ -1042,10 +1046,8 @@ public class GMSMembership implements Membership {
   protected void handleOrDeferSuspect(SuspectMember suspectInfo) {
     latestViewWriteLock.lock();
     try {
-      synchronized (startupLock) {
-        if (!processingEvents) {
-          return;
-        }
+      if (!processingEvents) {
+        return;
       }
       InternalDistributedMember suspect = gmsMemberToDMember(suspectInfo.suspectedMember);
       InternalDistributedMember who = gmsMemberToDMember(suspectInfo.whoSuspected);
@@ -1121,11 +1123,13 @@ public class GMSMembership implements Membership {
           int remaining = startupMessages.size();
           if (remaining == 0) {
             // While holding the lock, flip the bit so that
-            // no more events get put into startupMessages, and
-            // notify all waiters to proceed.
+            // no more events get put into startupMessages
+            startupMessagesDrained = true;
+            // set the volatile boolean that states that queueing is completely done now
             processingEvents = true;
+            // notify any threads waiting for event processing that we're open for business
             startupLock.notifyAll();
-            break; // ...and we're done.
+            break;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Membership: {} remaining startup message(s)", remaining);
@@ -1163,8 +1167,9 @@ public class GMSMembership implements Membership {
       services.getCancelCriterion().checkCancelInProgress(null);
       synchronized (startupLock) {
         // Now check using a memory fence and synchronization.
-        if (processingEvents)
+        if (processingEvents && startupMessagesDrained) {
           break;
+        }
         boolean interrupted = Thread.interrupted();
         try {
           startupLock.wait();