You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2021/02/17 02:42:33 UTC
[geode] branch support/1.12 updated: GEODE-8947: Use waiting thread
pool only in limited scenario. (#6038)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 3a91d92 GEODE-8947: Use waiting thread pool only in limited scenario. (#6038)
3a91d92 is described below
commit 3a91d92b910a8acf39cfb64d57e870566cc7bfb6
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Feb 16 17:47:27 2021 -0800
GEODE-8947: Use waiting thread pool only in limited scenario. (#6038)
* GEODE-8947: Use waiting thread pool only in limited scenario.
* Only when processing a transactional message with conserve-sockets set to true is a
separate thread in the waiting pool used to process the message.
* This addresses a performance issue in cases where a deadlock cannot occur.
(cherry picked from commit b98cf4a3d1fb5e5d64c2d298827d937183ab2b79)
---
.../internal/cache/tx/RemoteOperationMessage.java | 19 ++++++++-
.../cache/tx/RemoteOperationMessageTest.java | 47 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
index ba49d73..a4cbef5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.tx;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -176,7 +177,23 @@ public abstract class RemoteOperationMessage extends DistributionMessage
sendReply(getSender(), this.processorId, dm, replyException, null, 0);
return;
}
- dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
+
+ if (dm.getSystem().threadOwnsResources()) {
+ // reply inline if thread owns socket.
+ doRemoteOperation(dm, cache);
+ return;
+ }
+
+ if (isTransactional()) {
+ dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
+ } else {
+ // reply inline for non-transactional case.
+ doRemoteOperation(dm, cache);
+ }
+ }
+
+ boolean isTransactional() {
+ return getTXUniqId() != TXManagerImpl.NOTX && canParticipateInTransaction();
}
void doRemoteOperation(ClusterDistributionManager dm, InternalCache cache) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
index e10c2f0..aa175d3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -302,6 +303,52 @@ public class RemoteOperationMessageTest {
.hasMessageContaining("system failure");
}
+
+ @Test
+ public void processInvokesDoRemoteOperationIfThreadOwnsResources() {
+ when(system.threadOwnsResources()).thenReturn(true);
+ doNothing().when(msg).doRemoteOperation(dm, cache);
+
+ msg.process(dm);
+
+ verify(msg).doRemoteOperation(dm, cache);
+ verify(msg, never()).isTransactional();
+ }
+
+ @Test
+ public void processInvokesDoRemoteOperationIfThreadDoesNotOwnResourcesAndNotTransactional() {
+ when(system.threadOwnsResources()).thenReturn(false);
+ doReturn(false).when(msg).isTransactional();
+ doNothing().when(msg).doRemoteOperation(dm, cache);
+
+ msg.process(dm);
+
+ verify(msg).doRemoteOperation(dm, cache);
+ verify(msg).isTransactional();
+ }
+
+ @Test
+ public void isTransactionalReturnsFalseIfTXUniqueIdIsNOTX() {
+ assertThat(msg.getTXUniqId()).isEqualTo(TXManagerImpl.NOTX);
+ assertThat(msg.isTransactional()).isFalse();
+ }
+
+ @Test
+ public void isTransactionalReturnsFalseIfCannotParticipateInTransaction() {
+ doReturn(1).when(msg).getTXUniqId();
+ doReturn(false).when(msg).canParticipateInTransaction();
+
+ assertThat(msg.isTransactional()).isFalse();
+ }
+
+ @Test
+ public void isTransactionalReturnsTrueIfHasTXUniqueIdAndCanParticipateInTransaction() {
+ doReturn(1).when(msg).getTXUniqId();
+
+ assertThat(msg.canParticipateInTransaction()).isTrue();
+ assertThat(msg.isTransactional()).isTrue();
+ }
+
private static class TestableRemoteOperationMessage extends RemoteOperationMessage {
private boolean operationOnRegionResult = true;