You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/25 00:34:01 UTC

[geode] branch support/1.14 updated (fd3672e -> 0a69cc8)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from fd3672e  GEODE-9838: Log key info for deserialization issue while index update (#7136)
     new d6d9a91  GEODE-9713: Support thread count in ExecutorService rules (#7002)
     new 0a69cc8  GEODE-9825: processInputBuffer resize retains data (#7131)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 370 +++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 ...dExecutorServiceRuleLimitedThreadCountTest.java | 103 ++++++
 ...utedExecutorServiceRuleLimitedVmCountTest.java} |  14 +-
 ...xecutorServiceRuleUnlimitedThreadCountTest.java |  75 +++++
 .../rules/DistributedExecutorServiceRule.java      | 165 ++++++++-
 .../sanctioned-geode-dunit-serializables.txt       |   2 +-
 .../test/junit/rules/ExecutorServiceRule.java      |  69 +++-
 .../sanctioned-geode-junit-serializables.txt       |   2 +-
 9 files changed, 785 insertions(+), 27 deletions(-)
 create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
 create mode 100644 geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
 copy geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/{DistributedRuleDistributedTest.java => DistributedExecutorServiceRuleLimitedVmCountTest.java} (73%)
 create mode 100644 geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java

[geode] 01/02: GEODE-9713: Support thread count in ExecutorService rules (#7002)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d6d9a9108150e2396ebe3ca91ee62c1e895cf040
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Oct 18 17:16:32 2021 -0700

    GEODE-9713: Support thread count in ExecutorService rules (#7002)
    
    Restores thread count support to ExecutorServiceRule, and adds it to
    DistributedExecutorServiceRule.
    
    PROBLEM
    
    ExecutorService rules currently create a newCachedThreadPool which
    creates new threads as needed.
    
    Some usages would benefit from the option of specifying a threadCount
    limit which would create a newFixedThreadPool that reuses a fixed
    number of threads.
    
    SOLUTION
    
    Add optional threadCount creation parameter to both ExecutorServiceRule
    and DistributedExecutorServiceRule.
    
    Creating a ExecutorService rule without a threadCount will still create a
    newCachedThreadPool. Using a threadCount will now create a
    newFixedThreadPool.
    
    (cherry picked from commit 636bea3fd14c634d2568ed49eba3b13f1797d1ff)
---
 ...dExecutorServiceRuleLimitedThreadCountTest.java | 103 +++++++++++++
 ...butedExecutorServiceRuleLimitedVmCountTest.java |  37 +++++
 ...xecutorServiceRuleUnlimitedThreadCountTest.java |  75 ++++++++++
 .../rules/DistributedExecutorServiceRule.java      | 165 ++++++++++++++++++++-
 .../sanctioned-geode-dunit-serializables.txt       |   2 +-
 .../test/junit/rules/ExecutorServiceRule.java      |  69 +++++++--
 .../sanctioned-geode-junit-serializables.txt       |   2 +-
 7 files changed, 434 insertions(+), 19 deletions(-)

diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
new file mode 100644
index 0000000..9328d5e
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule.builder;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleLimitedThreadCountTest implements Serializable {
+
+  private static final int THREAD_COUNT = 2;
+  private static final long TIMEOUT = getTimeout().toMinutes();
+  private static final TimeUnit UNIT = TimeUnit.MINUTES;
+  private static final AtomicInteger STARTED_TASKS = new AtomicInteger();
+  private static final AtomicInteger COMPLETED_TASKS = new AtomicInteger();
+  private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule = builder()
+      .threadCount(THREAD_COUNT).vmCount(1).build();
+
+  @Before
+  public void setUp() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      STARTED_TASKS.set(0);
+      COMPLETED_TASKS.set(0);
+      LATCH.set(new CountDownLatch(1));
+    }));
+  }
+
+  @Test
+  public void limitsRunningTasksToThreadCount() {
+    // start THREAD_COUNT threads to use up the executor's thread pool
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      for (int i = 1; i <= THREAD_COUNT; i++) {
+        executorServiceRule.submit(() -> {
+          // increment count of started tasks and use a LATCH to keep it running
+          STARTED_TASKS.incrementAndGet();
+          assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+          COMPLETED_TASKS.incrementAndGet();
+        });
+      }
+
+      // count of started tasks should be the same as THREAD_COUNT
+      await().untilAsserted(() -> {
+        assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+        assertThat(COMPLETED_TASKS.get()).isZero();
+      });
+
+      // try to start one more task, but it should end up queued instead of started
+      executorServiceRule.submit(() -> {
+        STARTED_TASKS.incrementAndGet();
+        assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+        COMPLETED_TASKS.incrementAndGet();
+      });
+
+      // started tasks should still be the same as THREAD_COUNT
+      assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+      assertThat(COMPLETED_TASKS.get()).isZero();
+
+      // number of threads running in executor should also be the same as THREAD_COUNT
+      assertThat(executorServiceRule.getThreads()).hasSize(THREAD_COUNT);
+
+      // open latch to let started tasks complete, and queued task should also start and finish
+      LATCH.get().countDown();
+
+      // all tasks should eventually complete as the executor threads finish tasks
+      await().untilAsserted(() -> {
+        assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+        assertThat(COMPLETED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+      });
+    }));
+  }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
new file mode 100644
index 0000000..434b409
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.dunit.VM.getVMCount;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+public class DistributedExecutorServiceRuleLimitedVmCountTest {
+
+  private static final int VM_COUNT = 2;
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule =
+      new DistributedExecutorServiceRule(0, VM_COUNT);
+
+  @Test
+  public void limitsVmCount() {
+    assertThat(getVMCount()).isEqualTo(VM_COUNT);
+  }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
new file mode 100644
index 0000000..c76cafc
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleUnlimitedThreadCountTest implements Serializable {
+
+  private static final int PARALLEL_TASK_COUNT = 4;
+  private static final long TIMEOUT = getTimeout().toMinutes();
+  private static final TimeUnit UNIT = TimeUnit.MINUTES;
+  private static final AtomicBoolean COMPLETED = new AtomicBoolean();
+  private static final AtomicReference<CyclicBarrier> BARRIER = new AtomicReference<>();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule =
+      new DistributedExecutorServiceRule(0, 1);
+
+  @Before
+  public void setUp() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      COMPLETED.set(false);
+      BARRIER.set(new CyclicBarrier(PARALLEL_TASK_COUNT, () -> COMPLETED.set(true)));
+    }));
+  }
+
+  @Test
+  public void doesNotLimitThreadCount() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      Collection<Future<Void>> tasks = new ArrayList<>();
+      for (int i = 1; i <= PARALLEL_TASK_COUNT; i++) {
+        tasks.add(executorServiceRule.submit(() -> {
+          BARRIER.get().await(TIMEOUT, UNIT);
+        }));
+      }
+      await().untilAsserted(() -> assertThat(COMPLETED.get()).isTrue());
+      for (Future<Void> task : tasks) {
+        assertThat(task).isDone();
+      }
+    }));
+  }
+}
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
index 44f7de5..1fff402 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
@@ -14,12 +14,15 @@
  */
 package org.apache.geode.test.dunit.rules;
 
