You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/02/15 18:44:51 UTC

[geode] branch support/1.15 updated: GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new 5a656ce  GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)
5a656ce is described below

commit 5a656ce402280c377f72f9baf23e422dea528afd
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Feb 15 10:37:18 2022 -0800

    GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)
    
    * make cleanClientAuths synchronized to avoid NPE
    * This also passes down the client termination reason when we clean up client threads.
    * since we introduced a lock object for clientUserAuths, revert some old code to use the lock instead of catching NPE
    * synchronize all clientUserAuths updates.
    
    (cherry picked from commit 592dd652c263c2f043c6f6a11c5fd7f8a61c84a6)
---
 .../cache/tier/sockets/CacheClientProxyTest.java   | 16 +++-
 .../cache/tier/sockets/CacheClientProxy.java       | 43 ++++++-----
 .../cache/tier/sockets/ClientHealthMonitor.java    | 14 ++--
 .../cache/tier/sockets/ServerConnection.java       | 86 ++++++++++++----------
 .../cache/tier/sockets/ServerConnectionTest.java   | 48 +++++++++++-
 5 files changed, 141 insertions(+), 66 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index ca7acf0..62736ff 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -26,8 +26,11 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.util.Scanner;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -52,7 +55,7 @@ import org.apache.geode.test.junit.rules.ServerStarterRule;
 public class CacheClientProxyTest {
 
   @Rule
-  public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+  public ServerStarterRule serverRule = new ServerStarterRule().withLogFile().withAutoStart();
 
   @Rule
   public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@@ -96,11 +99,20 @@ public class CacheClientProxyTest {
   }
 
   @Test
-  public void closeSocket1000Times() {
+  public void closeSocket1000Times() throws FileNotFoundException {
     // run it for 1000 times to introduce conflicts between threads
     for (int i = 0; i < 1000; i++) {
       closeSocketShouldBeAtomic();
     }
+
+    // make sure there is no NPE warning in the log file
+    File logFile = new File(serverRule.getWorkingDir(), "server.log");
+    Scanner scanner = new Scanner(logFile);
+    while (scanner.hasNextLine()) {
+      String line = scanner.nextLine();
+      assertThat(line).describedAs("File: %s, Line: %s", logFile.getAbsolutePath(), line)
+          .doesNotContain("NullPointerException");
+    }
   }
 
   @Test
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 87ac740..43be442 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -785,24 +785,7 @@ public class CacheClientProxy implements ClientSession {
     // Close the Authorization callback or subject if we are not keeping the proxy
     try {
       if (!pauseDurable) {
-        // single user case -- old security
-        if (postAuthzCallback != null) {
-          postAuthzCallback.close();
-          postAuthzCallback = null;
-        }
-        // single user case -- integrated security
-        // connection is closed, so we can log out this subject
-        else if (subject != null) {
-          secureLogger.debug("CacheClientProxy.close, logging out {}. ", subject.getPrincipal());
-          subject.logout();
-          subject = null;
-        }
-        // for multiUser case, in non-durable case, we are closing the connection
-        else if (clientUserAuths != null) {
-          secureLogger.debug("CacheClientProxy.close, cleanup all client subjects. ");
-          clientUserAuths.cleanup(true);
-          clientUserAuths = null;
-        }
+        cleanClientAuths();
       }
     } catch (Exception ex) {
       logger.warn("{}", this, ex);
@@ -812,6 +795,30 @@ public class CacheClientProxy implements ClientSession {
     return keepProxy;
   }
 
+  // this needs to be synchronized to avoid NPE between null check and operations
+  private void cleanClientAuths() {
+    synchronized (clientUserAuthsLock) {
+      // single user case -- old security
+      if (postAuthzCallback != null) {
+        postAuthzCallback.close();
+        postAuthzCallback = null;
+      }
+      // single user case -- integrated security
+      // connection is closed, so we can log out this subject
+      else if (subject != null) {
+        secureLogger.debug("CacheClientProxy.close, logging out {}. ", subject.getPrincipal());
+        subject.logout();
+        subject = null;
+      }
+      // for multiUser case, in non-durable case, we are closing the connection
+      else if (clientUserAuths != null) {
+        secureLogger.debug("CacheClientProxy.close, cleanup all client subjects. ");
+        clientUserAuths.cleanup(true);
+        clientUserAuths = null;
+      }
+    }
+  }
+
   protected void pauseDispatching() {
     if (_messageDispatcher == null) {
       return;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index dfa9a0b..6d363c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -305,11 +305,11 @@ public class ClientHealthMonitor {
   }
 
   public void removeAllConnectionsAndUnregisterClient(ClientProxyMembershipID proxyID,
-      Throwable t) {
+      Throwable throwable) {
     // Remove all connections
-    cleanupClientThreads(proxyID, false);
+    cleanupClientThreads(proxyID, false, throwable);
 
-    unregisterClient(proxyID, false, t);
+    unregisterClient(proxyID, false, throwable);
   }
 
   /**
@@ -499,7 +499,8 @@ public class ClientHealthMonitor {
     return connectedIncomingGateways;
   }
 
-  private boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut) {
+  private boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut,
+      Throwable reason) {
     boolean result = false;
     Set<ServerConnection> serverConnections = null;
     synchronized (proxyIdConnections) {
@@ -512,6 +513,9 @@ public class ClientHealthMonitor {
       if (serverConnections != null) {
         result = true;
         for (ServerConnection serverConnection : serverConnections) {
+          if (reason != null) {
+            serverConnection.setClientDisconnectedException(reason);
+          }
           serverConnection.handleTermination(timedOut);
         }
       }
@@ -803,7 +807,7 @@ public class ClientHealthMonitor {
                 // any of its ServerConnection threads are currently processing
                 // a message. If so, let it go. If not, disconnect it.
                 if (prepareToTerminateIfNoConnectionIsProcessing(proxyID)) {
-                  if (cleanupClientThreads(proxyID, true)) {
+                  if (cleanupClientThreads(proxyID, true, null)) {
                     logger.warn(
                         "Monitoring client with member id {}. It had been {} ms since the latest heartbeat. Max interval is {}. Terminated client.",
                         entry.getKey(), currentTime - latestHeartbeat,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index fe38216..689325d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -45,6 +45,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.logging.log4j.Logger;
 import org.apache.shiro.subject.Subject;
 import org.apache.shiro.util.ThreadState;
+import org.jetbrains.annotations.TestOnly;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
@@ -261,6 +262,7 @@ public class ServerConnection implements Runnable {
 
 
   private ClientUserAuths clientUserAuths;
+  private final Object clientUserAuthsLock = new Object();
 
   // this is constant(server and client) for first user request, after that it is random
   // this also need to send in handshake
@@ -534,18 +536,13 @@ public class ServerConnection implements Runnable {
   }
 
   private long setUserAuthorizeAndPostAuthorizeRequest(AuthorizeRequest authzRequest,
-      AuthorizeRequestPP postAuthzRequest) throws IOException {
+      AuthorizeRequestPP postAuthzRequest) {
     UserAuthAttributes userAuthAttr = new UserAuthAttributes(authzRequest, postAuthzRequest);
-    if (clientUserAuths == null) {
-      initializeClientUserAuths();
-    }
-    try {
-      return clientUserAuths.putUserAuth(userAuthAttr);
-    } catch (NullPointerException exception) {
-      if (isTerminated()) {
-        throw new IOException("Server connection is terminated.");
+    synchronized (clientUserAuthsLock) {
+      if (clientUserAuths == null) {
+        initializeClientUserAuths();
       }
-      throw exception;
+      return clientUserAuths.putUserAuth(userAuthAttr);
     }
   }
 
@@ -798,7 +795,9 @@ public class ServerConnection implements Runnable {
     if (verifyClientConnection()) {
       initializeCommands();
       if (!getCommunicationMode().isWAN()) {
-        initializeClientUserAuths();
+        synchronized (clientUserAuthsLock) {
+          initializeClientUserAuths();
+        }
       }
     }
     if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
@@ -959,9 +958,13 @@ public class ServerConnection implements Runnable {
     }
   }
 
+  // this needs to be synchronized to avoid NPE between null check and cleanup
   private void cleanClientAuths() {
-    if (clientUserAuths != null) {
-      clientUserAuths.cleanup(false);
+    synchronized (clientUserAuthsLock) {
+      if (clientUserAuths != null) {
+        clientUserAuths.cleanup(false);
+        clientUserAuths = null;
+      }
     }
   }
 
@@ -1042,12 +1045,7 @@ public class ServerConnection implements Runnable {
         chmRegistered = false;
       }
     }
-    if (unregisterClient) {
-      // last server connection call all close on auth objects
-      secureLogger.debug("ServerConnection.handleTermination clean client auths");
-      cleanClientAuths();
-    }
-    clientUserAuths = null;
+
     if (needsUnregister) {
       acceptor.getClientHealthMonitor().removeConnection(proxyId, this);
       if (unregisterClient) {
@@ -1056,6 +1054,12 @@ public class ServerConnection implements Runnable {
       }
     }
 
+    if (unregisterClient) {
+      // last server connection call all close on auth objects
+      secureLogger.debug("ServerConnection.handleTermination clean client auths");
+      cleanClientAuths();
+    }
+
     if (cleanupStats) {
       acceptor.getConnectionListener().connectionClosed(clientDeparted, communicationMode);
     }
@@ -1120,13 +1124,13 @@ public class ServerConnection implements Runnable {
         throw new AuthenticationFailedException("Authentication failed");
       }
 
-      try {
-        // first try integrated security
-        clientUserAuths.removeSubject(aIds.getUniqueId());
-        // then, try the old way
-        clientUserAuths.removeUserId(aIds.getUniqueId(), keepAlive);
-      } catch (NullPointerException exception) {
-        logger.debug("Exception", exception);
+      synchronized (clientUserAuthsLock) {
+        if (clientUserAuths != null) {
+          // first try integrated security
+          clientUserAuths.removeSubject(aIds.getUniqueId());
+          // then, try the old way
+          clientUserAuths.removeUserId(aIds.getUniqueId(), keepAlive);
+        }
       }
     } catch (Exception exception) {
       throw new AuthenticationFailedException("Authentication failed", exception);
@@ -1231,11 +1235,18 @@ public class ServerConnection implements Runnable {
     return uniqueId;
   }
 
-  @VisibleForTesting
+  @TestOnly
   protected ClientUserAuths getClientUserAuths() {
     return clientUserAuths;
   }
 
+  @TestOnly
+  protected void setClientUserAuths(ClientUserAuths clientUserAuths) {
+    synchronized (clientUserAuthsLock) {
+      this.clientUserAuths = clientUserAuths;
+    }
+  }
+
   private void setSecurityPart() {
     try {
       connectionId = randomConnectionIdGen.nextLong();
@@ -1834,13 +1845,10 @@ public class ServerConnection implements Runnable {
     long uniqueId = getUniqueId();
 
     UserAuthAttributes uaa = null;
-    try {
-      uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
-    } catch (NullPointerException npe) {
-      if (isTerminated()) {
-        throw new IOException("Server connection is terminated.");
+    synchronized (clientUserAuthsLock) {
+      if (clientUserAuths != null) {
+        uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
       }
-      logger.debug("Unexpected exception {}", npe.toString());
     }
     if (uaa == null) {
       throw new AuthenticationRequiredException(USER_NOT_FOUND);
@@ -1878,14 +1886,12 @@ public class ServerConnection implements Runnable {
     long uniqueId = getUniqueId();
 
     UserAuthAttributes uaa = null;
-    try {
-      uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
-    } catch (NullPointerException npe) {
-      if (isTerminated()) {
-        throw new IOException("Server connection is terminated.");
+    synchronized (clientUserAuthsLock) {
+      if (clientUserAuths != null) {
+        uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
       }
-      logger.debug("Unexpected exception", npe);
     }
+
     if (uaa == null) {
       throw new AuthenticationRequiredException(USER_NOT_FOUND);
     }
@@ -1930,7 +1936,7 @@ public class ServerConnection implements Runnable {
   }
 
   /**
-   * For legacy auth?
+   * For legacy auth
    */
   private long getUniqueId(Principal principal)
       throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index 7e15eab..e32a031 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -34,7 +34,10 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.shiro.subject.Subject;
 import org.junit.Before;
 import org.junit.Rule;
@@ -62,7 +65,6 @@ import org.apache.geode.test.junit.categories.ClientServerTest;
 
 @Category(ClientServerTest.class)
 public class ServerConnectionTest {
-
   @Rule
   public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
 
@@ -81,6 +83,7 @@ public class ServerConnectionTest {
   private DistributionManager distributionManager;
   private ThreadsMonitoring threadsMonitoring;
   private CacheClientNotifier notifier;
+  private ClientHealthMonitor clientHealthMonitor;
 
   @Before
   public void setUp() throws IOException {
@@ -92,6 +95,7 @@ public class ServerConnectionTest {
     securityService = mock(SecurityService.class);
     command = mock(Command.class);
     notifier = mock(CacheClientNotifier.class);
+    clientHealthMonitor = mock(ClientHealthMonitor.class);
 
     when(inetAddress.getHostAddress()).thenReturn("localhost");
     when(socket.getInetAddress()).thenReturn(inetAddress);
@@ -276,4 +280,46 @@ public class ServerConnectionTest {
     assertThat(spy.getProcessMessages()).isFalse();
     verify(spy, never()).resumeThreadMonitoring();
   }
+
+  @Test
+  public void handleTerminationWithoutUnrgisterClientShouldNotNullClientAuths() {
+    when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
+    when(acceptor.getCacheClientNotifier()).thenReturn(notifier);
+    ClientUserAuths clientUserAuths = mock(ClientUserAuths.class);
+    ServerConnection spy = spy(serverConnection);
+    doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupTable();
+    doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupProxyIdTable();
+    spy.setClientUserAuths(clientUserAuths);
+
+    spy.handleTermination(false);
+    assertThat(spy.getClientUserAuths()).isNotNull();
+
+    // subsequent putSubject call will be successful
+    Subject subject = mock(Subject.class);
+    spy.putSubject(subject, -1);
+  }
+
+  @Test
+  public void handleTerminationWithUnregisterClientShouldNullClientAuths() {
+    when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
+    when(acceptor.getCacheClientNotifier()).thenReturn(notifier);
+    when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
+    ClientUserAuths clientUserAuths = mock(ClientUserAuths.class);
+    ServerConnection spy = spy(serverConnection);
+    Map<ServerSideHandshake, MutableInt> cleanupTable = mock(Map.class);
+    when(cleanupTable.get(any())).thenReturn(mock(MutableInt.class));
+    doReturn(cleanupTable).when(clientHealthMonitor).getCleanupTable();
+    doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupProxyIdTable();
+    spy.setClientUserAuths(clientUserAuths);
+
+    ClientProxyMembershipID proxyId = mock(ClientProxyMembershipID.class);
+    when(proxyId.isDurable()).thenReturn(true);
+    when(proxyId.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
+    spy.setProxyId(proxyId);
+
+    // this will make handleTermination unregister the client
+    spy.processHandShake();
+    spy.handleTermination(false);
+    assertThat(spy.getClientUserAuths()).isNull();
+  }
 }