You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/01 18:13:16 UTC

[geode] 01/01: GEODE-7920: Geode UDP INT thread found processing cache operations

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7920
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8f24abbc7bade3f8e65a52a1a0402343e12ba2b5
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Apr 1 11:10:26 2020 -0700

    GEODE-7920: Geode UDP INT thread found processing cache operations
    
    Modified DistributionMessage to look for JGroups "internal" executor
    threads.  We thought we'd turned off all JGroups thread pools but this
    one is still around.  We don't want to process DistributionMessages in
    these threads unless absolutely necessary since they're needed when
    processing incoming messages.
---
 .../internal/ClusterOperationExecutors.java           |  2 +-
 .../distributed/internal/DistributionMessage.java     | 19 +++++++++++++------
 .../geode/distributed/internal/ShutdownMessage.java   |  2 +-
 .../internal/ThrottlingMemLinkedQueueWithDMStats.java |  2 +-
 .../distributed/internal/DistributionMessageTest.java | 12 ++++++++++++
 5 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index d16e7fd..7ea6532 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -737,7 +737,7 @@ public class ClusterOperationExecutors implements OperationExecutors {
       // UDP readers are throttled in the FC protocol, which queries
       // the queue to see if it should throttle
       if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE
-          && !DistributionMessage.isPreciousThread()) {
+          && !DistributionMessage.isMembershipMessengerThread()) {
         do {
           boolean interrupted = Thread.interrupted();
           try {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 4fed3c1..0f11b6b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -412,7 +412,8 @@ public abstract class DistributionMessage
    */
   protected void schedule(final ClusterDistributionManager dm) {
     boolean inlineProcess = INLINE_PROCESS
-        && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR && !isPreciousThread();
+        && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR
+        && !isMembershipMessengerThread();
 
     boolean forceInline = this.acker != null || getInlineProcess() || Connection.isDominoThread();
 
@@ -476,13 +477,19 @@ public abstract class DistributionMessage
   }
 
   /**
-   * returns true if the current thread should not be used for inline processing. i.e., it is a
-   * "precious" resource
+   * returns true if the current thread should not be used for inline processing because it
+   * is responsible for reading geode-membership messages. Blocking such a thread can cause
+   * a server to be kicked out
    */
-  public static boolean isPreciousThread() {
+  public static boolean isMembershipMessengerThread() {
     String thrname = Thread.currentThread().getName();
-    // return thrname.startsWith("Geode UDP");
-    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver");
+
+    return isMembershipMessengerThreadName(thrname);
+  }
+
+  public static boolean isMembershipMessengerThreadName(String thrname) {
+    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver")
+        || thrname.startsWith("Geode UDP");
   }
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
index 66cadc4..2e1ebc4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java
@@ -79,7 +79,7 @@ public class ShutdownMessage extends HighPriorityDistributionMessage
     // reply.setRecipient(getSender());
     // can't send a response in a UDP receiver thread or we might miss
     // the other side going away due to blocking receipt of views
-    // if (DistributionMessage.isPreciousThread()) {
+    // if (DistributionMessage.isMembershipMessengerThread()) {
     // dm.getWaitingThreadPool().execute(new Runnable() {
     // public void run() {
     // dm.putOutgoing(reply);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
index a7a8c21..2432cd9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
@@ -102,7 +102,7 @@ public class ThrottlingMemLinkedQueueWithDMStats<E> extends OverflowQueueWithDMS
       throw new InterruptedException();
     // only block threads reading from tcp stream sockets. blocking udp
     // will cause retransmission storms
-    if (!DistributionMessage.isPreciousThread()) {
+    if (!DistributionMessage.isMembershipMessengerThread()) {
       long startTime = DistributionStats.getStatTime();
       do {
         try {
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
index 91027e1..7bc854a 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java
@@ -14,10 +14,14 @@
  */
 package org.apache.geode.distributed.internal;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -35,4 +39,12 @@ public class DistributionMessageTest {
 
     verify(mockDistributionMessage, times(1)).setReplySender(mockReplySender);
   }
+
+  @Test
+  public void membershipMessengerThreadsAreRecognized() {
+    List<String> threadNames = Arrays.asList("unicast receiver", "multicast receiver", "Geode UDP");
+    for (String threadName : threadNames) {
+      assertThat(DistributionMessage.isMembershipMessengerThreadName(threadName)).isTrue();
+    }
+  }
 }