You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/10/10 23:23:27 UTC
[geode] 01/01: GEODE-7286: Reset _socketClosed AtomicBoolean when
durable client reconnects
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-7286
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7fb8ed002ec3a486681323e60d2cdbe8cfc2475b
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu Oct 10 16:21:56 2019 -0700
GEODE-7286: Reset _socketClosed AtomicBoolean when durable client reconnects
---
...ientConnectDisconnectSocketDistributedTest.java | 132 +++++++++++++++++++++
.../cache/tier/sockets/CacheClientProxy.java | 3 +
2 files changed, 135 insertions(+)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java
new file mode 100644
index 0000000..fbfa1aa
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientConnectDisconnectSocketDistributedTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class DurableClientConnectDisconnectSocketDistributedTest implements Serializable {
+
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Test
+ public void testSocketClosedOnClientDisconnect() throws Exception {
+ // Start Locator
+ MemberVM locator = cluster.startLocatorVM(0);
+
+ // Start server
+ int locatorPort = locator.getPort();
+ MemberVM server = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
+
+ // Connect client
+ ClientVM client =
+ cluster.startClientVM(2, getDurableClientProperties(testName.getMethodName()),
+ (ccf) -> {
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.addPoolLocator("localhost", locatorPort);
+ });
+
+ // Invoke readyForEvents in client
+ client.invoke(() -> readyForEvents());
+
+ // Verify client socket is connected on the server
+ server.invoke(() -> verifyCacheClientProxySocketIsOpen());
+
+ // Close client
+ client.invoke(() -> closeClient());
+
+ // Wait for the client socket to be closed on the server
+ server.invoke(() -> waitForCacheClientProxySocketToBeClosed());
+
+ // Reconnect the client
+ client =
+ cluster.startClientVM(2, getDurableClientProperties(testName.getMethodName()),
+ (ccf) -> {
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.addPoolLocator("localhost", locatorPort);
+ });
+
+ // Invoke readyForEvents in client
+ client.invoke(() -> readyForEvents());
+
+ // Verify client socket is connected on the server
+ server.invoke(() -> verifyCacheClientProxySocketIsOpen());
+
+ // Close client
+ client.invoke(() -> closeClient());
+
+ // Wait for the client socket to be closed on the server
+ server.invoke(() -> waitForCacheClientProxySocketToBeClosed());
+ }
+
+ protected Properties getDurableClientProperties(String durableClientId) {
+ Properties properties = new Properties();
+ properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
+ return properties;
+ }
+
+ private void closeClient() {
+ ClusterStartupRule.getClientCache().close(true);
+ }
+
+ private void readyForEvents() {
+ ClusterStartupRule.getClientCache().readyForEvents();
+ }
+
+ private AcceptorImpl getAcceptor() {
+ CacheServer cacheServer = ClusterStartupRule.getCache().getCacheServers().get(0);
+ return (AcceptorImpl) ((InternalCacheServer) cacheServer).getAcceptor();
+ }
+
+ private void verifyCacheClientProxySocketIsOpen() {
+ // Get the acceptor
+ AcceptorImpl acceptor = getAcceptor();
+
+ // Wait for the CacheClientProxy to be created since its asynchronous
+ await().until(() -> acceptor.getCacheClientNotifier().getClientProxies().size() == 1);
+
+ CacheClientProxy proxy = acceptor.getCacheClientNotifier().getClientProxies().iterator().next();
+ assertThat(proxy.getSocket().isClosed()).isFalse();
+ }
+
+ private void waitForCacheClientProxySocketToBeClosed() {
+ // Get the acceptor
+ AcceptorImpl acceptor = getAcceptor();
+
+ // Get the CacheClientProxy
+ CacheClientProxy proxy = acceptor.getCacheClientNotifier().getClientProxies().iterator().next();
+
+ // Wait for the CacheClientProxy's socket to be closed
+ await().until(() -> proxy.getSocket().isClosed());
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 945f614..08782b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1838,6 +1838,9 @@ public class CacheClientProxy implements ClientSession {
this._messageDispatcher._messageQueue.setPrimary(ip);
this._messageDispatcher._messageQueue.setClientConflation(cc);
+ // Reset the _socketClosed AtomicBoolean
+ this._socketClosed.compareAndSet(true, false);
+
reinitializeClientAuths();
this.creationDate = new Date();
if (logger.isDebugEnabled()) {