You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/23 16:30:26 UTC

[geode] branch develop updated: GEODE-9825: processInputBuffer resize retains data (#7131)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fb142e1  GEODE-9825: processInputBuffer resize retains data (#7131)
fb142e1 is described below

commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Nov 23 08:29:11 2021 -0800

    GEODE-9825: processInputBuffer resize retains data (#7131)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 136 +++++++++++++++------
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 2 files changed, 111 insertions(+), 37 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index e54191a..c204777 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.distributed.internal;
 
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.DataInput;
@@ -29,12 +30,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
+import junitparams.Parameters;
 import org.jetbrains.annotations.NotNull;
-import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
 
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.ssl.CertStores;
@@ -48,15 +50,24 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
 import org.apache.geode.test.version.VersionManager;
 
 /**
- * Tests one-way P2P messaging between two peers. A shared,
- * ordered connection is used and many concurrent tasks
- * compete on the sending side. Tests with TLS enabled
- * to exercise ByteBufferSharing and friends.
+ * Tests one-way P2P messaging between two peers.
+ * Many concurrent tasks compete on the sending side.
+ * The main purpose of the test is to exercise
+ * ByteBufferSharing and friends.
+ *
+ * Tests combinations of: conserve-sockets true/false,
+ * TLS on/off, and socket-buffer-size with sender and
+ * receiver both at the default (and equal), the sender's
+ * buffer twice the receiver's, the receiver's buffer
+ * twice the sender's, and both at a minimal size.
+ *
  */
 @Category({MembershipTest.class})
+@RunWith(GeodeParamsRunner.class)
 public class P2PMessagingConcurrencyDUnitTest {
 
   // how many messages will each sender generate?
@@ -71,6 +82,8 @@ public class P2PMessagingConcurrencyDUnitTest {
   // random seed
   private static final int RANDOM_SEED = 1234;
 
+  private static Properties securityProperties;
+
   @Rule
   public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
@@ -87,21 +100,56 @@ public class P2PMessagingConcurrencyDUnitTest {
    */
   private static LongAdder bytesTransferredAdder;
 
-  @Before
-  public void before() throws GeneralSecurityException, IOException {
-    final Properties configuration = gemFireConfiguration();
+  private void configure(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    final Properties senderConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
+    final Properties receiverConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);
 
     final MemberVM locator =
         clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
-            x -> x.withProperties(configuration).withConnectionToLocator()
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
                 .withoutClusterConfigurationService().withoutManagementRestService());
 
-    sender = clusterStartupRule.startServerVM(1, configuration, locator.getPort());
-    receiver = clusterStartupRule.startServerVM(2, configuration, locator.getPort());
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
   }
 
   @Test
-  public void testP2PMessagingWithTLS() {
+  @Parameters({
+      /*
+       * all combinations of flags with buffer sizes:
+       * (equal), larger/smaller, smaller/larger, minimal
+       */
+      "true, true, 32768, 32768",
+      "true, true, 65536, 32768",
+      "true, true, 32768, 65536",
+      "true, true, 1024, 1024",
+      "true, false, 32768, 32768",
+      "true, false, 65536, 32768",
+      "true, false, 32768, 65536",
+      "true, false, 1024, 1024",
+      "false, true, 32768, 32768",
+      "false, true, 65536, 32768",
+      "false, true, 32768, 65536",
+      "false, true, 1024, 1024",
+      "false, false, 32768, 32768",
+      "false, false, 65536, 32768",
+      "false, false, 32768, 65536",
+      "false, false, 1024, 1024",
+  })
+  public void testP2PMessaging(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);
 
     final InternalDistributedMember receiverMember =
         receiver.invoke(() -> {
@@ -172,10 +220,16 @@ public class P2PMessagingConcurrencyDUnitTest {
 
     });
 
-    final long bytesSent = sender.invoke(() -> bytesTransferredAdder.sum());
-    final long bytesReceived = receiver.invoke(() -> bytesTransferredAdder.sum());
+    final long bytesSent = getByteCount(sender);
 
-    assertThat(bytesReceived).as("bytes received != bytes sent").isEqualTo(bytesSent);
+    await().untilAsserted(
+        () -> assertThat(getByteCount(receiver))
+            .as("bytes received != bytes sent")
+            .isEqualTo(bytesSent));
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
   }
 
   private static ClusterDistributionManager getCDM() {
@@ -245,7 +299,7 @@ public class P2PMessagingConcurrencyDUnitTest {
         throws IOException, ClassNotFoundException {
       super.fromData(in, context);
 
-      final int messageId = in.readInt();
+      messageId = in.readInt();
 
       final int length = in.readInt();
 
@@ -263,10 +317,19 @@ public class P2PMessagingConcurrencyDUnitTest {
   }
 
   @NotNull
-  private static Properties gemFireConfiguration()
+  private static Properties gemFireConfiguration(
+      final boolean conserveSockets, final boolean useTLS,
+      final int socketBufferSize)
       throws GeneralSecurityException, IOException {
 
-    final Properties props = securityProperties();
+    final Properties props;
+    if (useTLS) {
+      props = securityProperties();
+    } else {
+      props = new Properties();
+    }
+
+    props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));
 
     /*
      * This is something we intend to test!
@@ -276,29 +339,32 @@ public class P2PMessagingConcurrencyDUnitTest {
      *
      * careful: if you set a boolean it doesn't take hold! setting a String
      */
-    props.setProperty("conserve-sockets", "true");
+    props.setProperty("conserve-sockets", String.valueOf(conserveSockets));
 
     return props;
   }
 
   @NotNull
   private static Properties securityProperties() throws GeneralSecurityException, IOException {
-    final CertificateMaterial ca = new CertificateBuilder()
-        .commonName("Test CA")
-        .isCA()
-        .generate();
-
-    final CertificateMaterial serverCertificate = new CertificateBuilder()
-        .commonName("member")
-        .issuedBy(ca)
-        .generate();
-
-    final CertStores memberStore = new CertStores("member");
-    memberStore.withCertificate("member", serverCertificate);
-    memberStore.trust("ca", ca);
-    // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
-    final Properties props = memberStore.propertiesWith("all", false, false);
-    return props;
+    // subsequent calls must return the same value so members agree on credentials
+    if (securityProperties == null) {
+      final CertificateMaterial ca = new CertificateBuilder()
+          .commonName("Test CA")
+          .isCA()
+          .generate();
+
+      final CertificateMaterial serverCertificate = new CertificateBuilder()
+          .commonName("member")
+          .issuedBy(ca)
+          .generate();
+
+      final CertStores memberStore = new CertStores("member");
+      memberStore.withCertificate("member", serverCertificate);
+      memberStore.trust("ca", ca);
+      // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+      securityProperties = memberStore.propertiesWith("all", false, false);
+    }
+    return securityProperties;
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0cd352f..0a3a14e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2770,6 +2770,9 @@ public class Connection implements Runnable {
   /**
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
+   *
+   * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
+   * post-condition: inputBuffer is in WRITABLE mode
    */
   private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
       throws ConnectionException, IOException {
@@ -2846,12 +2849,12 @@ public class Connection implements Runnable {
                       "Allocating larger network read buffer, new size is {} old size was {}.",
                       allocSize, oldBufferSize);
                   inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+                  makeReadableBufferWriteable(inputBuffer);
                 } else {
                   if (inputBuffer.position() != 0) {
                     inputBuffer.compact();
                   } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
+                    makeReadableBufferWriteable(inputBuffer);
                   }
                 }
               }
@@ -2865,6 +2868,11 @@ public class Connection implements Runnable {
     }
   }
 
+  private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
+    inputBuffer.position(inputBuffer.limit());
+    inputBuffer.limit(inputBuffer.capacity());
+  }
+
   private boolean readHandshakeForReceiver(final DataInput dis) {
     try {
       checkHandshakeInitialByte(dis);