You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/12/11 15:44:08 UTC

(beam) branch master updated: Add per test timeout in recently changed dataflow legacy worker tests (#29696)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 218af9de868 Add per test timeout in recently changed dataflow legacy worker tests (#29696)
218af9de868 is described below

commit 218af9de868198c60ed210cc388a663a80f3423b
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Dec 11 10:44:00 2023 -0500

    Add per test timeout in recently changed dataflow legacy worker tests (#29696)
---
 .../apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java  | 2 ++
 .../beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java      | 3 +++
 .../beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java     | 1 -
 .../runners/dataflow/worker/StreamingModeExecutionContextTest.java    | 4 +++-
 .../runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java     | 4 +++-
 .../beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java   | 4 +++-
 .../runners/dataflow/worker/StreamingStepMetricsContainerTest.java    | 4 +++-
 .../dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java | 2 ++
 .../beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java   | 4 +++-
 .../runners/dataflow/worker/streaming/WeightBoundedQueueTest.java     | 3 +++
 .../worker/streaming/sideinput/SideInputStateFetcherTest.java         | 3 +++
 .../dataflow/worker/windmill/client/WindmillStreamPoolTest.java       | 3 +++
 .../worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java  | 2 ++
 .../dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java  | 3 ++-
 .../worker/windmill/client/grpc/StreamingEngineClientTest.java        | 2 ++
 .../worker/windmill/client/grpc/WindmillStreamSenderTest.java         | 2 ++
 .../dataflow/worker/windmill/state/WindmillStateCacheTest.java        | 4 +++-
 .../dataflow/worker/windmill/state/WindmillStateInternalsTest.java    | 4 +++-
 .../dataflow/worker/windmill/state/WindmillStateReaderTest.java       | 3 +++
 .../worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java | 2 ++
 .../worker/windmill/work/budget/GetWorkBudgetRefresherTest.java       | 3 +++
 .../dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java       | 3 +++
 22 files changed, 56 insertions(+), 9 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
index b4f544129db..e33412a19d9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
@@ -50,6 +50,7 @@ import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
@@ -61,6 +62,7 @@ import org.mockito.hamcrest.MockitoHamcrest;
 @RunWith(JUnit4.class)
 public class BatchDataflowWorkerTest {
 
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   @Rule public FastNanoClockAndSleeper clockAndSleeper = new FastNanoClockAndSleeper();
   @Mock WorkUnitClient mockWorkUnitClient;
   @Mock DataflowWorkProgressUpdater mockProgressUpdater;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 3c63f3cc19d..5329fb0f601 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -50,6 +50,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
@@ -60,6 +61,8 @@ import org.slf4j.LoggerFactory;
 /** Unit tests for {@link DataflowWorkUnitClient}. */
 @RunWith(JUnit4.class)
 public class DataflowWorkUnitClientTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkUnitClientTest.class);
   private static final String PROJECT_ID = "TEST_PROJECT_ID";
   private static final String JOB_ID = "TEST_JOB_ID";
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 6826607513d..2793cdd8182 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -195,7 +195,6 @@ import org.slf4j.LoggerFactory;
 // released (2.11.0)
 @SuppressWarnings("unused")
 public class StreamingDataflowWorkerTest {
-
   private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorkerTest.class);
   private static final IntervalWindow DEFAULT_WINDOW =
       new IntervalWindow(new Instant(1234), Duration.millis(1000));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 9991520d593..60ecaa3e37e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -73,7 +73,9 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
@@ -83,7 +85,7 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link StreamingModeExecutionContext}. */
 @RunWith(JUnit4.class)
 public class StreamingModeExecutionContextTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   @Mock private SideInputStateFetcher sideInputStateFetcher;
   @Mock private WindmillStateReader stateReader;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
index 3c121ab27f7..07c1080d8f2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
@@ -69,7 +69,9 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
@@ -79,7 +81,7 @@ import org.mockito.MockitoAnnotations;
 /** Unit tests for {@link StreamingSideInputDoFnRunner}. */
 @RunWith(JUnit4.class)
 public class StreamingSideInputDoFnRunnerTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
 
   static TupleTag<String> mainOutputTag = new TupleTag<>();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
index a7196613fbb..d4fee95ead5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
@@ -56,7 +56,9 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
@@ -65,7 +67,7 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link StreamingSideInputFetcher}. */
 @RunWith(JUnit4.class)
 public class StreamingSideInputFetcherTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
 
   static TupleTag<String> mainOutputTag = new TupleTag<>();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index 9e6d45a2351..9be65c198ac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -37,7 +37,9 @@ import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.NoOpCounter;
 import org.apache.beam.sdk.metrics.NoOpHistogram;
 import org.apache.beam.sdk.util.HistogramData;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -47,7 +49,7 @@ import org.junit.runners.JUnit4;
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class StreamingStepMetricsContainerTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private MetricsContainerRegistry registry = StreamingStepMetricsContainer.createRegistry();
 
   private MetricsContainer c1 = registry.getContainer("s1");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
