You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2023/02/06 18:57:24 UTC
[beam] branch master updated: Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dc632029f0c Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)
dc632029f0c is described below
commit dc632029f0cce4d9ee6ee139cad7236388cd7b71
Author: Sam Whittle <sc...@users.noreply.github.com>
AuthorDate: Mon Feb 6 19:57:14 2023 +0100
Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)
there is contention on the SynchronousQueue.
ThreadPoolExecutor creates a new thread if offer(Runnable) returns false.
With a SynchronousQueue under contention, offer may return false even if
there are already idle threads in the pool. To avoid creating a thread
unnecessarily and growing the pool, we modify offer to wait with a small
timeout of 10ms. This may delay initial execution on the pool, but that
should be rare, as it is encountered only once the high-watermark is
exceeded.
Also add a one-hour keep-alive timeout to the non-core threads so that the
pool shrinks when threads stay idle. Both changes are motivated by reducing
memory usage from per-thread buffers in Beam and AvroCoder.
Add a test that submits to the pool under contention; before this change it
showed the pool growing beyond the necessary size (i.e., it created 104 threads
when 100 are sufficient).
---
.../util/UnboundedScheduledExecutorService.java | 28 +++++++++--
.../UnboundedScheduledExecutorServiceTest.java | 54 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
index 9b38604a7a8..1c243ca2ada 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -63,7 +65,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* </ul>
*/
public final class UnboundedScheduledExecutorService implements ScheduledExecutorService {
-
/**
* A {@link FutureTask} that handles periodically rescheduling tasks.
*
@@ -181,7 +182,7 @@ public final class UnboundedScheduledExecutorService implements ScheduledExecuto
private final AtomicLong sequencer = new AtomicLong();
private final NanoClock clock;
- private final ThreadPoolExecutor threadPoolExecutor;
+ @VisibleForTesting final ThreadPoolExecutor threadPoolExecutor;
@VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
private final AbstractExecutorService invokeMethodsAdapter;
private final Future<?> launchTasks;
@@ -201,9 +202,26 @@ public final class UnboundedScheduledExecutorService implements ScheduledExecuto
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
- Long.MAX_VALUE,
- TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<>(),
+ // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+ // over time.
+ 1,
+ HOURS,
+ new SynchronousQueue<Runnable>() {
+ @Override
+ public boolean offer(Runnable r) {
+ try {
+ // By blocking for a little we hope to delay thread creation if there are existing
+ // threads that will eventually return. We expect this timeout to be very rarely
+ // hit as the high-watermark of necessary threads will remain for up to an hour.
+ if (offer(r, 10, MILLISECONDS)) {
+ return true;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+ },
threadFactoryBuilder.build());
// Create an internal adapter so that execute does not re-wrap the ScheduledFutureTask again
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
index ededf69c914..b8efa292bd2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.util;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,6 +39,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService.ScheduledFutureTask;
import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -46,10 +49,14 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Tests for {@link UnboundedScheduledExecutorService}. */
@RunWith(JUnit4.class)
public class UnboundedScheduledExecutorServiceTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(UnboundedScheduledExecutorServiceTest.class);
private static final Runnable RUNNABLE =
() -> {
@@ -502,4 +509,51 @@ public class UnboundedScheduledExecutorServiceTest {
}
Thread.sleep(100);
}
+
+ @Test
+ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+ UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+ CountDownLatch start = new CountDownLatch(100);
+
+ ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+ // Schedule 100 threads that are going to be scheduling work non-stop but sequentially.
+ for (int i = 0; i < 100; ++i) {
+ executor.execute(
+ () -> {
+ start.countDown();
+ try {
+ start.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ for (int j = 0; j < 1000; ++j) {
+ try {
+ executorService
+ .submit(
+ () -> {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ // Ignore, happens on executor shutdown.
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(3, MINUTES);
+
+ int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+ LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+ // Ideally we would never create more than 100, however with contention it is still possible
+ // some extra threads will be created.
+ assertTrue(largestPool <= 104);
+ executorService.shutdown();
+ }
}