You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2021/02/17 02:42:33 UTC

[geode] branch support/1.12 updated: GEODE-8947: Use waiting thread pool only in limited scenario. (#6038)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 3a91d92  GEODE-8947: Use waiting thread pool only in limited scenario. (#6038)
3a91d92 is described below

commit 3a91d92b910a8acf39cfb64d57e870566cc7bfb6
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Feb 16 17:47:27 2021 -0800

    GEODE-8947: Use waiting thread pool only in limited scenario. (#6038)
    
    * GEODE-8947: Use waiting thread pool only in limited scenario.
    
      * A separate thread in the waiting pool is used to process the message only when the
        message is transactional and conserve-sockets is set to true.
      * This addresses a performance issue in the cases where there is no risk of deadlock.
    
    (cherry picked from commit b98cf4a3d1fb5e5d64c2d298827d937183ab2b79)
---
 .../internal/cache/tx/RemoteOperationMessage.java  | 19 ++++++++-
 .../cache/tx/RemoteOperationMessageTest.java       | 47 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
index ba49d73..a4cbef5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.tx;
 
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -176,7 +177,23 @@ public abstract class RemoteOperationMessage extends DistributionMessage
       sendReply(getSender(), this.processorId, dm, replyException, null, 0);
       return;
     }
-    dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
+
+    if (dm.getSystem().threadOwnsResources()) {
+      // reply inline if thread owns socket.
+      doRemoteOperation(dm, cache);
+      return;
+    }
+
+    if (isTransactional()) {
+      dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
+    } else {
+      // reply inline for non-transactional case.
+      doRemoteOperation(dm, cache);
+    }
+  }
+
+  boolean isTransactional() {
+    return getTXUniqId() != TXManagerImpl.NOTX && canParticipateInTransaction();
   }
 
   void doRemoteOperation(ClusterDistributionManager dm, InternalCache cache) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
index e10c2f0..aa175d3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -302,6 +303,52 @@ public class RemoteOperationMessageTest {
         .hasMessageContaining("system failure");
   }
 
+
+  @Test
+  public void processInvokesDoRemoteOperationIfThreadOwnsResources() {
+    when(system.threadOwnsResources()).thenReturn(true);
+    doNothing().when(msg).doRemoteOperation(dm, cache);
+
+    msg.process(dm);
+
+    verify(msg).doRemoteOperation(dm, cache);
+    verify(msg, never()).isTransactional();
+  }
+
+  @Test
+  public void processInvokesDoRemoteOperationIfThreadDoesNotOwnResourcesAndNotTransactional() {
+    when(system.threadOwnsResources()).thenReturn(false);
+    doReturn(false).when(msg).isTransactional();
+    doNothing().when(msg).doRemoteOperation(dm, cache);
+
+    msg.process(dm);
+
+    verify(msg).doRemoteOperation(dm, cache);
+    verify(msg).isTransactional();
+  }
+
+  @Test
+  public void isTransactionalReturnsFalseIfTXUniqueIdIsNOTX() {
+    assertThat(msg.getTXUniqId()).isEqualTo(TXManagerImpl.NOTX);
+    assertThat(msg.isTransactional()).isFalse();
+  }
+
+  @Test
+  public void isTransactionalReturnsFalseIfCannotParticipateInTransaction() {
+    doReturn(1).when(msg).getTXUniqId();
+    doReturn(false).when(msg).canParticipateInTransaction();
+
+    assertThat(msg.isTransactional()).isFalse();
+  }
+
+  @Test
+  public void isTransactionalReturnsTrueIfHasTXUniqueIdAndCanParticipateInTransaction() {
+    doReturn(1).when(msg).getTXUniqId();
+
+    assertThat(msg.canParticipateInTransaction()).isTrue();
+    assertThat(msg.isTransactional()).isTrue();
+  }
+
   private static class TestableRemoteOperationMessage extends RemoteOperationMessage {
 
     private boolean operationOnRegionResult = true;