index c1b134cafa1..425b2140a96 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
@@ -55,6 +55,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.LoggerFactory;
@@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(JUnit4.class)
 public class DataflowWorkerLoggingInitializerTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   @Rule public TemporaryFolder logFolder = new TemporaryFolder();
 
   @Rule public RestoreSystemProperties restoreProperties = new RestoreSystemProperties();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index 12ae816de82..540166f226a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -41,13 +41,15 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class ActiveWorkStateTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private final WindmillStateCache.ForComputation computationStateCache =
       mock(WindmillStateCache.ForComputation.class);
   private Map<ShardedKey, Deque<Work>> readOnlyActiveWork;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java
index b2d98fb0e95..4f035c88774 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java
@@ -22,12 +22,15 @@ import static org.junit.Assert.assertNull;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class WeightBoundedQueueTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final int MAX_WEIGHT = 10;
 
   @Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
index daf81461879..1e188da2dd6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
@@ -51,7 +51,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
@@ -63,6 +65,7 @@ import org.mockito.MockitoAnnotations;
 @SuppressWarnings("deprecation")
 @RunWith(JUnit4.class)
 public class SideInputStateFetcherTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final String STATE_FAMILY = "state";
 
   @Mock private MetricTrackingWindmillServerStub server;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
index 264540531bf..a2f5e71d04c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
@@ -30,12 +30,15 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class WindmillStreamPoolTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final int DEFAULT_NUM_STREAMS = 10;
   private static final int NEW_STREAM_HOLDS = 2;
   private final ConcurrentHashMap<
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
index e3b07bf7aa4..253d6ff3a48 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
@@ -55,12 +55,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mockito;
 
 @RunWith(JUnit4.class)
 public class GrpcGetWorkerMetadataStreamTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final String IPV6_ADDRESS_1 = "2001:db8:0000:bac5:0000:0000:fed0:81a2";
   private static final String IPV6_ADDRESS_2 = "2001:db8:0000:bac5:0000:0000:fed0:82a3";
   private static final List<WorkerMetadataResponse.Endpoint> DIRECT_PATH_ENDPOINTS =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index d9f4b72716c..5f8a452a043 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -94,6 +94,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
@@ -105,7 +106,7 @@ import org.slf4j.LoggerFactory;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class GrpcWindmillServerTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServerTest.class);
   private static final int STREAM_CHUNK_SIZE = 2 << 20;
   private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
index 8a2c643a5b7..46983a618e4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
@@ -66,11 +66,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineClientTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final WindmillServiceAddress DEFAULT_WINDMILL_SERVICE_ADDRESS =
       WindmillServiceAddress.create(HostAndPort.fromParts(WindmillChannelFactory.LOCALHOST, 443));
   private static final ImmutableMap<String, WorkerMetadataResponse.Endpoint> DEFAULT =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
index c8d2974f923..5fd1814e511 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
@@ -43,11 +43,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class WindmillStreamSenderTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final GetWorkRequest GET_WORK_REQUEST =
       GetWorkRequest.newBuilder().setClientId(1L).setJobId("job").setProjectId("project").build();
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index cc6633f1b70..35d01aaffb8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -35,14 +35,16 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache}. */
 @RunWith(JUnit4.class)
 public class WindmillStateCacheTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final String COMPUTATION = "computation";
   private static final long SHARDING_KEY = 123;
   private static final WindmillComputationKey COMPUTATION_KEY =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index 8971c39ccaa..d2590ceb846 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -98,7 +98,9 @@ import org.hamcrest.core.CombinableMatcher;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
@@ -112,7 +114,7 @@ import org.mockito.MockitoAnnotations;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class WindmillStateInternalsTest {
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   public static final Range<Long> FULL_ORDERED_LIST_RANGE =
       Range.closedOpen(WindmillOrderedList.MIN_TS_MICROS, WindmillOrderedList.MAX_TS_MICROS);
   private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReaderTest.java
index 430e31ee04f..7ef74639bb5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReaderTest.java
@@ -54,7 +54,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncodin
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
@@ -69,6 +71,7 @@ import org.mockito.MockitoAnnotations;
   "FutureReturnValueIgnored",
 })
 public class WindmillStateReaderTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final VarIntCoder INT_CODER = VarIntCoder.of();
 
   private static final String COMPUTATION = "computation";
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
index 14da55fe238..54b605efbf3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
@@ -40,11 +40,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class EvenGetWorkBudgetDistributorTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
 
   private ManagedChannel inProcessChannel;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
index fd85410cc91..101e111cb65 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
@@ -21,13 +21,16 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mockito;
 
 @RunWith(JUnit4.class)
 public class GetWorkBudgetRefresherTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final int WAIT_BUFFER = 10;
   private final Runnable redistributeBudget = Mockito.mock(Runnable.class);
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
index 97789abaaa9..2d2806bef61 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
@@ -19,12 +19,15 @@ package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
 
 import static org.junit.Assert.assertEquals;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class GetWorkBudgetTest {
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
 
   @Test
   public void testCreateWithNoBudget() {