You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/25 00:34:03 UTC

[geode] 02/02: GEODE-9825: processInputBuffer resize retains data (#7131)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0a69cc87bb8a43b18fe7e32cf588b7344ec37580
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Nov 23 08:29:11 2021 -0800

    GEODE-9825: processInputBuffer resize retains data (#7131)
    
    (cherry picked from commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 370 +++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 2 files changed, 380 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
new file mode 100644
index 0000000..0d7c2d3
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * Tests one-way P2P messaging between two peers.
+ * Many concurrent tasks compete on the sending side.
+ * The main purpose of the test is to exercise
+ * ByteBufferSharing and friends.
+ *
+ * Tests combinations of: conserve-sockets true/false,
+ * TLS on/off, and socket-buffer-size for sender
+ * and receiver both set to the default (and equal)
+ * and set to the sender's buffer twice as big as the
+ * receiver's buffer.
+ *
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2PMessagingConcurrencyDUnitTest {
+
+  // how many messages will each sender generate?
+  private static final int MESSAGES_PER_SENDER = 1_000;
+
+  // number of concurrent (sending) tasks to run
+  private static final int SENDER_COUNT = 10;
+
+  // (exclusive) upper bound of random message size, in bytes
+  private static final int LARGEST_MESSAGE_BOUND = 32 * 1024 + 2; // 32KiB + 2
+
+  // random seed
+  private static final int RANDOM_SEED = 1234;
+
+  private static Properties securityProperties;
+
+  @Rule
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  @ClassRule
+  public static final DistributedExecutorServiceRule senderExecutorServiceRule =
+      new DistributedExecutorServiceRule(SENDER_COUNT, 3);
+
+  private MemberVM sender;
+  private MemberVM receiver;
+
+  /*
+   * bytes sent on sender JVM, bytes received on receiver JVM
+   * (not used in test JVM)
+   */
+  private static LongAdder bytesTransferredAdder;
+
+  private void configure(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    final Properties senderConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
+    final Properties receiverConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);
+
+    final MemberVM locator =
+        clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+                .withoutClusterConfigurationService().withoutManagementRestService());
+
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+  }
+
+  @Test
+  @Parameters({
+      /*
+       * all combinations of flags with buffer sizes:
+       * (equal), larger/smaller, smaller/larger, minimal
+       */
+      "true, true, 32768, 32768",
+      "true, true, 65536, 32768",
+      "true, true, 32768, 65536",
+      "true, true, 1024, 1024",
+      "true, false, 32768, 32768",
+      "true, false, 65536, 32768",
+      "true, false, 32768, 65536",
+      "true, false, 1024, 1024",
+      "false, true, 32768, 32768",
+      "false, true, 65536, 32768",
+      "false, true, 32768, 65536",
+      "false, true, 1024, 1024",
+      "false, false, 32768, 32768",
+      "false, false, 65536, 32768",
+      "false, false, 32768, 65536",
+      "false, false, 1024, 1024",
+  })
+  public void testP2PMessaging(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);
+
+    final InternalDistributedMember receiverMember =
+        receiver.invoke(() -> {
+
+          bytesTransferredAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+          final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+          return localMember;
+
+        });
+
+    sender.invoke(() -> {
+
+      bytesTransferredAdder = new LongAdder();
+
+      final ClusterDistributionManager cdm = getCDM();
+      final Random random = new Random(RANDOM_SEED);
+      final AtomicInteger nextSenderId = new AtomicInteger();
+
+      /*
+       * When this comment was written DistributedExecutorServiceRule's
+       * getExecutorService had no option to specify the number of threads.
+       * If it had we might have liked to specify the number of CPU cores.
+       * In an ideal world we'd want only as many threads as CPUs here.
+       * OTOH the P2P messaging system at the time this comment was written,
+       * used blocking I/O, so we were not, as it turns out, living in that
+       * ideal world.
+       */
+      final ExecutorService executor = senderExecutorServiceRule.getExecutorService();
+
+      final CountDownLatch startLatch = new CountDownLatch(SENDER_COUNT);
+      final CountDownLatch stopLatch = new CountDownLatch(SENDER_COUNT);
+      final LongAdder failedRecipientCount = new LongAdder();
+
+      final Runnable doSending = () -> {
+        final int senderId = nextSenderId.getAndIncrement();
+        try {
+          startLatch.countDown();
+          startLatch.await();
+        } catch (final InterruptedException e) {
+          throw new RuntimeException("doSending failed", e);
+        }
+        final int firstMessageId = senderId * SENDER_COUNT;
+        for (int messageId = firstMessageId; messageId < firstMessageId
+            + MESSAGES_PER_SENDER; messageId++) {
+          final TestMessage msg = new TestMessage(receiverMember, random, messageId);
+
+          /*
+           * HERE is the Geode API entrypoint we intend to test (putOutgoing()).
+           */
+          final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+
+          if (failedRecipients != null) {
+            failedRecipientCount.add(failedRecipients.size());
+          }
+        }
+        stopLatch.countDown();
+      };
+
+      for (int i = 0; i < SENDER_COUNT; ++i) {
+        executor.submit(doSending);
+      }
+
+      stopLatch.await();
+
+      assertThat(failedRecipientCount.sum()).as("message delivery failed N times").isZero();
+
+    });
+
+    final long bytesSent = getByteCount(sender);
+
+    await().untilAsserted(
+        () -> assertThat(getByteCount(receiver))
+            .as("bytes received != bytes sent")
+            .isEqualTo(bytesSent));
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
+  }
+
+  private static ClusterDistributionManager getCDM() {
+    return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+        .getDistributionManager();
+  }
+
+  private static class TestMessage extends DistributionMessage {
+
+    /*
+     * When this comment was written, messageId wasn't used for anything.
+     * The field was added during a misguided attempt to add SHA-256
+     * digest verification on sender and receiver. Then I figured out
+     * that there's no way to parallelize that (for the sender) so
+     * I settled for merely validating the number of bytes transferred.
+     * Left the field here in case it comes in handy later.
+     */
+    private volatile int messageId;
+    private volatile Random random;
+
+    TestMessage(final InternalDistributedMember receiver,
+        final Random random, final int messageId) {
+      setRecipient(receiver);
+      this.random = random;
+      this.messageId = messageId;
+    }
+
+    // necessary for deserialization
+    public TestMessage() {
+      random = null;
+      messageId = 0;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return OperationExecutors.STANDARD_EXECUTOR;
+    }
+
+    @Override
+    protected void process(final ClusterDistributionManager dm) {}
+
+    @Override
+    public void toData(final DataOutput out, final SerializationContext context)
+        throws IOException {
+      super.toData(out, context);
+
+      out.writeInt(messageId);
+
+      final int length = random.nextInt(LARGEST_MESSAGE_BOUND);
+
+      out.writeInt(length);
+
+      final byte[] payload = new byte[length];
+      random.nextBytes(payload);
+
+      out.write(payload);
+
+      /*
+       * the LongAdder should ensure that we don't introduce any (much)
+       * synchronization with other concurrent tasks here
+       */
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public void fromData(final DataInput in, final DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+
+      messageId = in.readInt();
+
+      final int length = in.readInt();
+
+      final byte[] payload = new byte[length];
+
+      in.readFully(payload);
+
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public int getDSFID() {
+      return NO_FIXED_ID; // for testing only!
+    }
+  }
+
+  @NotNull
+  private static Properties gemFireConfiguration(
+      final boolean conserveSockets, final boolean useTLS,
+      final int socketBufferSize)
+      throws GeneralSecurityException, IOException {
+
+    final Properties props;
+    if (useTLS) {
+      props = securityProperties();
+    } else {
+      props = new Properties();
+    }
+
+    props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));
+
+    /*
+     * This is something we intend to test!
+     * Send all messages, from all threads, on a single socket per recipient.
+     * maintenance tip: to see what kind of connection you're getting you can
+     * uncomment logging over in DirectChannel.sendToMany()
+     *
+     * careful: if you set a boolean it doesn't take hold! setting a String
+     */
+    props.setProperty("conserve-sockets", String.valueOf(conserveSockets));
+
+    return props;
+  }
+
+  @NotNull
+  private static Properties securityProperties() throws GeneralSecurityException, IOException {
+    // subsequent calls must return the same value so members agree on credentials
+    if (securityProperties == null) {
+      final CertificateMaterial ca = new CertificateBuilder()
+          .commonName("Test CA")
+          .isCA()
+          .generate();
+
+      final CertificateMaterial serverCertificate = new CertificateBuilder()
+          .commonName("member")
+          .issuedBy(ca)
+          .generate();
+
+      final CertStores memberStore = new CertStores("member");
+      memberStore.withCertificate("member", serverCertificate);
+      memberStore.trust("ca", ca);
+      // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+      securityProperties = memberStore.propertiesWith("all", false, false);
+    }
+    return securityProperties;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 44dcfda..82e3b3d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2766,6 +2766,9 @@ public class Connection implements Runnable {
   /**
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
+   *
+   * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
+   * post-condition: inputBuffer is in WRITABLE mode
    */
   private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
       throws ConnectionException, IOException {
@@ -2842,12 +2845,12 @@ public class Connection implements Runnable {
                       "Allocating larger network read buffer, new size is {} old size was {}.",
                       allocSize, oldBufferSize);
                   inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+                  makeReadableBufferWriteable(inputBuffer);
                 } else {
                   if (inputBuffer.position() != 0) {
                     inputBuffer.compact();
                   } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
+                    makeReadableBufferWriteable(inputBuffer);
                   }
                 }
               }
@@ -2861,6 +2864,11 @@ public class Connection implements Runnable {
     }
   }
 
+  private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
+    inputBuffer.position(inputBuffer.limit());
+    inputBuffer.limit(inputBuffer.capacity());
+  }
+
   private boolean readHandshakeForReceiver(DataInput dis) {
     try {
       byte b = dis.readByte();