You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/23 16:30:26 UTC
[geode] branch develop updated: GEODE-9825: processInputBuffer resize retains data (#7131)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new fb142e1 GEODE-9825: processInputBuffer resize retains data (#7131)
fb142e1 is described below
commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Nov 23 08:29:11 2021 -0800
GEODE-9825: processInputBuffer resize retains data (#7131)
---
.../internal/P2PMessagingConcurrencyDUnitTest.java | 136 +++++++++++++++------
.../org/apache/geode/internal/tcp/Connection.java | 12 +-
2 files changed, 111 insertions(+), 37 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index e54191a..c204777 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.distributed.internal;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.DataInput;
@@ -29,12 +30,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
+import junitparams.Parameters;
import org.jetbrains.annotations.NotNull;
-import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.ssl.CertStores;
@@ -48,15 +50,24 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
import org.apache.geode.test.version.VersionManager;
/**
- * Tests one-way P2P messaging between two peers. A shared,
- * ordered connection is used and many concurrent tasks
- * compete on the sending side. Tests with TLS enabled
- * to exercise ByteBufferSharing and friends.
+ * Tests one-way P2P messaging between two peers.
+ * Many concurrent tasks compete on the sending side.
+ * The main purpose of the test is to exercise
+ * ByteBufferSharing and friends.
+ *
+ * Tests combinations of: conserve-sockets true/false,
+ * TLS on/off, and socket-buffer-size for sender
+ * and receiver both set to the default (and equal)
+ * and set to the sender's buffer twice as big as the
+ * receiver's buffer.
+ *
*/
@Category({MembershipTest.class})
+@RunWith(GeodeParamsRunner.class)
public class P2PMessagingConcurrencyDUnitTest {
// how many messages will each sender generate?
@@ -71,6 +82,8 @@ public class P2PMessagingConcurrencyDUnitTest {
// random seed
private static final int RANDOM_SEED = 1234;
+ private static Properties securityProperties;
+
@Rule
public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
@@ -87,21 +100,56 @@ public class P2PMessagingConcurrencyDUnitTest {
*/
private static LongAdder bytesTransferredAdder;
- @Before
- public void before() throws GeneralSecurityException, IOException {
- final Properties configuration = gemFireConfiguration();
+ private void configure(
+ final boolean conserveSockets,
+ final boolean useTLS,
+ final int sendSocketBufferSize,
+ final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+ final Properties senderConfiguration =
+ gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
+ final Properties receiverConfiguration =
+ gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);
final MemberVM locator =
clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
- x -> x.withProperties(configuration).withConnectionToLocator()
+ x -> x.withProperties(senderConfiguration).withConnectionToLocator()
.withoutClusterConfigurationService().withoutManagementRestService());
- sender = clusterStartupRule.startServerVM(1, configuration, locator.getPort());
- receiver = clusterStartupRule.startServerVM(2, configuration, locator.getPort());
+ sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+ receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
}
@Test
- public void testP2PMessagingWithTLS() {
+ @Parameters({
+ /*
+ * all combinations of flags with buffer sizes:
+ * (equal), larger/smaller, smaller/larger, minimal
+ */
+ "true, true, 32768, 32768",
+ "true, true, 65536, 32768",
+ "true, true, 32768, 65536",
+ "true, true, 1024, 1024",
+ "true, false, 32768, 32768",
+ "true, false, 65536, 32768",
+ "true, false, 32768, 65536",
+ "true, false, 1024, 1024",
+ "false, true, 32768, 32768",
+ "false, true, 65536, 32768",
+ "false, true, 32768, 65536",
+ "false, true, 1024, 1024",
+ "false, false, 32768, 32768",
+ "false, false, 65536, 32768",
+ "false, false, 32768, 65536",
+ "false, false, 1024, 1024",
+ })
+ public void testP2PMessaging(
+ final boolean conserveSockets,
+ final boolean useTLS,
+ final int sendSocketBufferSize,
+ final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+ configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);
final InternalDistributedMember receiverMember =
receiver.invoke(() -> {
@@ -172,10 +220,16 @@ public class P2PMessagingConcurrencyDUnitTest {
});
- final long bytesSent = sender.invoke(() -> bytesTransferredAdder.sum());
- final long bytesReceived = receiver.invoke(() -> bytesTransferredAdder.sum());
+ final long bytesSent = getByteCount(sender);
- assertThat(bytesReceived).as("bytes received != bytes sent").isEqualTo(bytesSent);
+ await().untilAsserted(
+ () -> assertThat(getByteCount(receiver))
+ .as("bytes received != bytes sent")
+ .isEqualTo(bytesSent));
+ }
+
+ private long getByteCount(final MemberVM member) {
+ return member.invoke(() -> bytesTransferredAdder.sum());
}
private static ClusterDistributionManager getCDM() {
@@ -245,7 +299,7 @@ public class P2PMessagingConcurrencyDUnitTest {
throws IOException, ClassNotFoundException {
super.fromData(in, context);
- final int messageId = in.readInt();
+ messageId = in.readInt();
final int length = in.readInt();
@@ -263,10 +317,19 @@ public class P2PMessagingConcurrencyDUnitTest {
}
@NotNull
- private static Properties gemFireConfiguration()
+ private static Properties gemFireConfiguration(
+ final boolean conserveSockets, final boolean useTLS,
+ final int socketBufferSize)
throws GeneralSecurityException, IOException {
- final Properties props = securityProperties();
+ final Properties props;
+ if (useTLS) {
+ props = securityProperties();
+ } else {
+ props = new Properties();
+ }
+
+ props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));
/*
* This is something we intend to test!
@@ -276,29 +339,32 @@ public class P2PMessagingConcurrencyDUnitTest {
*
* careful: if you set a boolean it doesn't take hold! setting a String
*/
- props.setProperty("conserve-sockets", "true");
+ props.setProperty("conserve-sockets", String.valueOf(conserveSockets));
return props;
}
@NotNull
private static Properties securityProperties() throws GeneralSecurityException, IOException {
- final CertificateMaterial ca = new CertificateBuilder()
- .commonName("Test CA")
- .isCA()
- .generate();
-
- final CertificateMaterial serverCertificate = new CertificateBuilder()
- .commonName("member")
- .issuedBy(ca)
- .generate();
-
- final CertStores memberStore = new CertStores("member");
- memberStore.withCertificate("member", serverCertificate);
- memberStore.trust("ca", ca);
- // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
- final Properties props = memberStore.propertiesWith("all", false, false);
- return props;
+ // subsequent calls must return the same value so members agree on credentials
+ if (securityProperties == null) {
+ final CertificateMaterial ca = new CertificateBuilder()
+ .commonName("Test CA")
+ .isCA()
+ .generate();
+
+ final CertificateMaterial serverCertificate = new CertificateBuilder()
+ .commonName("member")
+ .issuedBy(ca)
+ .generate();
+
+ final CertStores memberStore = new CertStores("member");
+ memberStore.withCertificate("member", serverCertificate);
+ memberStore.trust("ca", ca);
+ // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+ securityProperties = memberStore.propertiesWith("all", false, false);
+ }
+ return securityProperties;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0cd352f..0a3a14e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2770,6 +2770,9 @@ public class Connection implements Runnable {
/**
* processes the current NIO buffer. If there are complete messages in the buffer, they are
* deserialized and passed to TCPConduit for further processing
+ *
+ * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
+ * post-condition: inputBuffer is in WRITABLE mode
*/
private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
throws ConnectionException, IOException {
@@ -2846,12 +2849,12 @@ public class Connection implements Runnable {
"Allocating larger network read buffer, new size is {} old size was {}.",
allocSize, oldBufferSize);
inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+ makeReadableBufferWriteable(inputBuffer);
} else {
if (inputBuffer.position() != 0) {
inputBuffer.compact();
} else {
- inputBuffer.position(inputBuffer.limit());
- inputBuffer.limit(inputBuffer.capacity());
+ makeReadableBufferWriteable(inputBuffer);
}
}
}
@@ -2865,6 +2868,11 @@ public class Connection implements Runnable {
}
}
+ private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
+ inputBuffer.position(inputBuffer.limit());
+ inputBuffer.limit(inputBuffer.capacity());
+ }
+
private boolean readHandshakeForReceiver(final DataInput dis) {
try {
checkHandshakeInitialByte(dis);