You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/02/15 18:44:51 UTC
[geode] branch support/1.15 updated: GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push:
new 5a656ce GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)
5a656ce is described below
commit 5a656ce402280c377f72f9baf23e422dea528afd
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Feb 15 10:37:18 2022 -0800
GEODE-10042: do not make ClientUserAuths null when we are not unregister client yet. (#7357)
* make cleanClientAuths synchronized to avoid NPE
* This also passes down the client termination reason when we clean up client threads.
* since we introduced a lock object for clientUserAuths, revert some old code so that it no longer catches NPE but uses the lock instead
* synchronize all clientUserAuths updates.
(cherry picked from commit 592dd652c263c2f043c6f6a11c5fd7f8a61c84a6)
---
.../cache/tier/sockets/CacheClientProxyTest.java | 16 +++-
.../cache/tier/sockets/CacheClientProxy.java | 43 ++++++-----
.../cache/tier/sockets/ClientHealthMonitor.java | 14 ++--
.../cache/tier/sockets/ServerConnection.java | 86 ++++++++++++----------
.../cache/tier/sockets/ServerConnectionTest.java | 48 +++++++++++-
5 files changed, 141 insertions(+), 66 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index ca7acf0..62736ff 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -26,8 +26,11 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.net.Socket;
+import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,7 +55,7 @@ import org.apache.geode.test.junit.rules.ServerStarterRule;
public class CacheClientProxyTest {
@Rule
- public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+ public ServerStarterRule serverRule = new ServerStarterRule().withLogFile().withAutoStart();
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@@ -96,11 +99,20 @@ public class CacheClientProxyTest {
}
@Test
- public void closeSocket1000Times() {
+ public void closeSocket1000Times() throws FileNotFoundException {
// run it for 1000 times to introduce conflicts between threads
for (int i = 0; i < 1000; i++) {
closeSocketShouldBeAtomic();
}
+
+ // make sure there is no NPE warning in the log file
+ File logFile = new File(serverRule.getWorkingDir(), "server.log");
+ Scanner scanner = new Scanner(logFile);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ assertThat(line).describedAs("File: %s, Line: %s", logFile.getAbsolutePath(), line)
+ .doesNotContain("NullPointerException");
+ }
}
@Test
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 87ac740..43be442 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -785,24 +785,7 @@ public class CacheClientProxy implements ClientSession {
// Close the Authorization callback or subject if we are not keeping the proxy
try {
if (!pauseDurable) {
- // single user case -- old security
- if (postAuthzCallback != null) {
- postAuthzCallback.close();
- postAuthzCallback = null;
- }
- // single user case -- integrated security
- // connection is closed, so we can log out this subject
- else if (subject != null) {
- secureLogger.debug("CacheClientProxy.close, logging out {}. ", subject.getPrincipal());
- subject.logout();
- subject = null;
- }
- // for multiUser case, in non-durable case, we are closing the connection
- else if (clientUserAuths != null) {
- secureLogger.debug("CacheClientProxy.close, cleanup all client subjects. ");
- clientUserAuths.cleanup(true);
- clientUserAuths = null;
- }
+ cleanClientAuths();
}
} catch (Exception ex) {
logger.warn("{}", this, ex);
@@ -812,6 +795,30 @@ public class CacheClientProxy implements ClientSession {
return keepProxy;
}
+ // this needs to be synchronized to avoid NPE between null check and operations
+ private void cleanClientAuths() {
+ synchronized (clientUserAuthsLock) {
+ // single user case -- old security
+ if (postAuthzCallback != null) {
+ postAuthzCallback.close();
+ postAuthzCallback = null;
+ }
+ // single user case -- integrated security
+ // connection is closed, so we can log out this subject
+ else if (subject != null) {
+ secureLogger.debug("CacheClientProxy.close, logging out {}. ", subject.getPrincipal());
+ subject.logout();
+ subject = null;
+ }
+ // for multiUser case, in non-durable case, we are closing the connection
+ else if (clientUserAuths != null) {
+ secureLogger.debug("CacheClientProxy.close, cleanup all client subjects. ");
+ clientUserAuths.cleanup(true);
+ clientUserAuths = null;
+ }
+ }
+ }
+
protected void pauseDispatching() {
if (_messageDispatcher == null) {
return;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index dfa9a0b..6d363c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -305,11 +305,11 @@ public class ClientHealthMonitor {
}
public void removeAllConnectionsAndUnregisterClient(ClientProxyMembershipID proxyID,
- Throwable t) {
+ Throwable throwable) {
// Remove all connections
- cleanupClientThreads(proxyID, false);
+ cleanupClientThreads(proxyID, false, throwable);
- unregisterClient(proxyID, false, t);
+ unregisterClient(proxyID, false, throwable);
}
/**
@@ -499,7 +499,8 @@ public class ClientHealthMonitor {
return connectedIncomingGateways;
}
- private boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut) {
+ private boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut,
+ Throwable reason) {
boolean result = false;
Set<ServerConnection> serverConnections = null;
synchronized (proxyIdConnections) {
@@ -512,6 +513,9 @@ public class ClientHealthMonitor {
if (serverConnections != null) {
result = true;
for (ServerConnection serverConnection : serverConnections) {
+ if (reason != null) {
+ serverConnection.setClientDisconnectedException(reason);
+ }
serverConnection.handleTermination(timedOut);
}
}
@@ -803,7 +807,7 @@ public class ClientHealthMonitor {
// any of its ServerConnection threads are currently processing
// a message. If so, let it go. If not, disconnect it.
if (prepareToTerminateIfNoConnectionIsProcessing(proxyID)) {
- if (cleanupClientThreads(proxyID, true)) {
+ if (cleanupClientThreads(proxyID, true, null)) {
logger.warn(
"Monitoring client with member id {}. It had been {} ms since the latest heartbeat. Max interval is {}. Terminated client.",
entry.getKey(), currentTime - latestHeartbeat,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index fe38216..689325d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -45,6 +45,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadState;
+import org.jetbrains.annotations.TestOnly;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
@@ -261,6 +262,7 @@ public class ServerConnection implements Runnable {
private ClientUserAuths clientUserAuths;
+ private final Object clientUserAuthsLock = new Object();
// this is constant(server and client) for first user request, after that it is random
// this also need to send in handshake
@@ -534,18 +536,13 @@ public class ServerConnection implements Runnable {
}
private long setUserAuthorizeAndPostAuthorizeRequest(AuthorizeRequest authzRequest,
- AuthorizeRequestPP postAuthzRequest) throws IOException {
+ AuthorizeRequestPP postAuthzRequest) {
UserAuthAttributes userAuthAttr = new UserAuthAttributes(authzRequest, postAuthzRequest);
- if (clientUserAuths == null) {
- initializeClientUserAuths();
- }
- try {
- return clientUserAuths.putUserAuth(userAuthAttr);
- } catch (NullPointerException exception) {
- if (isTerminated()) {
- throw new IOException("Server connection is terminated.");
+ synchronized (clientUserAuthsLock) {
+ if (clientUserAuths == null) {
+ initializeClientUserAuths();
}
- throw exception;
+ return clientUserAuths.putUserAuth(userAuthAttr);
}
}
@@ -798,7 +795,9 @@ public class ServerConnection implements Runnable {
if (verifyClientConnection()) {
initializeCommands();
if (!getCommunicationMode().isWAN()) {
- initializeClientUserAuths();
+ synchronized (clientUserAuthsLock) {
+ initializeClientUserAuths();
+ }
}
}
if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
@@ -959,9 +958,13 @@ public class ServerConnection implements Runnable {
}
}
+ // this needs to be synchronized to avoid NPE between null check and cleanup
private void cleanClientAuths() {
- if (clientUserAuths != null) {
- clientUserAuths.cleanup(false);
+ synchronized (clientUserAuthsLock) {
+ if (clientUserAuths != null) {
+ clientUserAuths.cleanup(false);
+ clientUserAuths = null;
+ }
}
}
@@ -1042,12 +1045,7 @@ public class ServerConnection implements Runnable {
chmRegistered = false;
}
}
- if (unregisterClient) {
- // last server connection call all close on auth objects
- secureLogger.debug("ServerConnection.handleTermination clean client auths");
- cleanClientAuths();
- }
- clientUserAuths = null;
+
if (needsUnregister) {
acceptor.getClientHealthMonitor().removeConnection(proxyId, this);
if (unregisterClient) {
@@ -1056,6 +1054,12 @@ public class ServerConnection implements Runnable {
}
}
+ if (unregisterClient) {
+ // last server connection call all close on auth objects
+ secureLogger.debug("ServerConnection.handleTermination clean client auths");
+ cleanClientAuths();
+ }
+
if (cleanupStats) {
acceptor.getConnectionListener().connectionClosed(clientDeparted, communicationMode);
}
@@ -1120,13 +1124,13 @@ public class ServerConnection implements Runnable {
throw new AuthenticationFailedException("Authentication failed");
}
- try {
- // first try integrated security
- clientUserAuths.removeSubject(aIds.getUniqueId());
- // then, try the old way
- clientUserAuths.removeUserId(aIds.getUniqueId(), keepAlive);
- } catch (NullPointerException exception) {
- logger.debug("Exception", exception);
+ synchronized (clientUserAuthsLock) {
+ if (clientUserAuths != null) {
+ // first try integrated security
+ clientUserAuths.removeSubject(aIds.getUniqueId());
+ // then, try the old way
+ clientUserAuths.removeUserId(aIds.getUniqueId(), keepAlive);
+ }
}
} catch (Exception exception) {
throw new AuthenticationFailedException("Authentication failed", exception);
@@ -1231,11 +1235,18 @@ public class ServerConnection implements Runnable {
return uniqueId;
}
- @VisibleForTesting
+ @TestOnly
protected ClientUserAuths getClientUserAuths() {
return clientUserAuths;
}
+ @TestOnly
+ protected void setClientUserAuths(ClientUserAuths clientUserAuths) {
+ synchronized (clientUserAuthsLock) {
+ this.clientUserAuths = clientUserAuths;
+ }
+ }
+
private void setSecurityPart() {
try {
connectionId = randomConnectionIdGen.nextLong();
@@ -1834,13 +1845,10 @@ public class ServerConnection implements Runnable {
long uniqueId = getUniqueId();
UserAuthAttributes uaa = null;
- try {
- uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
- } catch (NullPointerException npe) {
- if (isTerminated()) {
- throw new IOException("Server connection is terminated.");
+ synchronized (clientUserAuthsLock) {
+ if (clientUserAuths != null) {
+ uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
}
- logger.debug("Unexpected exception {}", npe.toString());
}
if (uaa == null) {
throw new AuthenticationRequiredException(USER_NOT_FOUND);
@@ -1878,14 +1886,12 @@ public class ServerConnection implements Runnable {
long uniqueId = getUniqueId();
UserAuthAttributes uaa = null;
- try {
- uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
- } catch (NullPointerException npe) {
- if (isTerminated()) {
- throw new IOException("Server connection is terminated.");
+ synchronized (clientUserAuthsLock) {
+ if (clientUserAuths != null) {
+ uaa = clientUserAuths.getUserAuthAttributes(uniqueId);
}
- logger.debug("Unexpected exception", npe);
}
+
if (uaa == null) {
throw new AuthenticationRequiredException(USER_NOT_FOUND);
}
@@ -1930,7 +1936,7 @@ public class ServerConnection implements Runnable {
}
/**
- * For legacy auth?
+ * For legacy auth
*/
private long getUniqueId(Principal principal)
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index 7e15eab..e32a031 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -34,7 +34,10 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.shiro.subject.Subject;
import org.junit.Before;
import org.junit.Rule;
@@ -62,7 +65,6 @@ import org.apache.geode.test.junit.categories.ClientServerTest;
@Category(ClientServerTest.class)
public class ServerConnectionTest {
-
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
@@ -81,6 +83,7 @@ public class ServerConnectionTest {
private DistributionManager distributionManager;
private ThreadsMonitoring threadsMonitoring;
private CacheClientNotifier notifier;
+ private ClientHealthMonitor clientHealthMonitor;
@Before
public void setUp() throws IOException {
@@ -92,6 +95,7 @@ public class ServerConnectionTest {
securityService = mock(SecurityService.class);
command = mock(Command.class);
notifier = mock(CacheClientNotifier.class);
+ clientHealthMonitor = mock(ClientHealthMonitor.class);
when(inetAddress.getHostAddress()).thenReturn("localhost");
when(socket.getInetAddress()).thenReturn(inetAddress);
@@ -276,4 +280,46 @@ public class ServerConnectionTest {
assertThat(spy.getProcessMessages()).isFalse();
verify(spy, never()).resumeThreadMonitoring();
}
+
+ @Test
+ public void handleTerminationWithoutUnrgisterClientShouldNotNullClientAuths() {
+ when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
+ when(acceptor.getCacheClientNotifier()).thenReturn(notifier);
+ ClientUserAuths clientUserAuths = mock(ClientUserAuths.class);
+ ServerConnection spy = spy(serverConnection);
+ doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupTable();
+ doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupProxyIdTable();
+ spy.setClientUserAuths(clientUserAuths);
+
+ spy.handleTermination(false);
+ assertThat(spy.getClientUserAuths()).isNotNull();
+
+ // subsequent putSubject call will be successful
+ Subject subject = mock(Subject.class);
+ spy.putSubject(subject, -1);
+ }
+
+ @Test
+ public void handleTerminationWithUnregisterClientShouldNullClientAuths() {
+ when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
+ when(acceptor.getCacheClientNotifier()).thenReturn(notifier);
+ when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
+ ClientUserAuths clientUserAuths = mock(ClientUserAuths.class);
+ ServerConnection spy = spy(serverConnection);
+ Map<ServerSideHandshake, MutableInt> cleanupTable = mock(Map.class);
+ when(cleanupTable.get(any())).thenReturn(mock(MutableInt.class));
+ doReturn(cleanupTable).when(clientHealthMonitor).getCleanupTable();
+ doReturn(new HashMap<>()).when(clientHealthMonitor).getCleanupProxyIdTable();
+ spy.setClientUserAuths(clientUserAuths);
+
+ ClientProxyMembershipID proxyId = mock(ClientProxyMembershipID.class);
+ when(proxyId.isDurable()).thenReturn(true);
+ when(proxyId.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
+ spy.setProxyId(proxyId);
+
+ // this will make handleTermination unregister the client
+ spy.processHandShake();
+ spy.handleTermination(false);
+ assertThat(spy.getClientUserAuths()).isNull();
+ }
}