You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/12/08 01:50:29 UTC

[geode] branch develop updated: GEODE-6164: CacheClientProxy's closeSocket should be called atomically (#2972)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4fde138  GEODE-6164: CacheClientProxy's closeSocket should be called atomically (#2972)
4fde138 is described below

commit 4fde138665a16c99949a8571ee14a466fd6a4025
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Dec 7 17:50:19 2018 -0800

    GEODE-6164: CacheClientProxy's closeSocket should be called atomically (#2972)
---
 .../cache/tier/sockets/CacheClientProxyTest.java   | 97 ++++++++++++++++++++++
 .../cache/tier/sockets/CacheClientProxy.java       | 18 ++--
 2 files changed, 109 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
new file mode 100644
index 0000000..0d21cd3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+public class CacheClientProxyTest {
+
+  @Rule
+  public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @Test
+  public void closeSocketShouldBeAtomic() {
+
+    final CacheServerStats stats = mock(CacheServerStats.class);
+    doNothing().when(stats).incCurrentQueueConnections();
+
+    final InternalCache cache = serverRule.getCache();
+
+    final CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+    final SocketCloser sc = mock(SocketCloser.class);
+    when(ccn.getCache()).thenReturn(cache);
+    when(ccn.getAcceptorStats()).thenReturn(stats);
+    when(ccn.getSocketCloser()).thenReturn(sc);
+
+    final Socket socket = mock(Socket.class);
+    final InetAddress address = mock(InetAddress.class);
+    when(socket.getInetAddress()).thenReturn(address);
+    when(address.getHostAddress()).thenReturn("localhost");
+    doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+
+    final ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+    final DistributedMember member = cache.getDistributedSystem().getDistributedMember();
+    when(proxyID.getDistributedMember()).thenReturn(member);
+
+    CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+        Handshake.CONFLATION_DEFAULT, Version.CURRENT, 1L, true,
+        null, null);
+
+    CompletableFuture<Void> result1 = executorServiceRule.runAsync(() -> proxy.close());
+    CompletableFuture<Void> result2 = executorServiceRule.runAsync(() -> proxy.close());
+    CompletableFuture<Void> result3 = executorServiceRule.runAsync(() -> proxy.close());
+    CompletableFuture.allOf(result1, result2, result3).join();
+    assertThatCode(() -> result1.get(60, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result2.get(60, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result3.get(60, SECONDS)).doesNotThrowAnyException();
+    verify(ccn, times(2)).getSocketCloser();
+    assertNull(proxy._remoteHostAddress);
+  }
+
+  @Test
+  public void closeSocket1000Times() {
+    // run it for 1000 times to introduce conflicts between threads
+    for (int i = 0; i < 1000; i++) {
+      closeSocketShouldBeAtomic();
+    }
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 7e6fcfd..f1dccda 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -945,17 +945,23 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
-      // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
-          null);
+  private boolean closeSocket() {
+    String remoteHostAddress = this._remoteHostAddress;
+    if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress != null) {
+      // Only one thread is expected to close the socket
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress, null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+      return true;
     }
+    return false;
   }
 
   private void closeTransientFields() {
-    closeSocket();
+    if (!closeSocket()) {
+      // The thread who closed the socket will be responsible to
+      // releaseResourcesForAddress and clearClientInterestList
+      return;
+    }
 
     // Null out comm buffer, host address, ports and proxy id. All will be
     // replaced when the client reconnects.