Posted to github@beam.apache.org by "scwhittle (via GitHub)" <gi...@apache.org> on 2023/02/03 11:45:01 UTC

[GitHub] [beam] scwhittle opened a new pull request, #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

scwhittle opened a new pull request, #25300:
URL: https://github.com/apache/beam/pull/25300

   ThreadPoolExecutor creates a new thread if offer(Runnable) on its work queue returns false. With a SynchronousQueue under contention, offer may return false even when there are already idle threads in the pool. To avoid creating a thread unnecessarily and growing the pool, we modify offer to wait with a small timeout of 100ms. This may delay initial execution on the pool, but that should be rare because it happens only once the high-watermark is exceeded.
   
   Also add a timeout to the non-core threads so that the pool reduces size.
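A minimal sketch of the keep-alive effect, assuming a plain `ThreadPoolExecutor` rather than Beam's `UnboundedScheduledExecutorService`. The names `KeepAliveSketch` and `poolSizeBeforeAndAfterIdle` are hypothetical, and the short timeout in the usage note is only for illustration; the PR itself uses 1 hour.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveSketch {
  /**
   * Runs one task, then returns the pool size immediately afterwards and again
   * after the pool has been idle for ten times the keep-alive duration.
   */
  static int[] poolSizeBeforeAndAfterIdle(long keepAliveMillis) {
    // Zero core threads, so every worker is a non-core thread subject to the timeout.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            keepAliveMillis,
            TimeUnit.MILLISECONDS,
            new SynchronousQueue<>());
    try {
      pool.submit(() -> {}).get();
      int before = pool.getPoolSize();
      // Idle well past the keep-alive so the worker can time out and exit.
      Thread.sleep(keepAliveMillis * 10);
      int after = pool.getPoolSize();
      return new int[] {before, after};
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Under these assumptions, `KeepAliveSketch.poolSizeBeforeAndAfterIdle(100)` should report one live thread right after the task and none once the idle period has passed. With the old `Long.MAX_VALUE` keep-alive the worker would never time out.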
   
   Both changes are intended to reduce memory usage from per-thread buffers in Beam and AvroCoder.
   
   Add a test that submits work to the pool from many contending threads. Previously this showed the pool growing beyond what was needed (i.e. it created 104 threads when 100 were sufficient).
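The `offer` change in this description can be sketched roughly as follows. This is an illustration based on the description and the diff fragment quoted later in the thread, not the merged code; `PatientQueueSketch`, `PatientSynchronousQueue`, `newPool`, and `largestPoolAfterSequentialTasks` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PatientQueueSketch {
  /** SynchronousQueue whose untimed offer waits briefly for an idle worker to poll. */
  static final class PatientSynchronousQueue extends SynchronousQueue<Runnable> {
    @Override
    public boolean offer(Runnable r) {
      try {
        // ThreadPoolExecutor starts a new thread when this returns false, so wait up
        // to 100 ms for an existing worker to take the task before giving up.
        return offer(r, 100, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /** Unbounded pool with no core threads and a 1 hour keep-alive, as in the description. */
  static ThreadPoolExecutor newPool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new PatientSynchronousQueue());
  }

  /** Runs tasks one at a time and returns the largest pool size that was reached. */
  static int largestPoolAfterSequentialTasks(int tasks) {
    ThreadPoolExecutor pool = newPool();
    try {
      for (int i = 0; i < tasks; i++) {
        pool.submit(() -> {}).get(); // Wait for each task before submitting the next.
      }
      return pool.getLargestPoolSize();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

The trade-off is visible in the first submission: with no worker available yet, `offer` waits out the full 100 ms before the executor creates a thread. Later sequential submissions are expected to be handed to that same worker.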
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096025456


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,

Review Comment:
   Agreed. I'm not sure the change to the service made this any different; it likely could have been an issue before too.





[GitHub] [beam] scwhittle commented on pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on PR #25300:
URL: https://github.com/apache/beam/pull/25300#issuecomment-1415756766

   R: @lukecwik 




[GitHub] [beam] lukecwik merged pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik merged PR #25300:
URL: https://github.com/apache/beam/pull/25300




[GitHub] [beam] lukecwik commented on pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25300:
URL: https://github.com/apache/beam/pull/25300#issuecomment-1416229863

   Lots of flakes with the test, see https://github.com/apache/beam/pull/25300#discussion_r1096091240




[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1095985769


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Instead of using a sleep, consider having each thread wait for a condition so we know that they are all running, and then have each submit 1000 tasks.
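One way to read this suggestion is a start barrier built from a `CountDownLatch`, followed by a fixed amount of work per thread instead of a timed sleep. The sketch below is an assumption about what that could look like, not the code that was merged; `StartBarrierSketch` and `runBoundedWork` are hypothetical names, and the counter stands in for the `executorService.submit(...).get()` call in the real test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class StartBarrierSketch {
  /**
   * Starts {@code submitters} threads, waits until all of them are running, and
   * then lets each perform {@code tasksEach} units of work. Returns the total
   * number of completed units.
   */
  static int runBoundedWork(int submitters, int tasksEach) {
    // One thread per submitter, so every submitter can reach the barrier.
    ExecutorService submitterPool = Executors.newFixedThreadPool(submitters);
    CountDownLatch allRunning = new CountDownLatch(submitters);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < submitters; i++) {
      submitterPool.execute(
          () -> {
            allRunning.countDown();
            try {
              allRunning.await(); // Block until every submitter thread has started.
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            for (int j = 0; j < tasksEach; j++) {
              // Placeholder for submitting a task to the pool under test and waiting on it.
              completed.incrementAndGet();
            }
          });
    }
    submitterPool.shutdown();
    try {
      if (!submitterPool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("submitters did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return completed.get();
  }
}
```

Because the amount of work is fixed, the run ends as soon as the work is done rather than after a wall-clock delay, which is consistent with the later "Much faster now" reply.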



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);
+    done.countDown();
+    executor.shutdown();
+    executor.awaitTermination(1, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+    assert(largestPool <= 100);

Review Comment:
   `assert` here is the Java `assert` statement, not a JUnit test assertion.
   
   Java asserts are no-ops unless the JVM is run with assertions enabled.
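To illustrate the difference without depending on JUnit, here is a small plain-Java sketch. `ExplicitCheckSketch`, `checkAtMost`, and `assertionsEnabled` are hypothetical names; in the actual test a JUnit or Hamcrest assertion would be the natural replacement for the `assert` statement.

```java
final class ExplicitCheckSketch {
  /** Fails whenever the limit is exceeded, whether or not the JVM was started with -ea. */
  static void checkAtMost(int actual, int limit) {
    if (actual > limit) {
      throw new AssertionError("expected at most " + limit + " but was " + actual);
    }
  }

  /** Reports whether the `assert` statement is active in this JVM. */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    // The assignment inside the assert runs only when assertions are enabled.
    assert enabled = true;
    return enabled;
  }
}
```

When `assertionsEnabled()` returns false, a statement such as `assert(largestPool <= 100);` is skipped entirely, so the test would pass regardless of the pool size. An explicit check like `checkAtMost(largestPool, 100)` does not have that problem.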



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation if there are existing

Review Comment:
   This is a good short-term solution that makes tasks more likely to be paired up with existing threads. If the system is very busy, I'm not sure how much it will help.
   
   I had originally thought of storing the set of unused, available threads instead of using this queue of tasks, but that has higher communication overhead: you insert threads when they are done, pull them out when you want to assign something, and then have to notify them to wake them up.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,

Review Comment:
   Note that these were the original parameters when we didn't have a scheduled executor service.





[GitHub] [beam] scwhittle commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1097289368


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   With 100 submitters I saw at most 101 threads with my change. I modified the test to check <= 104 to prevent flakes, since that check failed with the previous implementation. We don't need to be too strict here; we just need to know it's better.





[GitHub] [beam] scwhittle commented on pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on PR #25300:
URL: https://github.com/apache/beam/pull/25300#issuecomment-1415821148

   Separately, the per-thread overhead for buffers and encoding makes me wonder whether we want separate UnboundedScheduledExecutorService instances for scheduling DoFns and for other things like GAX. That could help keep the per-thread objects limited to the number of threads necessary for each particular type of task.




[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096078434


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;

Review Comment:
   ```suggestion
   import static java.util.concurrent.TimeUnit.HOURS;
   import static java.util.concurrent.TimeUnit.MILLISECONDS;
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +200,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,

Review Comment:
   ```suggestion
               HOURS,
   ```





[GitHub] [beam] scwhittle commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096031498


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Much faster now :)
   Passes with 99 threads with this change.
   Failed with 108 without this change.





[GitHub] [beam] github-actions[bot] commented on pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25300:
URL: https://github.com/apache/beam/pull/25300#issuecomment-1415758300

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096080925


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -37,7 +39,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;

Review Comment:
   ```suggestion
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   ```





[GitHub] [beam] scwhittle commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096024741


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation if there are existing

Review Comment:
   Ack



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);
+    done.countDown();
+    executor.shutdown();
+    executor.awaitTermination(1, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+    assert(largestPool <= 100);

Review Comment:
   Done



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Done





[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096029624


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,

Review Comment:
   Yeah, I think this will address a likely slow leak we have always had.





[GitHub] [beam] scwhittle commented on pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on PR #25300:
URL: https://github.com/apache/beam/pull/25300#issuecomment-1419122195

   Actually, I see now that it is 101 because there is an internal thread scheduled to poll the tasks queue.
   But I'm leaving the limit at 104 just to avoid possible flakiness.




[GitHub] [beam] lukecwik commented on a diff in pull request #25300: Change UnboundedScheduledExecutorService to avoid creating threads when there is contention on the SynchronousQueue.

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1096091240


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   I ran the test locally and it is flaking a lot (> 50%).
   
   Without your change I see something like:
   ```
   Feb 03, 2023 9:52:47 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 105 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:52:48 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 107 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:52:49 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 105 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:52:50 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 105 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:52:52 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 103 threads to execute at most 100 parallel tasks
   ```
   
   With your change I see:
   ```
   Feb 03, 2023 9:54:20 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 101 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:54:21 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 100 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:54:22 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 97 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:54:24 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 101 threads to execute at most 100 parallel tasks
   Feb 03, 2023 9:54:25 AM org.apache.beam.sdk.util.UnboundedScheduledExecutorServiceTest testThreadsAreAddedOnlyAsNeededWithContention
   INFO: Created 101 threads to execute at most 100 parallel tasks
   ```
   and it flaked 29 times out of 50.
   


