Posted to github@beam.apache.org by "MelodyShen (via GitHub)" <gi...@apache.org> on 2024/02/27 18:08:26 UTC

[PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

MelodyShen opened a new pull request, #30439:
URL: https://github.com/apache/beam/pull/30439

   Workers will read the StreamingScalingReportResponse from worker messages and configure the executor pool size based on the specified value.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1525090238


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -653,6 +659,9 @@ private static void setUpWorkLoggingContext(String workId, String computationId)
   }
 
   private int chooseMaximumNumberOfThreads() {
+    if (maxThreadCountOverride.get() != 0) {

Review Comment:
   since this is possibly modified, it's safer to save the value you observe instead of observing it again
   
   probably wouldn't ever actually change between calls but might as well be safe.
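
A minimal sketch of the suggestion above: read the value once into a local and use that snapshot for both the check and the result. It assumes `maxThreadCountOverride` is an `AtomicInteger` in which 0 means "no override" (the `.get() != 0` check suggests this, but the field declaration is not shown here). The class name `ThreadCountChooser`, the `setOverride` method, and the `defaultMaxThreads` parameter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the worker class; only the snapshot pattern matters.
final class ThreadCountChooser {
  // Assumption: 0 means "no override has been received".
  private final AtomicInteger maxThreadCountOverride = new AtomicInteger(0);
  private final int defaultMaxThreads;

  ThreadCountChooser(int defaultMaxThreads) {
    this.defaultMaxThreads = defaultMaxThreads;
  }

  void setOverride(int value) {
    maxThreadCountOverride.set(value);
  }

  int chooseMaximumNumberOfThreads() {
    // Read once: the value that is checked is the value that is returned,
    // even if another thread updates the override in between.
    int override = maxThreadCountOverride.get();
    return override != 0 ? override : defaultMaxThreads;
  }
}
```

With two separate `get()` calls, a concurrent reset to 0 between the check and the return could hand back 0 as a thread count; the local variable rules that out.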



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -115,12 +143,16 @@ public boolean executorQueueIsEmpty() {
     return executor.getQueue().isEmpty();
   }
 
-  public long allThreadsActiveTime() {
+  public synchronized long allThreadsActiveTime() {
     return totalTimeMaxActiveThreadsUsed;
   }
 
-  public int activeCount() {
-    return activeCount.intValue();
+  public synchronized int activeCount() {
+    return activeCount;
+  }
+
+  public synchronized int maximumThreadCount() {
+    return maximumThreadCount;
   }
 
   public long bytesOutstanding() {

Review Comment:
   doesn't look modified by this PR, but bytesOutstanding() and elementsOutstanding() don't seem safe to read here if they are not read beneath the monitor
   
   can these public methods be removed? or should they be changed to use the monitor?
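
A minimal sketch of the second option, where the getters take the same lock as the writers. It uses `java.util.concurrent.locks.ReentrantLock` as a stand-in for the Guava `Monitor` in `BoundedQueueExecutor`; the class `OutstandingWork` and its `add`/`remove` methods are hypothetical and only illustrate the locking pattern.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the accounting fields.
final class OutstandingWork {
  // Stand-in for the Monitor; both fields below are guarded by this lock.
  private final ReentrantLock lock = new ReentrantLock();
  private int elementsOutstanding = 0;
  private long bytesOutstanding = 0;

  void add(long workBytes) {
    lock.lock();
    try {
      elementsOutstanding++;
      bytesOutstanding += workBytes;
    } finally {
      lock.unlock();
    }
  }

  void remove(long workBytes) {
    lock.lock();
    try {
      elementsOutstanding--;
      bytesOutstanding -= workBytes;
    } finally {
      lock.unlock();
    }
  }

  // Readers take the same lock, so they see a consistent, up-to-date value.
  long bytesOutstanding() {
    lock.lock();
    try {
      return bytesOutstanding;
    } finally {
      lock.unlock();
    }
  }

  int elementsOutstanding() {
    lock.lock();
    try {
      return elementsOutstanding;
    } finally {
      lock.unlock();
    }
  }
}
```

Without the lock in the getters, a reader has no visibility guarantee for writes made under the lock, and a `long` read is not guaranteed to be atomic unless the field is `volatile`.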





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-1972398557

   Hi @scwhittle, sorry, I rebased the branch incorrectly, which caused the previous commit history to be lost. The main changes since the last review are:
   1. BoundedQueueExecutor: Use synchronized instead of Atomic for all the thread-unsafe fields.
   2. StreamingDataflowWorkerTest: update test
   3. Added a new test file BoundedQueueExecutorTest to test behaviors related to maximumThreadCount and TotalMaxThreadActiveTimeUsed.
   
   Thanks for your time!
   




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1510932832


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -500,8 +503,10 @@ static StreamingDataflowWorker forTesting(
       boolean publishCounters,
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
-      Function<String, ScheduledExecutorService> executorSupplier) {
-    BoundedQueueExecutor boundedQueueExecutor = createWorkUnitExecutor(options);
+      Function<String, ScheduledExecutorService> executorSupplier,
+      BoundedQueueExecutor executor) {

Review Comment:
   how about naming this workUnitExecutor? Then the same variable can be used below to avoid a proliferation of names
   
   if (workUnitExecutor == null) workUnitExecutor = createWorkUnitExecutor(options);



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1918,6 +1926,26 @@ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
     return Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
   }
 
+  private void readAndSaveWorkerMessageResponseForStreamingScalingReportResponse(
+      List<WorkerMessageResponse> responses) {
+    for (WorkerMessageResponse response : responses) {
+      if (response.getStreamingScalingReportResponse() != null) {

Review Comment:
   would it be possible to get multiple responses? If so perhaps it would be best to find the last value in the loop and then just set once instead of possibly setting multiple times.
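
A minimal sketch of the "find the last value, then set once" approach. The `Response` class is a hypothetical stand-in for `WorkerMessageResponse` (a `null` field models a response without a scaling report), and `maxThreadCountOverride` is assumed to be an `AtomicInteger`; neither reflects the real Dataflow API types.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ScalingResponseReader {
  // Hypothetical stand-in for a worker message response.
  static final class Response {
    final Integer maximumThreadCount; // null: no scaling report in this response

    Response(Integer maximumThreadCount) {
      this.maximumThreadCount = maximumThreadCount;
    }
  }

  private final AtomicInteger maxThreadCountOverride = new AtomicInteger(0);

  void readAndSave(List<Response> responses) {
    // Remember only the most recent value seen in the batch.
    Integer last = null;
    for (Response response : responses) {
      if (response.maximumThreadCount != null) {
        last = response.maximumThreadCount;
      }
    }
    // Publish at most once per batch, and only if some response carried a value.
    if (last != null) {
      maxThreadCountOverride.set(last);
    }
  }

  int override() {
    return maxThreadCountOverride.get();
  }
}
```

Setting once per batch means other threads never observe an intermediate value from the same batch.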



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +69,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1) {
                 startTimeMaxActiveThreadsUsed = System.currentTimeMillis();

Review Comment:
   only reset if zero? I think it could be racy since 
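
A minimal sketch of the guard asked about here: the saturation start time is recorded only while it is still zero, so a later task start cannot overwrite an interval that is already being measured. It assumes 0 is the "not currently saturated" sentinel; the class `SaturationTimer` and the injected clock are hypothetical and exist only to make the sketch testable.

```java
import java.util.function.LongSupplier;

// Hypothetical miniature of the executor's active-thread accounting.
final class SaturationTimer {
  private final LongSupplier clockMillis;
  private int maximumThreadCount;
  private int activeCount = 0;
  // Assumption: 0 means "not currently saturated".
  private long startTimeMaxActiveThreadsUsed = 0;
  private long totalTimeMaxActiveThreadsUsed = 0;

  SaturationTimer(int maximumThreadCount, LongSupplier clockMillis) {
    this.maximumThreadCount = maximumThreadCount;
    this.clockMillis = clockMillis;
  }

  synchronized void beforeExecute() {
    // Start the interval only if one is not already running.
    if (++activeCount >= maximumThreadCount && startTimeMaxActiveThreadsUsed == 0) {
      startTimeMaxActiveThreadsUsed = clockMillis.getAsLong();
    }
  }

  synchronized void afterExecute() {
    // Close the interval once the pool drops below the maximum.
    if (--activeCount < maximumThreadCount && startTimeMaxActiveThreadsUsed > 0) {
      totalTimeMaxActiveThreadsUsed += clockMillis.getAsLong() - startTimeMaxActiveThreadsUsed;
      startTimeMaxActiveThreadsUsed = 0;
    }
  }

  synchronized void setMaximumThreadCount(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  synchronized int activeCount() {
    return activeCount;
  }

  synchronized long allThreadsActiveTime() {
    return totalTimeMaxActiveThreadsUsed;
  }
}
```

Because the maximum can change while tasks are running, the condition `activeCount >= maximumThreadCount` may hold on several consecutive task starts; the zero check keeps the first timestamp rather than the latest.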



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -37,8 +37,17 @@ public class BoundedQueueExecutor {
   private final Monitor monitor = new Monitor();
   private int elementsOutstanding = 0;

Review Comment:
   comment that elementsOutstanding and bytesOutstanding are guarded by the monitor



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,189 @@
+/*

Review Comment:
   I think this file should be in the directory path matching other util files like:
   
   runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitorTest.java
   
   





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1508317550


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -104,6 +106,20 @@ public void forceExecute(Runnable work, long workBytes) {
     executeLockHeld(work, workBytes);
   }
 
+  // Set the maximum/core pool size of the executor.
+  public void setMaximumPoolSize(int maximumPoolSize) {

Review Comment:
   Sounds good. I will refactor the class for related atomics and non-guarded fields.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-2038472128

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30439?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `86.20690%`, with `8 lines` in your changes missing coverage. Please review.
   > Project coverage is 62.87%. Comparing base [(`0d41168`)](https://app.codecov.io/gh/apache/beam/commit/0d41168a0963869df037e468f80f5ab8466a99ab?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`d499074`)](https://app.codecov.io/gh/apache/beam/pull/30439?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1 commit behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/beam/pull/30439?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...reaming/harness/StreamingWorkerStatusReporter.java](https://app.codecov.io/gh/apache/beam/pull/30439?src=pr&el=tree&filepath=runners%2Fgoogle-cloud-dataflow-java%2Fworker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fdataflow%2Fworker%2Fstreaming%2Fharness%2FStreamingWorkerStatusReporter.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cnVubmVycy9nb29nbGUtY2xvdWQtZGF0YWZsb3ctamF2YS93b3JrZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2JlYW0vcnVubmVycy9kYXRhZmxvdy93b3JrZXIvc3RyZWFtaW5nL2hhcm5lc3MvU3RyZWFtaW5nV29ya2VyU3RhdHVzUmVwb3J0ZXIuamF2YQ==) | 83.87% | [3 Missing and 2 partials :warning: ](https://app.codecov.io/gh/apache/beam/pull/30439?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...ers/dataflow/worker/util/BoundedQueueExecutor.java](https://app.codecov.io/gh/apache/beam/pull/30439?src=pr&el=tree&filepath=runners%2Fgoogle-cloud-dataflow-java%2Fworker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fdataflow%2Fworker%2Futil%2FBoundedQueueExecutor.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cnVubmVycy9nb29nbGUtY2xvdWQtZGF0YWZsb3ctamF2YS93b3JrZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2JlYW0vcnVubmVycy9kYXRhZmxvdy93b3JrZXIvdXRpbC9Cb3VuZGVkUXVldWVFeGVjdXRvci5qYXZh) | 86.95% | [2 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/beam/pull/30439?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #30439       +/-   ##
   =============================================
   + Coverage     56.86%   62.87%    +6.00%     
   - Complexity     1485    14773    +13288     
   =============================================
     Files           501     2207     +1706     
     Lines         46219   153504   +107285     
     Branches       1076    11735    +10659     
   =============================================
   + Hits          26283    96511    +70228     
   - Misses        17918    50840    +32922     
   - Partials       2018     6153     +4135     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/30439/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [java](https://app.codecov.io/gh/apache/beam/pull/30439/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `65.69% <86.20%> (-3.13%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen closed pull request #30439: Tune maximum thread count for streaming dataflow worker executor dynamically.
URL: https://github.com/apache/beam/pull/30439




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1531099531


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));

Review Comment:
   Cool!





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1522162667


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1918,6 +1926,26 @@ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
     return Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
   }
 
+  private void readAndSaveWorkerMessageResponseForStreamingScalingReportResponse(
+      List<WorkerMessageResponse> responses) {
+    for (WorkerMessageResponse response : responses) {
+      if (response.getStreamingScalingReportResponse() != null) {

Review Comment:
   Currently only a single response is returned here, but it makes sense to handle multiple responses. Will refactor this.



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,189 @@
+/*

Review Comment:
   Yeah, I put it in the wrong directory. Fixed.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +69,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1) {
                 startTimeMaxActiveThreadsUsed = System.currentTimeMillis();

Review Comment:
   Yeah that makes sense.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -37,8 +37,17 @@ public class BoundedQueueExecutor {
   private final Monitor monitor = new Monitor();
   private int elementsOutstanding = 0;

Review Comment:
   Sure.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -500,8 +503,10 @@ static StreamingDataflowWorker forTesting(
       boolean publishCounters,
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
-      Function<String, ScheduledExecutorService> executorSupplier) {
-    BoundedQueueExecutor boundedQueueExecutor = createWorkUnitExecutor(options);
+      Function<String, ScheduledExecutorService> executorSupplier,
+      BoundedQueueExecutor executor) {

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1530536071


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +72,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1 && startTimeMaxActiveThreadsUsed == 0) {

Review Comment:
   nit: how about
   ++activeCount >= maximumThreadCount
   
   seems easier to read



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -69,8 +82,8 @@ protected void beforeExecute(Thread t, Runnable r) {
           @Override
           protected void afterExecute(Runnable r, Throwable t) {
             super.afterExecute(r, t);
-            synchronized (this) {
-              if (activeCount.getAndDecrement() == maximumPoolSize) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount-- <= maximumThreadCount && startTimeMaxActiveThreadsUsed > 0) {

Review Comment:
   ditto,
   --activeCount < maximumThreadCount 
   seems simpler
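
A small brute-force check that the two suggested rewrites keep the same behavior as the post-increment and post-decrement forms in the diff. Only the counter comparisons are modeled (the `startTimeMaxActiveThreadsUsed` clause is the same in both versions and is left out); the class and method names are hypothetical.

```java
// Compares the original and suggested conditions over a range of small values.
final class CounterConditionCheck {
  // Original: activeCount++ >= max - 1   Suggested: ++activeCount >= max
  static boolean incrementFormsAgree(int activeCount, int max) {
    int a = activeCount;
    int b = activeCount;
    boolean original = a++ >= max - 1;
    boolean suggested = ++b >= max;
    // Same truth value and same counter afterwards.
    return original == suggested && a == b;
  }

  // Original: activeCount-- <= max   Suggested: --activeCount < max
  static boolean decrementFormsAgree(int activeCount, int max) {
    int a = activeCount;
    int b = activeCount;
    boolean original = a-- <= max;
    boolean suggested = --b < max;
    return original == suggested && a == b;
  }

  static boolean allAgree(int limit) {
    for (int count = 0; count <= limit; count++) {
      for (int max = 1; max <= limit; max++) {
        if (!incrementFormsAgree(count, max) || !decrementFormsAgree(count, max)) {
          return false;
        }
      }
    }
    return true;
  }
}
```

The pre-increment and pre-decrement forms compare the updated count directly against the maximum, which removes the `- 1` and the `<=` that a reader otherwise has to reason about.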



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count

Review Comment:
   could you also verify that increasing the bytes would let something in similarly?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count
+    executor.setMaximumPoolSize(3, 103);
+    assertEquals(3, executor.maximumThreadCount());
+
+    // m3 is accepted
+    processStart3.await();
+    assertEquals(3, executor.activeCount());
+
+    stop.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+    }
+    // Max pool size was reached so the allThreadsActiveTime() was updated.
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumThreadCountUpdated()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());

Review Comment:
   This made me realize that we only update the count after active work completes. If long-running work occupies all the available threads, that could delay requesting additional threads. In a follow-up, we might want a monitoring thread to increase it periodically instead of relying on the finishing threads themselves to increment it.
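
   A rough sketch of the monitoring-thread idea, not the PR's implementation: the class name `SaturationTrackerSketch` and all of its methods are hypothetical, and the accounting is an approximation that credits a whole tick interval as saturated whenever every thread is busy at tick time.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of BoundedQueueExecutor.
final class SaturationTrackerSketch {
  private final AtomicInteger active = new AtomicInteger();
  private final AtomicLong saturatedMillis = new AtomicLong();
  private final int maxThreads;
  private long lastTickMillis;

  SaturationTrackerSketch(int maxThreads, long startMillis) {
    this.maxThreads = maxThreads;
    this.lastTickMillis = startMillis;
  }

  void workStarted() {
    active.incrementAndGet();
  }

  void workFinished() {
    active.decrementAndGet();
  }

  // Intended to be called periodically by a monitoring thread, so the
  // counter advances even while long-running work holds every thread.
  synchronized void tick(long nowMillis) {
    if (active.get() >= maxThreads) {
      // Approximation: the whole interval since the last tick is counted.
      saturatedMillis.addAndGet(nowMillis - lastTickMillis);
    }
    lastTickMillis = nowMillis;
  }

  long saturatedMillis() {
    return saturatedMillis.get();
  }
}
```

   One way to drive it would be `ScheduledExecutorService.scheduleAtFixedRate(() -> tracker.tick(System.currentTimeMillis()), 1, 1, TimeUnit.SECONDS)`; the one-second period is an arbitrary choice for illustration.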
   
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.

Review Comment:
   Sleep briefly between polls here so the loop doesn't burn CPU and slow down completion.
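
   A minimal sketch of what the polling loop above could look like with a short sleep. The helper name `PollingSketch.awaitIdle` and the poll interval are assumptions for illustration; the PR may have resolved this differently.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hypothetical test helper, not part of the PR.
final class PollingSketch {
  // Polls until activeCount reports zero, sleeping between checks so the
  // waiting thread does not compete for CPU with the work it is waiting on.
  // Returns the number of polls that observed a non-zero count.
  static int awaitIdle(IntSupplier activeCount, long pollIntervalMillis) {
    int polls = 0;
    while (activeCount.getAsInt() != 0) {
      polls++;
      try {
        TimeUnit.MILLISECONDS.sleep(pollIntervalMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and fail fast instead of spinning.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for idle", e);
      }
    }
    return polls;
  }
}
```

   In the test this might be called as `PollingSketch.awaitIdle(executor::activeCount, 10)`, assuming `activeCount()` returns an `int` as the assertions in the diff suggest.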



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumThreadCountUpdated()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    // Increase the max thread count
+    executor.setMaximumPoolSize(5, 105);
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -104,6 +117,21 @@ public void forceExecute(Runnable work, long workBytes) {
     executeLockHeld(work, workBytes);

Review Comment:
   Can you rename executeLockHeld to executeMonitorHeld for clarity, since we now have both a monitor and a synchronized lock?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));

Review Comment:
   For all of these awaits where we expect the event to happen, await without a timeout to prevent possible test flakiness. The test deadline should be sufficient.
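
   A small self-contained sketch of the distinction, using only `java.util.concurrent`. The class name `AwaitSketch` and the 50 ms bound are assumptions. In the real test the JUnit `Timeout` rule would act as the deadline for the unbounded wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not code from the PR.
final class AwaitSketch {
  // Returns true if the expected event happened and the unexpected one did not.
  static boolean run() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch neverStarted = new CountDownLatch(1);
    try {
      pool.execute(started::countDown);
      // Event we expect: wait without a timeout, so a slow machine cannot
      // produce a spurious failure. An outer test deadline catches hangs.
      started.await();
      // Event we expect NOT to happen: a bounded wait is still required,
      // since an unbounded wait would never return.
      return !neverStarted.await(50, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```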



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-2038766063

   Hi @scwhittle, thanks for reviewing the changes. I have rebased onto master to pick up the latest changes, and all checks passed. Would you mind merging the PR when you are available? Thanks!




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1529071176


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -149,18 +191,22 @@ public String summaryHtml() {
       builder.append(executor.getMaximumPoolSize());
       builder.append("<br>/n");
 
+      builder.append("Maximum Threads: ");
+      builder.append(maximumThreadCount());
+      builder.append("<br>/n");
+
       builder.append("Active Threads: ");
       builder.append(executor.getActiveCount());
       builder.append("<br>/n");
 
       builder.append("Work Queue Size: ");
-      builder.append(elementsOutstanding);
+      builder.append(elementsOutstanding());

Review Comment:
   Ah I didn't notice that. Reverted and added a test case for it.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-2030785112

   Rebased onto the master branch. The base code was refactored substantially recently, so I moved the related changes from StreamingDataflowWorker to StreamingWorkerStatusReporter accordingly.




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-2039290869

   The macOS failures appear unrelated.




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1525260145


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -653,6 +659,9 @@ private static void setUpWorkLoggingContext(String workId, String computationId)
   }
 
   private int chooseMaximumNumberOfThreads() {
+    if (maxThreadCountOverride.get() != 0) {

Review Comment:
   Sounds good!





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1529071437


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch latch, AtomicBoolean stop) {
+    Runnable runnable =
+        () -> {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return;
+          }
+          latch.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;

Review Comment:
   Sure, changed to use a CountDownLatch.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch latch, AtomicBoolean stop) {
+    Runnable runnable =
+        () -> {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return;
+          }
+          latch.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+

Review Comment:
   Done.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1531100277


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +72,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1 && startTimeMaxActiveThreadsUsed == 0) {

Review Comment:
   Sure!





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle merged PR #30439:
URL: https://github.com/apache/beam/pull/30439




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1526703939


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch latch, AtomicBoolean stop) {
+    Runnable runnable =
+        () -> {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return;
+          }
+          latch.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;

Review Comment:
   Add Thread.sleep(10) or something similar to avoid burning CPU.
   
   Alternatively, stop could be a CountDownLatch as well, so the test can signal the work to finish.
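
A minimal sketch of the latch-based option, assuming a plain JDK single-thread executor in place of BoundedQueueExecutor. The class name `LatchBlockingWorkSketch` and the `runOnce` helper are hypothetical and only illustrate the idea that the worker parks on `stop.await()` instead of spinning on a flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: blocking test work driven by latches rather than a busy loop. */
final class LatchBlockingWorkSketch {
  /** Returns work that signals it has started, then parks until {@code stop} is released. */
  static Runnable blockingWork(CountDownLatch started, CountDownLatch stop) {
    return () -> {
      started.countDown();
      try {
        stop.await(); // The thread is parked here, so no CPU is burned while waiting.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
  }

  /** Runs one blocking task; true if it started and then finished once released. */
  static boolean runOnce() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch stop = new CountDownLatch(1);
    // Assumption: a stock JDK executor stands in for BoundedQueueExecutor.
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.execute(blockingWork(started, stop));
      boolean didStart = started.await(5, TimeUnit.SECONDS);
      stop.countDown(); // Release the worker.
      pool.shutdown();
      boolean didFinish = pool.awaitTermination(5, TimeUnit.SECONDS);
      return didStart && didFinish;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Compared with a sleep inside the loop, the latch also removes the polling delay, since the worker wakes as soon as `stop.countDown()` is called.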



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -149,18 +191,22 @@ public String summaryHtml() {
       builder.append(executor.getMaximumPoolSize());
       builder.append("<br>/n");
 
+      builder.append("Maximum Threads: ");
+      builder.append(maximumThreadCount());
+      builder.append("<br>/n");
+
       builder.append("Active Threads: ");
       builder.append(executor.getActiveCount());
       builder.append("<br>/n");
 
       builder.append("Work Queue Size: ");
-      builder.append(elementsOutstanding);
+      builder.append(elementsOutstanding());

Review Comment:
   should the monitor.enter() at the top of summaryHtml be removed if using the accessors that grab the monitor?
   
   not sure if it is reentrant. Would be good to add renderHtml to the unit test either way.
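
To make the reentrancy question concrete, here is a small sketch that uses `java.util.concurrent.locks.ReentrantLock` as a stand-in for the monitor. This is an assumption for illustration only: whether the Guava `Monitor` used in BoundedQueueExecutor permits the same nested entry should be confirmed against the Guava Javadoc for the vendored version. The class and method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: an outer method holds a lock and calls an accessor that takes it again. */
final class ReentrantAccessorSketch {
  // Assumption: ReentrantLock stands in for the monitor guarding the executor's state.
  private final ReentrantLock lock = new ReentrantLock();
  private int elementsOutstanding = 3;

  /** Accessor that acquires the lock on its own, like the accessors discussed above. */
  int elementsOutstanding() {
    lock.lock();
    try {
      return elementsOutstanding;
    } finally {
      lock.unlock();
    }
  }

  /** Holds the lock while calling the accessor; the same thread re-enters without blocking. */
  String summary() {
    lock.lock();
    try {
      return "Work Queue Size: " + elementsOutstanding();
    } finally {
      lock.unlock();
    }
  }

  /** True while any thread holds the lock; expected to be false after summary() returns. */
  boolean isLocked() {
    return lock.isLocked();
  }
}
```

With a reentrant lock the nested acquisition is harmless. With a non-reentrant lock the same call pattern would deadlock, which is why covering the summary method in a unit test is useful either way.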



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch latch, AtomicBoolean stop) {
+    Runnable runnable =
+        () -> {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return;
+          }
+          latch.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+

Review Comment:
   Add a basic scheduling test, since we have a test class for this now.
   Below, for example, we never schedule new work after previous work has started.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1525267452


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -115,12 +143,16 @@ public boolean executorQueueIsEmpty() {
     return executor.getQueue().isEmpty();
   }
 
-  public long allThreadsActiveTime() {
+  public synchronized long allThreadsActiveTime() {
     return totalTimeMaxActiveThreadsUsed;
   }
 
-  public int activeCount() {
-    return activeCount.intValue();
+  public synchronized int activeCount() {
+    return activeCount;
+  }
+
+  public synchronized int maximumThreadCount() {
+    return maximumThreadCount;
   }
 
   public long bytesOutstanding() {

Review Comment:
   Yeah, they are not thread-safe. They are currently used in StreamingDataflowWorker to gather metrics, so it seems okay to me to leave them unsafe, but in case we use them for other purposes, I feel it is better to change them to use the monitor.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1531096133


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count
+    executor.setMaximumPoolSize(3, 103);
+    assertEquals(3, executor.maximumThreadCount());
+
+    // m3 is accepted
+    processStart3.await();
+    assertEquals(3, executor.activeCount());
+
+    stop.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());
+    stop.countDown();
+    while (executor.activeCount() != 0) {
+      // Waiting for all threads to be ended.
+    }
+    // Max pool size was reached so the allThreadsActiveTime() was updated.
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumThreadCountUpdated()
+      throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    assertEquals(0l, executor.allThreadsActiveTime());

Review Comment:
   Yeah, if all the threads are occupied by long-running tasks, we may underestimate the related time counters. I'm not sure whether we use these counters for any decisions now, but if we later decide to tune the thread count based on these time counters and thread utilization, it might cause problems. Let me file a bug to follow up on this.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1507573892


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -104,6 +106,20 @@ public void forceExecute(Runnable work, long workBytes) {
     executeLockHeld(work, workBytes);
   }
 
+  // Set the maximum/core pool size of the executor.
+  public void setMaximumPoolSize(int maximumPoolSize) {

Review Comment:
   synchronized?
   That seems safer when comparing the atomic with other things like the pool size.
   
   Perhaps the atomics in this class should just be removed and the fields protected by synchronized instead. Using atomics within a synchronized block adds overhead, and I believe the accessors reading the atomics are not called very frequently.
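
A rough sketch of what dropping the atomics could look like, assuming every read and write goes through synchronized methods on the same object. The class `SynchronizedCountersSketch` and its `tryStart`/`finish` methods are hypothetical and are not part of BoundedQueueExecutor. They only show that comparing the count against the limit is safe once both fields share one lock.

```java
/** Hypothetical sketch: plain int fields guarded by the object's intrinsic lock. */
final class SynchronizedCountersSketch {
  // Both fields are only touched inside synchronized methods of this object.
  private int activeCount = 0;
  private int maximumThreadCount;

  SynchronizedCountersSketch(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  synchronized void setMaximumThreadCount(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  synchronized int maximumThreadCount() {
    return maximumThreadCount;
  }

  synchronized int activeCount() {
    return activeCount;
  }

  /** Check and increment happen under one lock, so the limit cannot change in between. */
  synchronized boolean tryStart() {
    if (activeCount >= maximumThreadCount) {
      return false;
    }
    activeCount++;
    return true;
  }

  synchronized void finish() {
    activeCount--;
  }
}
```

The trade-off is that readers briefly contend on the lock, which seems acceptable if the accessors are only polled for metrics.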



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -38,6 +38,7 @@ public class BoundedQueueExecutor {
   private int elementsOutstanding = 0;
   private long bytesOutstanding = 0;
   private final AtomicInteger activeCount = new AtomicInteger();
+  private final AtomicInteger maximumThreadCount = new AtomicInteger();

Review Comment:
   would be good to add a unit test covering the new methods to this class directly



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -70,7 +72,7 @@ protected void beforeExecute(Thread t, Runnable r) {
           protected void afterExecute(Runnable r, Throwable t) {
             super.afterExecute(r, t);
             synchronized (this) {
-              if (activeCount.getAndDecrement() == maximumPoolSize) {
+              if (activeCount.getAndDecrement() == maximumThreadCount.get()) {

Review Comment:
   I think this might need to be
   if (activeCount.getAndDecrement() <= maximumThreadCount.get() &&
       startTimeMaxActiveThreadsUsed > 0)
   
   you could maybe test this by adding more work to queue than the limit, having the work that is scheduled block, then increase the limit and let all the work complete.  We should see totalTimeMaxActiveThreadsUsed incremented
   
   Another way to fix it could be to modify the setMaximumPoolSize to reset and count startTimeMaxActiveThreadsUsed if the limit increased (it will be restarted shortly as new threads start and call beforeExecute).
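
The first suggestion can be sketched in isolation as follows. This is an illustrative model, not the executor itself: `MaxActiveTimeTrackerSketch` is a hypothetical class, timestamps are passed in explicitly instead of read from a clock, and a start time of 0 is assumed to mean "not currently at the limit", so callers pass timestamps greater than zero.

```java
/** Hypothetical sketch: accounting for time spent at the thread limit when the limit can change. */
final class MaxActiveTimeTrackerSketch {
  private int activeCount = 0;
  private int maximumThreadCount;
  private long startTimeMaxActiveThreadsUsed = 0; // 0 means no interval is open.
  private long totalTimeMaxActiveThreadsUsed = 0;

  MaxActiveTimeTrackerSketch(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  synchronized void setMaximumThreadCount(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  /** Opens an interval when the last free thread is taken and none is open yet. */
  synchronized void beforeExecute(long nowMillis) {
    if (activeCount++ >= maximumThreadCount - 1 && startTimeMaxActiveThreadsUsed == 0) {
      startTimeMaxActiveThreadsUsed = nowMillis;
    }
  }

  /**
   * Closes an open interval. Using <= rather than == means an interval opened under an
   * older, smaller limit is still recorded after the limit has been raised.
   */
  synchronized void afterExecute(long nowMillis) {
    if (activeCount-- <= maximumThreadCount && startTimeMaxActiveThreadsUsed > 0) {
      totalTimeMaxActiveThreadsUsed += nowMillis - startTimeMaxActiveThreadsUsed;
      startTimeMaxActiveThreadsUsed = 0;
    }
  }

  synchronized long totalTimeMaxActiveThreadsUsed() {
    return totalTimeMaxActiveThreadsUsed;
  }
}
```

For example, with a limit of 2, two tasks starting at time 100, the limit raised to 3, and one task finishing at time 150, the `<=` check records 50 ms. An `==` check would compare 2 against 3 and record nothing.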



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2916,6 +2916,69 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;

Review Comment:
   this doesn't matter right? If so set it higher so it isn't possible that it triggers.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -48,6 +49,7 @@ public BoundedQueueExecutor(
       int maximumElementsOutstanding,
       long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
+    this.maximumThreadCount.set(maximumPoolSize);

Review Comment:
   how about 
   this.maximumThreadCount = new AtomicInteger(maximumPoolSize);
   instead of assignment above



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -123,6 +139,10 @@ public int activeCount() {
     return activeCount.intValue();
   }
 
+  public int maximumThreadCount() {
+    return maximumThreadCount.intValue();

Review Comment:
   GitHub won't let me add a comment there, but allThreadsActiveTime above should be synchronized so that reading it doesn't race with other threads modifying it.
   
   You can add a @GuardedBy annotation to totalTimeMaxActiveThreadsUsed.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2916,6 +2916,69 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep track of
+    // active thread count.
+    BoundedQueueExecutor executor =

Review Comment:
   The test that BoundedQueueExecutor works when calling setMaximumPoolSize seems like it should be in BoundedQueueExecutorTest.
   
   The test at the streaming dataflow worker level seems like it should have the work update client and validate the plumbing of responses through to setting the max on the queue. The queue itself could be a mock in that test.
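
To illustrate the split described above, here is a small sketch of a plumbing-only check. All names in it (`ScalingResponseHandler`, `onMaximumThreadCount`, `RecordingSetter`) are hypothetical, and a hand-written recording fake plays the role of the mocked queue. Ignoring non-positive values is an assumption of this sketch, not something taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical handler that forwards a reported thread limit to whatever owns the pool. */
final class ScalingResponseHandler {
  private final IntConsumer maxThreadSetter;

  ScalingResponseHandler(IntConsumer maxThreadSetter) {
    this.maxThreadSetter = maxThreadSetter;
  }

  // Assumption of this sketch: non-positive values mean "no override" and are ignored.
  void onMaximumThreadCount(int reportedMaximum) {
    if (reportedMaximum > 0) {
      maxThreadSetter.accept(reportedMaximum);
    }
  }
}

/** Hand-written fake standing in for a mocked queue; it only records what it was asked to set. */
final class RecordingSetter implements IntConsumer {
  final List<Integer> received = new ArrayList<>();

  @Override
  public void accept(int value) {
    received.add(value);
  }
}
```

A worker-level test would then feed responses to the handler and assert on `received`, leaving the actual thread scheduling behavior to the executor's own unit test.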



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1525267452


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -115,12 +143,16 @@ public boolean executorQueueIsEmpty() {
     return executor.getQueue().isEmpty();
   }
 
-  public long allThreadsActiveTime() {
+  public synchronized long allThreadsActiveTime() {
     return totalTimeMaxActiveThreadsUsed;
   }
 
-  public int activeCount() {
-    return activeCount.intValue();
+  public synchronized int activeCount() {
+    return activeCount;
+  }
+
+  public synchronized int maximumThreadCount() {
+    return maximumThreadCount;
   }
 
   public long bytesOutstanding() {

Review Comment:
   Yeah, they are not thread safe. They are currently used in StreamingDataflowWorker to gather metrics, so it seems okay to me to leave them unsafe, but in case we use them for other purposes in the future, I feel it is better to change them to use the monitor.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1531166753


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
+@RunWith(JUnit4.class)
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// released (2.11.0)
+@SuppressWarnings("unused")
+public class BoundedQueueExecutorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
+  private static final int DEFAULT_MAX_THREADS = 2;
+  private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
+
+  private BoundedQueueExecutor executor;
+
+  private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
+    Runnable runnable =
+        () -> {
+          start.countDown();
+          try {
+            stop.await();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+    return runnable;
+  }
+
+  @Before
+  public void setUp() {
+    this.executor =
+        new BoundedQueueExecutor(
+            DEFAULT_MAX_THREADS,
+            DEFAULT_THREAD_EXPIRATION_SEC,
+            TimeUnit.SECONDS,
+            DEFAULT_MAX_THREADS + 100,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+  }
+
+  @Test
+  public void testScheduleWork() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStop1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStop2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch processStop3 = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
+
+    executor.execute(m1, 1);
+    assertTrue(processStart1.await(1000, TimeUnit.MILLISECONDS));
+    executor.execute(m2, 1);
+    assertTrue(processStart2.await(1000, TimeUnit.MILLISECONDS));
+    // m1 and m2 have started and all threads are occupied so m3 will be queued and not executed.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Stop m1 so there is an available thread for m3 to run.
+    processStop1.countDown();
+    assertTrue(processStart3.await(1000, TimeUnit.MILLISECONDS));
+    // m3 started.
+    processStop2.countDown();
+    processStop3.countDown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    CountDownLatch processStart1 = new CountDownLatch(1);
+    CountDownLatch processStart2 = new CountDownLatch(1);
+    CountDownLatch processStart3 = new CountDownLatch(1);
+    CountDownLatch stop = new CountDownLatch(1);
+    Runnable m1 = createSleepProcessWorkFn(processStart1, stop);
+    Runnable m2 = createSleepProcessWorkFn(processStart2, stop);
+    Runnable m3 = createSleepProcessWorkFn(processStart3, stop);
+
+    // Initial state.
+    assertEquals(0, executor.activeCount());
+    assertEquals(2, executor.maximumThreadCount());
+
+    // m1 and m2 are accepted.
+    executor.execute(m1, 1);
+    processStart1.await();
+    assertEquals(1, executor.activeCount());
+    executor.execute(m2, 1);
+    processStart2.await();
+    assertEquals(2, executor.activeCount());
+
+    // Max pool size was reached so no new work is accepted.
+    executor.execute(m3, 1);
+    assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
+
+    // Increase the max thread count

Review Comment:
   Currently maximumBytesOutstanding is final and we don't change it at runtime. I added a test case for scheduling work when maximumBytesOutstanding is reached.





Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-2008517195

   :runners:google-cloud-dataflow-java:examples-streaming:windmillPreCommit seems to be broken...




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #30439:
URL: https://github.com/apache/beam/pull/30439#issuecomment-1969873017

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




Re: [PR] Tune maximum thread count for streaming dataflow worker executor dynamically. [beam]

Posted by "MelodyShen (via GitHub)" <gi...@apache.org>.
MelodyShen commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1508318117


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -48,6 +49,7 @@ public BoundedQueueExecutor(
       int maximumElementsOutstanding,
       long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
+    this.maximumThreadCount.set(maximumPoolSize);

Review Comment:
   Changed it to an int and used synchronized instead.
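
A sketch of why plain ints under one lock can be simpler than two atomics here. `ThreadUsageCounter` and its methods are hypothetical and only mirror the shape of the change. The point is that the comparison of the active count against the limit and the decrement form a single critical section, so a concurrent limit update cannot interleave between them.

```java
/** Hypothetical bookkeeping class; every field is accessed only while holding the lock on "this". */
final class ThreadUsageCounter {
  private int activeCount = 0;
  private int maximumThreadCount;
  private int timesSaturationEnded = 0;

  ThreadUsageCounter(int maximumThreadCount) {
    this.maximumThreadCount = maximumThreadCount;
  }

  synchronized void taskStarted() {
    activeCount++;
  }

  synchronized void taskFinished() {
    // The check and the decrement run under the same lock, so the limit
    // cannot change between reading it and updating the count.
    if (activeCount == maximumThreadCount) {
      timesSaturationEnded++;
    }
    activeCount--;
  }

  synchronized void setMaximumThreadCount(int newMaximum) {
    maximumThreadCount = newMaximum;
  }

  synchronized int activeCount() {
    return activeCount;
  }

  synchronized int timesSaturationEnded() {
    return timesSaturationEnded;
  }
}
```

With two separate atomics, each read is atomic on its own, but the pair of reads is not, which is the gap the shared lock closes.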



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -38,6 +38,7 @@ public class BoundedQueueExecutor {
   private int elementsOutstanding = 0;
   private long bytesOutstanding = 0;
   private final AtomicInteger activeCount = new AtomicInteger();
+  private final AtomicInteger maximumThreadCount = new AtomicInteger();

Review Comment:
   Sure.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2916,6 +2916,69 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;

Review Comment:
   Cool!



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -70,7 +72,7 @@ protected void beforeExecute(Thread t, Runnable r) {
           protected void afterExecute(Runnable r, Throwable t) {
             super.afterExecute(r, t);
             synchronized (this) {
-              if (activeCount.getAndDecrement() == maximumPoolSize) {
+              if (activeCount.getAndDecrement() == maximumThreadCount.get()) {

Review Comment:
   Yeah that makes sense.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2916,6 +2916,69 @@ public void testActiveThreadMetric() throws Exception {
     executor.shutdown();
   }
 
+  @Test
+  public void testOverrideMaximumThreadCount() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep track of
+    // active thread count.
+    BoundedQueueExecutor executor =

Review Comment:
   Sounds good. Will refactor it.
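
For the executor-level test agreed on here, one detail that may be worth covering is the order of the resize calls. This sketch assumes the executor wraps a `java.util.concurrent.ThreadPoolExecutor` (the `afterExecute` override in the diff suggests so) and keeps core and maximum pool size equal; `PoolResizer` is a hypothetical helper, not code from the PR. `setMaximumPoolSize` rejects a value below the current core size, and recent JDK versions also reject a core size above the current maximum, so growing and shrinking need opposite orders.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper that keeps core and maximum pool size equal while resizing. */
final class PoolResizer {
  private PoolResizer() {}

  static void resize(ThreadPoolExecutor pool, int newSize) {
    if (newSize <= 0) {
      throw new IllegalArgumentException("newSize must be positive: " + newSize);
    }
    if (newSize > pool.getMaximumPoolSize()) {
      // Growing: raise the maximum first so the core size never exceeds it.
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    } else {
      // Shrinking (or unchanged): lower the core size first so the maximum never drops below it.
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    }
  }
}
```

A test could resize a small pool up and then down and assert on `getCorePoolSize()` and `getMaximumPoolSize()` after each step.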


