You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2023/02/06 18:57:24 UTC

[beam] branch master updated: Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dc632029f0c Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)
dc632029f0c is described below

commit dc632029f0cce4d9ee6ee139cad7236388cd7b71
Author: Sam Whittle <sc...@users.noreply.github.com>
AuthorDate: Mon Feb 6 19:57:14 2023 +0100

    Change UnboundedScheduledExecutorService to avoid creating threads when (#25300)
    
    there is contention on the SynchronousQueue.
    
    ThreadPoolExecutor creates a new thread if offer(Runnable) returns false.
    With a SynchronousQueue under contention, this may return false even if
    there are already idle threads in the pool. To avoid creating a thread
    unnecessarily and growing the pool, we modify offer to wait with a small
    timeout of 10ms. This may delay initial execution on the pool, but that
    should be rare because it is encountered only once the high-watermark is
    exceeded.
    
    Also add a one-hour keep-alive timeout to non-core threads so that the pool
    shrinks when threads are idle.
    
    Both of these changes are motivated by reducing memory usage from per-thread
    buffers in Beam and AvroCoder.
    
    Add a test that contends on submissions to the pool; it previously showed the
    pool growing beyond the necessary size (i.e., it created 104 threads when 100
    are sufficient).
---
 .../util/UnboundedScheduledExecutorService.java    | 28 +++++++++--
 .../UnboundedScheduledExecutorServiceTest.java     | 54 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
index 9b38604a7a8..1c243ca2ada 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -63,7 +65,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * </ul>
  */
 public final class UnboundedScheduledExecutorService implements ScheduledExecutorService {
-
   /**
    * A {@link FutureTask} that handles periodically rescheduling tasks.
    *
@@ -181,7 +182,7 @@ public final class UnboundedScheduledExecutorService implements ScheduledExecuto
   private final AtomicLong sequencer = new AtomicLong();
 
   private final NanoClock clock;
-  private final ThreadPoolExecutor threadPoolExecutor;
+  @VisibleForTesting final ThreadPoolExecutor threadPoolExecutor;
   @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
   private final AbstractExecutorService invokeMethodsAdapter;
   private final Future<?> launchTasks;
@@ -201,9 +202,26 @@ public final class UnboundedScheduledExecutorService implements ScheduledExecuto
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation if there are existing
+                  // threads that will eventually return. We expect this timeout to be very rarely
+                  // hit as the high-watermark of necessary threads will remain for up to an hour.
+                  if (offer(r, 10, MILLISECONDS)) {
+                    return true;
+                  }
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                }
+                return false;
+              }
+            },
             threadFactoryBuilder.build());
 
     // Create an internal adapter so that execute does not re-wrap the ScheduledFutureTask again
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
index ededf69c914..b8efa292bd2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,6 +39,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.util.UnboundedScheduledExecutorService.ScheduledFutureTask;
 import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -46,10 +49,14 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Tests for {@link UnboundedScheduledExecutorService}. */
 @RunWith(JUnit4.class)
 public class UnboundedScheduledExecutorServiceTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UnboundedScheduledExecutorServiceTest.class);
 
   private static final Runnable RUNNABLE =
       () -> {
@@ -502,4 +509,51 @@ public class UnboundedScheduledExecutorServiceTest {
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch start = new CountDownLatch(100);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 100 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            start.countDown();
+            try {
+              start.await();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+            for (int j = 0; j < 1000; ++j) {
+              try {
+                executorService
+                    .submit(
+                        () -> {
+                          try {
+                            Thread.sleep(1);
+                          } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                          }
+                        })
+                    .get();
+              } catch (InterruptedException | ExecutionException e) {
+                // Ignore, happens on executor shutdown.
+              }
+            }
+          });
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(3, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+    // Ideally we would never create more than 100, however with contention it is still possible
+    // some extra threads will be created.
+    assertTrue(largestPool <= 104);
+    executorService.shutdown();
+  }
 }