You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/10/10 23:23:26 UTC

[geode] branch feature/GEODE-7286 created (now 7fb8ed0)

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a change to branch feature/GEODE-7286
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 7fb8ed0  GEODE-7286: Reset _socketClosed AtomicBoolean when durable client reconnects

This branch includes the following new commits:

     new 7fb8ed0  GEODE-7286: Reset _socketClosed AtomicBoolean when durable client reconnects

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7286: Reset _socketClosed AtomicBoolean when durable client reconnects

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-7286
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7fb8ed002ec3a486681323e60d2cdbe8cfc2475b
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu Oct 10 16:21:56 2019 -0700

    GEODE-7286: Reset _socketClosed AtomicBoolean when durable client reconnects
---
 ...ientConnectDisconnectSocketDistributedTest.java | 132 +++++++++++++++++++++
 .../cache/tier/sockets/CacheClientProxy.java       |   3 +
 2 files changed, 135 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java
new file mode 100644
index 0000000..fbfa1aa
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class DurableClientConnectDisconnectSocketDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Test
+  public void testSocketClosedOnClientDisconnect() throws Exception {
+    // Start Locator
+    MemberVM locator = cluster.startLocatorVM(0);
+
+    // Start server
+    int locatorPort = locator.getPort();
+    MemberVM server = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
+
+    // Connect client
+    ClientVM client =
+        cluster.startClientVM(2, getDurableClientProperties(testName.getMethodName()),
+            (ccf) -> {
+              ccf.setPoolSubscriptionEnabled(true);
+              ccf.addPoolLocator("localhost", locatorPort);
+            });
+
+    // Invoke readyForEvents in client
+    client.invoke(() -> readyForEvents());
+
+    // Verify client socket is connected on the server
+    server.invoke(() -> verifyCacheClientProxySocketIsOpen());
+
+    // Close client
+    client.invoke(() -> closeClient());
+
+    // Wait for the client socket to be closed on the server
+    server.invoke(() -> waitForCacheClientProxySocketToBeClosed());
+
+    // Reconnect the client
+    client =
+        cluster.startClientVM(2, getDurableClientProperties(testName.getMethodName()),
+            (ccf) -> {
+              ccf.setPoolSubscriptionEnabled(true);
+              ccf.addPoolLocator("localhost", locatorPort);
+            });
+
+    // Invoke readyForEvents in client
+    client.invoke(() -> readyForEvents());
+
+    // Verify client socket is connected on the server
+    server.invoke(() -> verifyCacheClientProxySocketIsOpen());
+
+    // Close client
+    client.invoke(() -> closeClient());
+
+    // Wait for the client socket to be closed on the server
+    server.invoke(() -> waitForCacheClientProxySocketToBeClosed());
+  }
+
+  protected Properties getDurableClientProperties(String durableClientId) {
+    Properties properties = new Properties();
+    properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
+    return properties;
+  }
+
+  private void closeClient() {
+    ClusterStartupRule.getClientCache().close(true);
+  }
+
+  private void readyForEvents() {
+    ClusterStartupRule.getClientCache().readyForEvents();
+  }
+
+  private AcceptorImpl getAcceptor() {
+    CacheServer cacheServer = ClusterStartupRule.getCache().getCacheServers().get(0);
+    return (AcceptorImpl) ((InternalCacheServer) cacheServer).getAcceptor();
+  }
+
+  private void verifyCacheClientProxySocketIsOpen() {
+    // Get the acceptor
+    AcceptorImpl acceptor = getAcceptor();
+
+    // Wait for the CacheClientProxy to be created since its asynchronous
+    await().until(() -> acceptor.getCacheClientNotifier().getClientProxies().size() == 1);
+
+    CacheClientProxy proxy = acceptor.getCacheClientNotifier().getClientProxies().iterator().next();
+    assertThat(proxy.getSocket().isClosed()).isFalse();
+  }
+
+  private void waitForCacheClientProxySocketToBeClosed() {
+    // Get the acceptor
+    AcceptorImpl acceptor = getAcceptor();
+
+    // Get the CacheClientProxy
+    CacheClientProxy proxy = acceptor.getCacheClientNotifier().getClientProxies().iterator().next();
+
+    // Wait for the CacheClientProxy's socket to be closed
+    await().until(() -> proxy.getSocket().isClosed());
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 945f614..08782b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1838,6 +1838,9 @@ public class CacheClientProxy implements ClientSession {
     this._messageDispatcher._messageQueue.setPrimary(ip);
     this._messageDispatcher._messageQueue.setClientConflation(cc);
 
+    // Reset the _socketClosed AtomicBoolean
+    this._socketClosed.compareAndSet(true, false);
+
     reinitializeClientAuths();
     this.creationDate = new Date();
     if (logger.isDebugEnabled()) {