You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/12/08 01:50:29 UTC
[geode] branch develop updated: GEODE-6164: CacheClientProxy's
closeSocket should be called atomically (#2972)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4fde138 GEODE-6164: CacheClientProxy's closeSocket should be called atomically (#2972)
4fde138 is described below
commit 4fde138665a16c99949a8571ee14a466fd6a4025
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Dec 7 17:50:19 2018 -0800
GEODE-6164: CacheClientProxy's closeSocket should be called atomically (#2972)
---
.../cache/tier/sockets/CacheClientProxyTest.java | 97 ++++++++++++++++++++++
.../cache/tier/sockets/CacheClientProxy.java | 18 ++--
2 files changed, 109 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
new file mode 100644
index 0000000..0d21cd3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+public class CacheClientProxyTest {
+
+ @Rule
+ public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @Test
+ public void closeSocketShouldBeAtomic() {
+
+ final CacheServerStats stats = mock(CacheServerStats.class);
+ doNothing().when(stats).incCurrentQueueConnections();
+
+ final InternalCache cache = serverRule.getCache();
+
+ final CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+ final SocketCloser sc = mock(SocketCloser.class);
+ when(ccn.getCache()).thenReturn(cache);
+ when(ccn.getAcceptorStats()).thenReturn(stats);
+ when(ccn.getSocketCloser()).thenReturn(sc);
+
+ final Socket socket = mock(Socket.class);
+ final InetAddress address = mock(InetAddress.class);
+ when(socket.getInetAddress()).thenReturn(address);
+ when(address.getHostAddress()).thenReturn("localhost");
+ doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+
+ final ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+ final DistributedMember member = cache.getDistributedSystem().getDistributedMember();
+ when(proxyID.getDistributedMember()).thenReturn(member);
+
+ CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+ Handshake.CONFLATION_DEFAULT, Version.CURRENT, 1L, true,
+ null, null);
+
+ CompletableFuture<Void> result1 = executorServiceRule.runAsync(() -> proxy.close());
+ CompletableFuture<Void> result2 = executorServiceRule.runAsync(() -> proxy.close());
+ CompletableFuture<Void> result3 = executorServiceRule.runAsync(() -> proxy.close());
+ CompletableFuture.allOf(result1, result2, result3).join();
+ assertThatCode(() -> result1.get(60, SECONDS)).doesNotThrowAnyException();
+ assertThatCode(() -> result2.get(60, SECONDS)).doesNotThrowAnyException();
+ assertThatCode(() -> result3.get(60, SECONDS)).doesNotThrowAnyException();
+ verify(ccn, times(2)).getSocketCloser();
+ assertNull(proxy._remoteHostAddress);
+ }
+
+ @Test
+ public void closeSocket1000Times() {
+ // run it for 1000 times to introduce conflicts between threads
+ for (int i = 0; i < 1000; i++) {
+ closeSocketShouldBeAtomic();
+ }
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 7e6fcfd..f1dccda 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -945,17 +945,23 @@ public class CacheClientProxy implements ClientSession {
}
}
- private void closeSocket() {
- if (this._socketClosed.compareAndSet(false, true)) {
- // Close the socket
- this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
- null);
+ private boolean closeSocket() {
+ String remoteHostAddress = this._remoteHostAddress;
+ if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress != null) {
+ // Only one thread is expected to close the socket
+ this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress, null);
getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+ return true;
}
+ return false;
}
private void closeTransientFields() {
- closeSocket();
+ if (!closeSocket()) {
+ // The thread who closed the socket will be responsible to
+ // releaseResourcesForAddress and clearClientInterestList
+ return;
+ }
// Null out comm buffer, host address, ports and proxy id. All will be
// replaced when the client reconnects.