You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/25 00:38:30 UTC

[geode] branch support/1.12 updated (d979357 -> 7ec6f67)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from d979357  GEODE-9838: Log key info for deserialization issue while index update (#7136)
     new e8cb3ec  GEODE-9825: processInputBuffer resize retains data (#7131)
     new 7ec6f67  GEODE-9825: minus GEODE-9713

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 370 +++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 2 files changed, 380 insertions(+), 2 deletions(-)
 create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java

[geode] 02/02: GEODE-9825: minus GEODE-9713

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7ec6f679f4fdc394eb053f98726d70b42175ef49
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Wed Nov 24 12:12:38 2021 -0800

    GEODE-9825: minus GEODE-9713
---
 .../geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index 0d7c2d3..4c8677c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -89,7 +89,7 @@ public class P2PMessagingConcurrencyDUnitTest {
 
   @ClassRule
   public static final DistributedExecutorServiceRule senderExecutorServiceRule =
-      new DistributedExecutorServiceRule(SENDER_COUNT, 3);
+      new DistributedExecutorServiceRule(SENDER_COUNT);
 
   private MemberVM sender;
   private MemberVM receiver;

[geode] 01/02: GEODE-9825: processInputBuffer resize retains data (#7131)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit e8cb3ecc6582f1494cce2d9093b1e068fd128968
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Nov 23 08:29:11 2021 -0800

    GEODE-9825: processInputBuffer resize retains data (#7131)
    
    (cherry picked from commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb)
    (cherry picked from commit 62bc9aee3f929103411fc2dbe66cc750e56c06c1)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 370 +++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 2 files changed, 380 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
new file mode 100644
index 0000000..0d7c2d3
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * Tests one-way P2P messaging between two peers.
+ * Many concurrent tasks compete on the sending side.
+ * The main purpose of the test is to exercise
+ * ByteBufferSharing and friends.
+ *
+ * Tests combinations of: conserve-sockets true/false,
+ * TLS on/off, and socket-buffer-size for sender
+ * and receiver both set to the default (and equal)
+ * and set to the sender's buffer twice as big as the
+ * receiver's buffer.
+ *
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2PMessagingConcurrencyDUnitTest {
+
+  // how many messages will each sender generate?
+  private static final int MESSAGES_PER_SENDER = 1_000;
+
+  // number of concurrent (sending) tasks to run
+  private static final int SENDER_COUNT = 10;
+
+  // (exclusive) upper bound of random message size, in bytes
+  private static final int LARGEST_MESSAGE_BOUND = 32 * 1024 + 2; // 32KiB + 2
+
+  // random seed
+  private static final int RANDOM_SEED = 1234;
+
+  private static Properties securityProperties;
+
+  @Rule
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  @ClassRule
+  public static final DistributedExecutorServiceRule senderExecutorServiceRule =
+      new DistributedExecutorServiceRule(SENDER_COUNT, 3);
+
+  private MemberVM sender;
+  private MemberVM receiver;
+
+  /*
+   * bytes sent on sender JVM, bytes received on receiver JVM
+   * (not used in test JVM)
+   */
+  private static LongAdder bytesTransferredAdder;
+
+  private void configure(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    final Properties senderConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
+    final Properties receiverConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);
+
+    final MemberVM locator =
+        clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+                .withoutClusterConfigurationService().withoutManagementRestService());
+
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+  }
+
+  @Test
+  @Parameters({
+      /*
+       * all combinations of flags with buffer sizes:
+       * (equal), larger/smaller, smaller/larger, minimal
+       */
+      "true, true, 32768, 32768",
+      "true, true, 65536, 32768",
+      "true, true, 32768, 65536",
+      "true, true, 1024, 1024",
+      "true, false, 32768, 32768",
+      "true, false, 65536, 32768",
+      "true, false, 32768, 65536",
+      "true, false, 1024, 1024",
+      "false, true, 32768, 32768",
+      "false, true, 65536, 32768",
+      "false, true, 32768, 65536",
+      "false, true, 1024, 1024",
+      "false, false, 32768, 32768",
+      "false, false, 65536, 32768",
+      "false, false, 32768, 65536",
+      "false, false, 1024, 1024",
+  })
+  public void testP2PMessaging(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);
+
+    final InternalDistributedMember receiverMember =
+        receiver.invoke(() -> {
+
+          bytesTransferredAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+          final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+          return localMember;
+
+        });
+
+    sender.invoke(() -> {
+
+      bytesTransferredAdder = new LongAdder();
+
+      final ClusterDistributionManager cdm = getCDM();
+      final Random random = new Random(RANDOM_SEED);
+      final AtomicInteger nextSenderId = new AtomicInteger();
+
+      /*
+       * When this comment was written DistributedExecutorServiceRule's
+       * getExecutorService had no option to specify the number of threads.
+       * If it had we might have liked to specify the number of CPU cores.
+       * In an ideal world we'd want only as many threads as CPUs here.
+       * OTOH the P2P messaging system at the time this comment was written,
+       * used blocking I/O, so we were not, as it turns out, living in that
+       * ideal world.
+       */
+      final ExecutorService executor = senderExecutorServiceRule.getExecutorService();
+
+      final CountDownLatch startLatch = new CountDownLatch(SENDER_COUNT);
+      final CountDownLatch stopLatch = new CountDownLatch(SENDER_COUNT);
+      final LongAdder failedRecipientCount = new LongAdder();
+
+      final Runnable doSending = () -> {
+        final int senderId = nextSenderId.getAndIncrement();
+        try {
+          startLatch.countDown();
+          startLatch.await();
+        } catch (final InterruptedException e) {
+          throw new RuntimeException("doSending failed", e);
+        }
+        final int firstMessageId = senderId * SENDER_COUNT;
+        for (int messageId = firstMessageId; messageId < firstMessageId
+            + MESSAGES_PER_SENDER; messageId++) {
+          final TestMessage msg = new TestMessage(receiverMember, random, messageId);
+
+          /*
+           * HERE is the Geode API entrypoint we intend to test (putOutgoing()).
+           */
+          final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+
+          if (failedRecipients != null) {
+            failedRecipientCount.add(failedRecipients.size());
+          }
+        }
+        stopLatch.countDown();
+      };
+
+      for (int i = 0; i < SENDER_COUNT; ++i) {
+        executor.submit(doSending);
+      }
+
+      stopLatch.await();
+
+      assertThat(failedRecipientCount.sum()).as("message delivery failed N times").isZero();
+
+    });
+
+    final long bytesSent = getByteCount(sender);
+
+    await().untilAsserted(
+        () -> assertThat(getByteCount(receiver))
+            .as("bytes received != bytes sent")
+            .isEqualTo(bytesSent));
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
+  }
+
+  private static ClusterDistributionManager getCDM() {
+    return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+        .getDistributionManager();
+  }
+
+  private static class TestMessage extends DistributionMessage {
+
+    /*
+     * When this comment was written, messageId wasn't used for anything.
+     * The field was added during a misguided attempt to add SHA-256
+     * digest verification on sender and receiver. Then I figured out
+     * that there's no way to parallelize that (for the sender) so
+     * I settled for merely validating the number of bytes transferred.
+     * Left the field here in case it comes in handy later.
+     */
+    private volatile int messageId;
+    private volatile Random random;
+
+    TestMessage(final InternalDistributedMember receiver,
+        final Random random, final int messageId) {
+      setRecipient(receiver);
+      this.random = random;
+      this.messageId = messageId;
+    }
+
+    // necessary for deserialization
+    public TestMessage() {
+      random = null;
+      messageId = 0;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return OperationExecutors.STANDARD_EXECUTOR;
+    }
+
+    @Override
+    protected void process(final ClusterDistributionManager dm) {}
+
+    @Override
+    public void toData(final DataOutput out, final SerializationContext context)
+        throws IOException {
+      super.toData(out, context);
+
+      out.writeInt(messageId);
+
+      final int length = random.nextInt(LARGEST_MESSAGE_BOUND);
+
+      out.writeInt(length);
+
+      final byte[] payload = new byte[length];
+      random.nextBytes(payload);
+
+      out.write(payload);
+
+      /*
+       * the LongAdder should ensure that we don't introduce any (much)
+       * synchronization with other concurrent tasks here
+       */
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public void fromData(final DataInput in, final DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+
+      messageId = in.readInt();
+
+      final int length = in.readInt();
+
+      final byte[] payload = new byte[length];
+
+      in.readFully(payload);
+
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public int getDSFID() {
+      return NO_FIXED_ID; // for testing only!
+    }
+  }
+
+  @NotNull
+  private static Properties gemFireConfiguration(
+      final boolean conserveSockets, final boolean useTLS,
+      final int socketBufferSize)
+      throws GeneralSecurityException, IOException {
+
+    final Properties props;
+    if (useTLS) {
+      props = securityProperties();
+    } else {
+      props = new Properties();
+    }
+
+    props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));
+
+    /*
+     * This is something we intend to test!
+     * Send all messages, from all threads, on a single socket per recipient.
+     * maintenance tip: to see what kind of connection you're getting you can
+     * uncomment logging over in DirectChannel.sendToMany()
+     *
+     * careful: if you set a boolean it doesn't take hold! setting a String
+     */
+    props.setProperty("conserve-sockets", String.valueOf(conserveSockets));
+
+    return props;
+  }
+
+  @NotNull
+  private static Properties securityProperties() throws GeneralSecurityException, IOException {
+    // subsequent calls must return the same value so members agree on credentials
+    if (securityProperties == null) {
+      final CertificateMaterial ca = new CertificateBuilder()
+          .commonName("Test CA")
+          .isCA()
+          .generate();
+
+      final CertificateMaterial serverCertificate = new CertificateBuilder()
+          .commonName("member")
+          .issuedBy(ca)
+          .generate();
+
+      final CertStores memberStore = new CertStores("member");
+      memberStore.withCertificate("member", serverCertificate);
+      memberStore.trust("ca", ca);
+      // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+      securityProperties = memberStore.propertiesWith("all", false, false);
+    }
+    return securityProperties;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 107aa9f..476fc24 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2756,6 +2756,9 @@ public class Connection implements Runnable {
   /**
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
+   *
+   * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
+   * post-condition: inputBuffer is in WRITABLE mode
    */
   private void processInputBuffer() throws ConnectionException, IOException {
     try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
@@ -2831,12 +2834,12 @@ public class Connection implements Runnable {
                       "Allocating larger network read buffer, new size is {} old size was {}.",
                       allocSize, oldBufferSize);
                   inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+                  makeReadableBufferWriteable(inputBuffer);
                 } else {
                   if (inputBuffer.position() != 0) {
                     inputBuffer.compact();
                   } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
+                    makeReadableBufferWriteable(inputBuffer);
                   }
                 }
               }
@@ -2850,6 +2853,11 @@ public class Connection implements Runnable {
     }
   }
 
+  private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
+    inputBuffer.position(inputBuffer.limit());
+    inputBuffer.limit(inputBuffer.capacity());
+  }
+
   private boolean readHandshakeForReceiver(DataInput dis) {
     try {
       byte b = dis.readByte();