You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/03/20 22:57:08 UTC
[geode] 01/01: GEODE-6008 work in progress
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6008
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 071c4e4c98144741e30adcee19d72bec2cdc2c34
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Mar 20 15:56:22 2020 -0700
GEODE-6008 work in progress
---
...tServerHostNameVerificationDistributedTest.java | 2 +
.../internal/tcp/TCPConduitBareBonesDUnitTest.java | 76 ++++++++++++++++++++++
.../apache/geode/distributed/internal/DMStats.java | 2 +
.../distributed/internal/DistributionStats.java | 9 +++
.../internal/LonerDistributionManager.java | 5 ++
.../org/apache/geode/internal/tcp/Connection.java | 29 +++++++--
.../org/apache/geode/internal/tcp/TCPConduit.java | 4 +-
7 files changed, 118 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
index 8f1db5f..8ce2b34 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.ssl.CertStores;
import org.apache.geode.cache.ssl.CertificateBuilder;
import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -162,6 +163,7 @@ public class ClientServerHostNameVerificationDistributedTest {
.sanDnsName(InetAddress.getLoopbackAddress().getHostName())
.sanDnsName(InetAddress.getLocalHost().getHostName())
.sanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
+ .sanIpAddress(LocalHostUtil.getLocalHost())
.sanIpAddress(InetAddress.getLocalHost())
.generate();
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java
new file mode 100644
index 0000000..77eb02e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.distributed.internal.DistributionImpl;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+public class TCPConduitBareBonesDUnitTest {
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule(0);
+
+ @Test
+ public void testShortTermSharedConnections() throws Exception {
+ // create and close many shared, ordered connections. Also see CloseConnectionTest,
+ // which ensures that the outgoing socket in a shared connection is properly closed
+ Properties properties = new Properties();
+ properties.put(ConfigurationProperties.LOG_LEVEL, "fine");
+ properties.put(ConfigurationProperties.LOCATORS, DistributedTestUtils.getLocators());
+ DistributionConfig configuration = new DistributionConfigImpl(properties);
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) InternalDistributedSystem.connect(configuration.toProperties());
+ TCPConduit conduit =
+ ((DistributionImpl) distributedSystem.getDistributionManager().getDistribution())
+ .getDirectChannel().getConduit();
+
+ InternalDistributedMember memberID =
+ distributedSystem.getDistributionManager().getDistribution().getView().getCreator();
+ ConnectionTable.threadWantsSharedResources();
+
+ try {
+ for (int i = 0; i < 1000; i++) {
+ long buffersSize =
+ distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true);
+ // connect to the dunit locator. The first iteration will get the Connection that was formed
+ // during startup, but we'll close it and start creating new ones.
+ Connection connection =
+ conduit.getConnection(memberID, true, false, System.currentTimeMillis(), 15000, 0);
+ System.out.println("Test iteration " + i + ": " + connection);
+ connection.requestClose("for test");
+ // the connection should be stopped at this point
+ assertThat(connection.isReceiverStopped()).isTrue();
+ // make sure there are no double releases of ByteBuffers
+ assertThat(distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true))
+ .isEqualTo(buffersSize);
+ }
+ } finally {
+ distributedSystem.disconnect();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
index 0f880b2..f910e9d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
@@ -475,6 +475,8 @@ public interface DMStats extends MembershipStatistics {
*/
void incSenderBufferSize(int inc, boolean direct);
+ long getSenderBufferSize(boolean direct);
+
/**
* @since GemFire 5.0.2.4
*/
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index b08a512..c44f29a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -2287,6 +2287,15 @@ public class DistributionStats implements DMStats {
}
@Override
+ public long getSenderBufferSize(boolean direct) {
+ if (direct) {
+ return stats.getLong(senderDirectBufferSizeId);
+ } else {
+ return stats.getLong(senderHeapBufferSizeId);
+ }
+ }
+
+ @Override
public void incMessagesBeingReceived(boolean newMsg, int bytes) {
if (newMsg) {
stats.incInt(messagesBeingReceivedId, 1);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index f2e9f51..9cb34d2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -800,6 +800,11 @@ public class LonerDistributionManager implements DistributionManager {
public void incSenderBufferSize(int inc, boolean direct) {}
@Override
+ public long getSenderBufferSize(boolean direct) {
+ return 0;
+ }
+
+ @Override
public long startSocketLock() {
return 0;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 8c8a2fc..893d42d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -471,6 +471,12 @@ public class Connection implements Runnable {
private volatile boolean ackTimedOut;
/**
+ * a Reader thread for a shared Connection will remain around in order to
+ * ensure that the socket is properly closed.
+ */
+ private volatile boolean isResidualReaderThread;
+
+ /**
* creates a "reader" connection that we accepted (it was initiated by an explicit connect being
* done on the other side).
*/
@@ -1326,7 +1332,7 @@ public class Connection implements Runnable {
}
// make sure our socket is closed
asyncClose(false);
- if (!isReceiver) {
+ if (!isReceiver && !isResidualReaderThread) {
// receivers release the input buffer when exiting run(). Senders use the
// inputBuffer for reading direct-reply responses
releaseInputBuffer();
@@ -1465,6 +1471,7 @@ public class Connection implements Runnable {
asyncClose(false);
}
}
+
releaseInputBuffer();
// make sure that if the reader thread exits we notify a thread waiting for the handshake.
@@ -1508,6 +1515,9 @@ public class Connection implements Runnable {
}
private void readMessages() {
+ if (closing.get()) {
+ return;
+ }
// take a snapshot of uniqueId to detect reconnect attempts
SocketChannel channel;
try {
@@ -1552,7 +1562,7 @@ public class Connection implements Runnable {
}
// we should not change the state of the connection if we are a handshake reader thread
// as there is a race between this thread and the application thread doing direct ack
- boolean isHandShakeReader = false;
+ boolean handshakeHasBeenRead = false;
// if we're using SSL/TLS the input buffer may already have data to process
boolean skipInitialRead = getInputBuffer().position() > 0;
try {
@@ -1607,7 +1617,7 @@ public class Connection implements Runnable {
}
processInputBuffer();
- if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) {
+ if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
if (logger.isDebugEnabled()) {
if (handshakeRead) {
logger.debug("handshake has been read {}", this);
@@ -1615,12 +1625,16 @@ public class Connection implements Runnable {
logger.debug("handshake has been cancelled {}", this);
}
}
- isHandShakeReader = true;
+ handshakeHasBeenRead = true;
// Once we have read the handshake for unshared connections, the reader can skip
// processing messages
if (!sharedResource || asyncMode) {
break;
+ } else {
+ // not exiting and not a Reader spawned from a ServerSocket.accept(), so
+ // let's set some state noting that this is happening
+ isResidualReaderThread = true;
}
}
@@ -1663,7 +1677,7 @@ public class Connection implements Runnable {
return;
} catch (Exception e) {
- owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
if (!stopped && !isSocketClosed()) {
logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e);
}
@@ -1676,14 +1690,15 @@ public class Connection implements Runnable {
}
}
} finally {
- if (!isHandShakeReader || (sharedResource && !asyncMode)) {
+ isResidualReaderThread = false;
+ if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
synchronized (stateLock) {
connectionState = STATE_IDLE;
}
}
if (logger.isDebugEnabled()) {
logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
- remoteAddr, isHandShakeReader);
+ remoteAddr, handshakeHasBeenRead);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 6f5edda..51af0de 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -305,14 +305,14 @@ public class TCPConduit implements Runnable {
logger.warn("exception parsing p2p.idleConnectionTimeout", e);
}
- s = p.getProperty("membership_port_range_start");
+ s = p.getProperty("membership_port_range_start", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[0]);
try {
tcpPortRange[0] = Integer.parseInt(s);
} catch (Exception e) {
logger.warn("Exception parsing membership-port-range start port.", e);
}
- s = p.getProperty("membership_port_range_end");
+ s = p.getProperty("membership_port_range_end", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[1]);
try {
tcpPortRange[1] = Integer.parseInt(s);
} catch (Exception e) {