You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lukecwik (via GitHub)" <gi...@apache.org> on 2023/02/03 16:32:18 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1095985769


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Instead of using a sleep, consider having each thread wait for a condition so we know that they are all running and then each submit 1000 tasks.



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);
+    done.countDown();
+    executor.shutdown();
+    executor.awaitTermination(1, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+    assert(largestPool <= 100);

Review Comment:
   assert is a java assert and not a junit test assertion
   
   java asserts are no-ops unless being run with assertions enabled



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation if there are existing

Review Comment:
   This is a good short term solution for tasks to be more likely paired up with existing threads, if the system is very busy then I'm not sure how much this will help.
   
   I had originally thought of storing the set of unused and available threads instead of using this queue over tasks but there is higher communication overhead to make that happen since you insert threads when they are done, pull them out when you want to assign something and then have to notify them to wake them up.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,

Review Comment:
   Note that these were the original parameters when we didn't have a scheduled executor service.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org