You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/06/15 17:58:00 UTC

[geode] branch support/1.15 updated: GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new 1869f2c066 GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)
1869f2c066 is described below

commit 1869f2c06681bb73de727d2080d76c6215db9fb9
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Wed Jun 15 10:56:35 2022 -0700

    GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)
    
    
    (cherry picked from commit b3fef2a9989ecb5897325a7a84377a8ac7d30028)
---
 .../cache/tier/sockets/CacheClientProxy.java       |  7 +++-
 .../cache/tier/sockets/CacheClientProxyTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index bf57daae16..57ffcfc341 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -727,7 +728,11 @@ public class CacheClientProxy implements ClientSession {
     if (_messageDispatcher == null) {
       return;
     }
-    _messageDispatcher.notifyReAuthentication();
+
+    // use another thread to do the notification so that the server operation won't be blocked
+    ExecutorService threadPool =
+        _cache.getDistributionManager().getExecutors().getWaitingThreadPool();
+    threadPool.submit(() -> _messageDispatcher.notifyReAuthentication());
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 3b4c63f82e..9b150be3f3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -20,7 +20,9 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -30,12 +32,18 @@ import static org.mockito.Mockito.when;
 
 import java.net.InetAddress;
 import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.shiro.subject.Subject;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory;
@@ -43,6 +51,7 @@ import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.MessageDisp
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class CacheClientProxyTest {
   private CacheClientProxy proxyWithSingleUser;
@@ -71,6 +80,7 @@ public class CacheClientProxyTest {
     when(socket.getInetAddress()).thenReturn(inetAddress);
     when(notifier.getAcceptorStats()).thenReturn(stats);
     id = mock(ClientProxyMembershipID.class);
+    when(id.getDurableId()).thenReturn("proxy_id");
     version = KnownVersion.TEST_VERSION;
     securityService = mock(SecurityService.class);
     subject = mock(Subject.class);
@@ -175,4 +185,37 @@ public class CacheClientProxyTest {
     verify(subject, never()).logout();
     verify(clientUserAuths, times(1)).cleanup(anyBoolean());
   }
+
+  @Rule
+  public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+  @Test
+  public void notifyReAuthenticationIsNotBlocked() {
+    CacheClientProxy spy = spy(proxyWithSingleUser);
+    MessageDispatcher dispatcher = mock(MessageDispatcher.class);
+    doReturn(dispatcher).when(spy).createMessageDispatcher(any());
+    spy.initializeMessageDispatcher();
+    DistributionManager manager = mock(DistributionManager.class);
+    OperationExecutors executors = mock(OperationExecutors.class);
+    ExecutorService executor = executorService.getExecutorService();
+    when(cache.getDistributionManager()).thenReturn(manager);
+    when(manager.getExecutors()).thenReturn(executors);
+    when(executors.getWaitingThreadPool()).thenReturn(executor);
+
+    AtomicBoolean updated = new AtomicBoolean(false);
+
+    // stub a dispatcher whose notifyReAuthentication() blocks until 'updated' becomes true
+    doAnswer((Answer<Void>) invocation -> {
+      while (!updated.get()) {
+        Thread.sleep(200);
+      }
+      return null;
+    }).when(dispatcher).notifyReAuthentication();
+
+    // the proxy call must return even though the stub is still blocked ('updated' is never set)
+    spy.notifyReAuthentication();
+    assertThat(updated.get()).isFalse();
+  }
+
+
 }