+import static org.apache.geode.test.dunit.VM.DEFAULT_VM_COUNT;
+
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -27,17 +30,76 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule.ThrowingRunnable;
 
-@SuppressWarnings("unused")
+/**
+ * Every DUnit VM, including the JUnit Controller VM, has its own {@code ExecutorService}.
+ */
+@SuppressWarnings({"serial", "unused"})
 public class DistributedExecutorServiceRule extends AbstractDistributedRule {
 
   private static final AtomicReference<ExecutorServiceRule> delegate = new AtomicReference<>();
 
+  private final boolean enableAwaitTermination;
+  private final long awaitTerminationTimeout;
+  private final TimeUnit awaitTerminationTimeUnit;
+  private final boolean awaitTerminationBeforeShutdown;
+  private final boolean useShutdown;
+  private final boolean useShutdownNow;
+  private final int threadCount;
+
+  /**
+   * Returns a {@code Builder} to configure a new {@code ExecutorServiceRule}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+   * thread dump of its threads if any were left running during {@code tearDown}.
+   */
   public DistributedExecutorServiceRule() {
-    // default vmCount
+    this(new Builder().threadCount(0).vmCount(DEFAULT_VM_COUNT));
   }
 
-  public DistributedExecutorServiceRule(int vmCount) {
+  /**
+   * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+   * thread dump of its threads if any were left running during {@code tearDown}.
+   *
+   * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+   *        creates cached thread pool.
+   * @param vmCount specified number of VMs
+   */
+  public DistributedExecutorServiceRule(int threadCount, int vmCount) {
+    this(new Builder().threadCount(threadCount).vmCount(vmCount));
+  }
+
+  private DistributedExecutorServiceRule(Builder builder) {
+    this(builder.enableAwaitTermination,
+        builder.awaitTerminationTimeout,
+        builder.awaitTerminationTimeUnit,
+        builder.awaitTerminationBeforeShutdown,
+        builder.useShutdown,
+        builder.useShutdownNow,
+        builder.threadCount,
+        builder.vmCount);
+  }
+
+  private DistributedExecutorServiceRule(boolean enableAwaitTermination,
+      long awaitTerminationTimeout,
+      TimeUnit awaitTerminationTimeUnit,
+      boolean awaitTerminationBeforeShutdown,
+      boolean useShutdown,
+      boolean useShutdownNow,
+      int threadCount,
+      int vmCount) {
     super(vmCount);
+    this.enableAwaitTermination = enableAwaitTermination;
+    this.awaitTerminationTimeout = awaitTerminationTimeout;
+    this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+    this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+    this.useShutdown = useShutdown;
+    this.useShutdownNow = useShutdownNow;
+    this.threadCount = threadCount;
   }
 
   public ExecutorService getExecutorService() {
@@ -173,7 +235,9 @@ public class DistributedExecutorServiceRule extends AbstractDistributedRule {
 
   private void invokeBefore() throws Exception {
     try {
-      delegate.set(new ExecutorServiceRule());
+      delegate.set(new ExecutorServiceRule(enableAwaitTermination, awaitTerminationTimeout,
+          awaitTerminationTimeUnit, awaitTerminationBeforeShutdown, useShutdown, useShutdownNow,
+          threadCount));
       delegate.get().before();
     } catch (Throwable throwable) {
       if (throwable instanceof Exception) {
@@ -186,4 +250,97 @@ public class DistributedExecutorServiceRule extends AbstractDistributedRule {
   private void invokeAfter() {
     delegate.get().after();
   }
+
+  public static class Builder {
+
+    private boolean enableAwaitTermination;
+    private long awaitTerminationTimeout;
+    private TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+    private boolean awaitTerminationBeforeShutdown = true;
+    private boolean useShutdown;
+    private boolean useShutdownNow = true;
+    private int threadCount;
+    private int vmCount;
+
+    protected Builder() {
+      // nothing
+    }
+
+    /**
+     * Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     */
+    public Builder awaitTermination(long timeout, TimeUnit unit) {
+      enableAwaitTermination = true;
+      awaitTerminationTimeout = timeout;
+      awaitTerminationTimeUnit = unit;
+      return this;
+    }
+
+    /**
+     * Enables invocation of {@code shutdown} during {@code tearDown}. Default is disabled.
+     */
+    public Builder useShutdown() {
+      useShutdown = true;
+      useShutdownNow = false;
+      return this;
+    }
+
+    /**
+     * Enables invocation of {@code shutdownNow} during {@code tearDown}. Default is enabled.
+     */
+    public Builder useShutdownNow() {
+      useShutdown = false;
+      useShutdownNow = true;
+      return this;
+    }
+
+    /**
+     * Specifies invocation of {@code awaitTermination} before {@code shutdown} or
+     * {@code shutdownNow}.
+     */
+    public Builder awaitTerminationBeforeShutdown() {
+      awaitTerminationBeforeShutdown = true;
+      return this;
+    }
+
+    /**
+     * Specifies invocation of {@code awaitTermination} after {@code shutdown} or
+     * {@code shutdownNow}.
+     */
+    public Builder awaitTerminationAfterShutdown() {
+      awaitTerminationBeforeShutdown = false;
+      return this;
+    }
+
+    /**
+     * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+     * which means (non-fixed) cached thread pool.
+     *
+     * @param threadCount the number of threads in the pool
+     */
+    public Builder threadCount(int threadCount) {
+      this.threadCount = threadCount;
+      return this;
+    }
+
+    /**
+     * Specifies the number of DUnit VMs to startup.
+     *
+     * @param vmCount the number of DUnit VMs to startup
+     */
+    public Builder vmCount(int vmCount) {
+      this.vmCount = vmCount;
+      return this;
+    }
+
+    /**
+     * Builds the instance of {@code ExecutorServiceRule}.
+     */
+    public DistributedExecutorServiceRule build() {
+      return new DistributedExecutorServiceRule(this);
+    }
+  }
 }
diff --git a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
index 40f1f5b..6081d20 100644
--- a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
+++ b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
@@ -165,7 +165,7 @@ org/apache/geode/test/dunit/rules/DistributedCounters,false,beforeBounceCounters
 org/apache/geode/test/dunit/rules/DistributedDiskDirRule,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker,temporaryFolder:org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder,testClassName:java/lang/String,testName:org/apache/geode/test/junit/rules/serializable/SerializableTestName,vmCount:int,vmEventListener:org/apache/geode/test/dunit/VMEventListener
 org/apache/geode/test/dunit/rules/DistributedDiskDirRule$InternalVMEventListener,false,this$0:org/apache/geode/test/dunit/rules/DistributedDiskDirRule
 org/apache/geode/test/dunit/rules/DistributedErrorCollector,false,beforeBounceErrors:java/util/Map
-org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false
+org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
 org/apache/geode/test/dunit/rules/DistributedExternalResource,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker
 org/apache/geode/test/dunit/rules/DistributedMap,false,controller:java/util/concurrent/atomic/AtomicReference,identity:int,initialEntries:java/util/Map
 org/apache/geode/test/dunit/rules/DistributedReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean,identity:int
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index e3a6a67..abc9e42 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -94,6 +94,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   protected final boolean awaitTerminationBeforeShutdown;
   protected final boolean useShutdown;
   protected final boolean useShutdownNow;
+  protected final int threadCount;
 
   protected transient volatile DedicatedThreadFactory threadFactory;
   protected transient volatile ExecutorService executor;
@@ -106,12 +107,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   protected ExecutorServiceRule(Builder builder) {
-    enableAwaitTermination = builder.enableAwaitTermination;
-    awaitTerminationTimeout = builder.awaitTerminationTimeout;
-    awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
-    awaitTerminationBeforeShutdown = builder.awaitTerminationBeforeShutdown;
-    useShutdown = builder.useShutdown;
-    useShutdownNow = builder.useShutdownNow;
+    this(builder.enableAwaitTermination,
+        builder.awaitTerminationTimeout,
+        builder.awaitTerminationTimeUnit,
+        builder.awaitTerminationBeforeShutdown,
+        builder.useShutdown,
+        builder.useShutdownNow,
+        builder.threadCount);
   }
 
   /**
@@ -119,18 +121,47 @@ public class ExecutorServiceRule extends SerializableExternalResource {
    * during {@code tearDown}.
    */
   public ExecutorServiceRule() {
-    enableAwaitTermination = false;
-    awaitTerminationTimeout = 0;
-    awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
-    awaitTerminationBeforeShutdown = false;
-    useShutdown = false;
-    useShutdownNow = true;
+    this(false, 0, TimeUnit.NANOSECONDS, false, false, true, 0);
+  }
+
+  /**
+   * Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+   * during {@code tearDown}.
+   *
+   * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+   *        creates cached thread pool.
+   */
+  public ExecutorServiceRule(int threadCount) {
+    this(false, 0, TimeUnit.NANOSECONDS, false, false, true, threadCount);
+  }
+
+  /**
+   * For invocation by {@code DistributedExecutorServiceRule} which needs to subclass another class.
+   */
+  public ExecutorServiceRule(boolean enableAwaitTermination,
+      long awaitTerminationTimeout,
+      TimeUnit awaitTerminationTimeUnit,
+      boolean awaitTerminationBeforeShutdown,
+      boolean useShutdown,
+      boolean useShutdownNow,
+      int threadCount) {
+    this.enableAwaitTermination = enableAwaitTermination;
+    this.awaitTerminationTimeout = awaitTerminationTimeout;
+    this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+    this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+    this.useShutdown = useShutdown;
+    this.useShutdownNow = useShutdownNow;
+    this.threadCount = threadCount;
   }
 
   @Override
   public void before() {
     threadFactory = new DedicatedThreadFactory();
-    executor = Executors.newCachedThreadPool(threadFactory);
+    if (threadCount > 0) {
+      executor = Executors.newFixedThreadPool(threadCount, threadFactory);
+    } else {
+      executor = Executors.newCachedThreadPool(threadFactory);
+    }
   }
 
   @Override
@@ -375,6 +406,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     protected boolean awaitTerminationBeforeShutdown = true;
     protected boolean useShutdown;
     protected boolean useShutdownNow = true;
+    protected int threadCount;
 
     protected Builder() {
       // nothing
@@ -430,6 +462,17 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     }
 
     /**
+     * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+     * which means (non-fixed) cached thread pool.
+     *
+     * @param threadCount the number of threads in the pool
+     */
+    public Builder threadCount(int threadCount) {
+      this.threadCount = threadCount;
+      return this;
+    }
+
+    /**
      * Builds the instance of {@code ExecutorServiceRule}.
      */
     public ExecutorServiceRule build() {
diff --git a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index 89eb8c1..b0be3e0 100644
--- a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++ b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -75,7 +75,7 @@ org/apache/geode/test/compiler/ClassBuilder,false,classPath:java/lang/String
 org/apache/geode/test/concurrent/FileBasedCountDownLatch,false,dataFile:java/io/File,lockFile:java/io/File
 org/apache/geode/test/junit/rules/CloseableReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean
 org/apache/geode/test/junit/rules/ConditionalIgnoreRule,false
-org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,useShutdown:boolean,useShutdownNow:boolean
+org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
 org/apache/geode/test/junit/rules/IgnoreOnWindowsRule,false
 org/apache/geode/test/junit/rules/IgnoreUntilRule,false
 org/apache/geode/test/junit/rules/JarFileRule,false,className:java/lang/String,jarFile:java/io/File,jarName:java/lang/String,makeJarLarge:boolean

[geode] 02/02: GEODE-9825: processInputBuffer resize retains data (#7131)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0a69cc87bb8a43b18fe7e32cf588b7344ec37580
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Nov 23 08:29:11 2021 -0800

    GEODE-9825: processInputBuffer resize retains data (#7131)
    
    (cherry picked from commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java | 370 +++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  |  12 +-
 2 files changed, 380 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
new file mode 100644
index 0000000..0d7c2d3
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * Tests one-way P2P messaging between two peers.
+ * Many concurrent tasks compete on the sending side.
+ * The main purpose of the test is to exercise
+ * ByteBufferSharing and friends.
+ *
+ * Tests combinations of: conserve-sockets true/false,
+ * TLS on/off, and socket-buffer-size for sender
+ * and receiver both set to the default (and equal)
+ * and set to the sender's buffer twice as big as the
+ * receiver's buffer.
+ *
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2PMessagingConcurrencyDUnitTest {
+
+  // how many messages will each sender generate?
+  private static final int MESSAGES_PER_SENDER = 1_000;
+
+  // number of concurrent (sending) tasks to run
+  private static final int SENDER_COUNT = 10;
+
+  // (exclusive) upper bound of random message size, in bytes
+  private static final int LARGEST_MESSAGE_BOUND = 32 * 1024 + 2; // 32KiB + 2
+
+  // random seed
+  private static final int RANDOM_SEED = 1234;
+
+  private static Properties securityProperties;
+
+  @Rule
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  @ClassRule
+  public static final DistributedExecutorServiceRule senderExecutorServiceRule =
+      new DistributedExecutorServiceRule(SENDER_COUNT, 3);
+
+  private MemberVM sender;
+  private MemberVM receiver;
+
+  /*
+   * bytes sent on sender JVM, bytes received on receiver JVM
+   * (not used in test JVM)
+   */
+  private static LongAdder bytesTransferredAdder;
+
+  private void configure(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    final Properties senderConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
+    final Properties receiverConfiguration =
+        gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);
+
+    final MemberVM locator =
+        clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+                .withoutClusterConfigurationService().withoutManagementRestService());
+
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+  }
+
+  @Test
+  @Parameters({
+      /*
+       * all combinations of flags with buffer sizes:
+       * (equal), larger/smaller, smaller/larger, minimal
+       */
+      "true, true, 32768, 32768",
+      "true, true, 65536, 32768",
+      "true, true, 32768, 65536",
+      "true, true, 1024, 1024",
+      "true, false, 32768, 32768",
+      "true, false, 65536, 32768",
+      "true, false, 32768, 65536",
+      "true, false, 1024, 1024",
+      "false, true, 32768, 32768",
+      "false, true, 65536, 32768",
+      "false, true, 32768, 65536",
+      "false, true, 1024, 1024",
+      "false, false, 32768, 32768",
+      "false, false, 65536, 32768",
+      "false, false, 32768, 65536",
+      "false, false, 1024, 1024",
+  })
+  public void testP2PMessaging(
+      final boolean conserveSockets,
+      final boolean useTLS,
+      final int sendSocketBufferSize,
+      final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {
+
+    configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);
+
+    final InternalDistributedMember receiverMember =
+        receiver.invoke(() -> {
+
+          bytesTransferredAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+          final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+          return localMember;
+
+        });
+
+    sender.invoke(() -> {
+
+      bytesTransferredAdder = new LongAdder();
+
+      final ClusterDistributionManager cdm = getCDM();
+      final Random random = new Random(RANDOM_SEED);
+      final AtomicInteger nextSenderId = new AtomicInteger();
+
+      /*
+       * When this comment was written DistributedExecutorServiceRule's
+       * getExecutorService had no option to specify the number of threads.
+       * If it had we might have liked to specify the number of CPU cores.
+       * In an ideal world we'd want only as many threads as CPUs here.
+       * OTOH the P2P messaging system at the time this comment was written,
+       * used blocking I/O, so we were not, as it turns out, living in that
+       * ideal world.
+       */
+      final ExecutorService executor = senderExecutorServiceRule.getExecutorService();
+
+      final CountDownLatch startLatch = new CountDownLatch(SENDER_COUNT);
+      final CountDownLatch stopLatch = new CountDownLatch(SENDER_COUNT);
+      final LongAdder failedRecipientCount = new LongAdder();
+
+      final Runnable doSending = () -> {
+        final int senderId = nextSenderId.getAndIncrement();
+        try {
+          startLatch.countDown();
+          startLatch.await();
+        } catch (final InterruptedException e) {
+          throw new RuntimeException("doSending failed", e);
+        }
+        final int firstMessageId = senderId * SENDER_COUNT;
+        for (int messageId = firstMessageId; messageId < firstMessageId
+            + MESSAGES_PER_SENDER; messageId++) {
+          final TestMessage msg = new TestMessage(receiverMember, random, messageId);
+
+          /*
+           * HERE is the Geode API entrypoint we intend to test (putOutgoing()).
+           */
+          final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+
+          if (failedRecipients != null) {
+            failedRecipientCount.add(failedRecipients.size());
+          }
+        }
+        stopLatch.countDown();
+      };
+
+      for (int i = 0; i < SENDER_COUNT; ++i) {
+        executor.submit(doSending);
+      }
+
+      stopLatch.await();
+
+      assertThat(failedRecipientCount.sum()).as("message delivery failed N times").isZero();
+
+    });
+
+    final long bytesSent = getByteCount(sender);
+
+    await().untilAsserted(
+        () -> assertThat(getByteCount(receiver))
+            .as("bytes received != bytes sent")
+            .isEqualTo(bytesSent));
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
+  }
+
+  private static ClusterDistributionManager getCDM() {
+    return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+        .getDistributionManager();
+  }
+
+  private static class TestMessage extends DistributionMessage {
+
+    /*
+     * When this comment was written, messageId wasn't used for anything.
+     * The field was added during a misguided attempt to add SHA-256
+     * digest verification on sender and receiver. Then I figured out
+     * that there's no way to parallelize that (for the sender) so
+     * I settled for merely validating the number of bytes transferred.
+     * Left the field here in case it comes in handy later.
+     */
+    private volatile int messageId;
+    private volatile Random random;
+
+    TestMessage(final InternalDistributedMember receiver,
+        final Random random, final int messageId) {
+      setRecipient(receiver);
+      this.random = random;
+      this.messageId = messageId;
+    }
+
+    // necessary for deserialization
+    public TestMessage() {
+      random = null;
+      messageId = 0;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return OperationExecutors.STANDARD_EXECUTOR;
+    }
+
+    @Override
+    protected void process(final ClusterDistributionManager dm) {}
+
+    @Override
+    public void toData(final DataOutput out, final SerializationContext context)
+        throws IOException {
+      super.toData(out, context);
+
+      out.writeInt(messageId);
+
+      final int length = random.nextInt(LARGEST_MESSAGE_BOUND);
+
+      out.writeInt(length);
+
+      final byte[] payload = new byte[length];
+      random.nextBytes(payload);
+
+      out.write(payload);
+
+      /*
+       * the LongAdder should ensure that we don't introduce any (much)
+       * synchronization with other concurrent tasks here
+       */
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public void fromData(final DataInput in, final DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+
+      messageId = in.readInt();
+
+      final int length = in.readInt();
+
+      final byte[] payload = new byte[length];
+
+      in.readFully(payload);
+
+      bytesTransferredAdder.add(length);
+    }
+
+    @Override
+    public int getDSFID() {
+      return NO_FIXED_ID; // for testing only!
+    }
+  }
+
+  @NotNull
+  private static Properties gemFireConfiguration(
+      final boolean conserveSockets, final boolean useTLS,
+      final int socketBufferSize)
+      throws GeneralSecurityException, IOException {
+
+    final Properties props;
+    if (useTLS) {
+      props = securityProperties();
+    } else {
+      props = new Properties();
+    }
+
+    props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));
+
+    /*
+     * This is something we intend to test!
+     * Send all messages, from all threads, on a single socket per recipient.
+     * maintenance tip: to see what kind of connection you're getting you can
+     * uncomment logging over in DirectChannel.sendToMany()
+     *
+     * careful: if you set a boolean it doesn't take hold! setting a String
+     */
+    props.setProperty("conserve-sockets", String.valueOf(conserveSockets));
+
+    return props;
+  }
+
+  @NotNull
+  private static Properties securityProperties() throws GeneralSecurityException, IOException {
+    // subsequent calls must return the same value so members agree on credentials
+    if (securityProperties == null) {
+      final CertificateMaterial ca = new CertificateBuilder()
+          .commonName("Test CA")
+          .isCA()
+          .generate();
+
+      final CertificateMaterial serverCertificate = new CertificateBuilder()
+          .commonName("member")
+          .issuedBy(ca)
+          .generate();
+
+      final CertStores memberStore = new CertStores("member");
+      memberStore.withCertificate("member", serverCertificate);
+      memberStore.trust("ca", ca);
+      // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+      securityProperties = memberStore.propertiesWith("all", false, false);
+    }
+    return securityProperties;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 44dcfda..82e3b3d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2766,6 +2766,9 @@ public class Connection implements Runnable {
   /**
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
+   *
+   * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
+   * post-condition: inputBuffer is in WRITABLE mode
    */
   private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
       throws ConnectionException, IOException {
@@ -2842,12 +2845,12 @@ public class Connection implements Runnable {
                       "Allocating larger network read buffer, new size is {} old size was {}.",
                       allocSize, oldBufferSize);
                   inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+                  makeReadableBufferWriteable(inputBuffer);
                 } else {
                   if (inputBuffer.position() != 0) {
                     inputBuffer.compact();
                   } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
+                    makeReadableBufferWriteable(inputBuffer);
                   }
                 }
               }
@@ -2861,6 +2864,11 @@ public class Connection implements Runnable {
     }
   }
 
+  private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
+    inputBuffer.position(inputBuffer.limit());
+    inputBuffer.limit(inputBuffer.capacity());
+  }
+
   private boolean readHandshakeForReceiver(DataInput dis) {
     try {
       byte b = dis.readByte();