Posted to commits@beam.apache.org by yh...@apache.org on 2024/03/21 00:08:04 UTC

(beam) branch master updated: Add BigTableIO Stress test (#30630)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8955124219c Add BigTableIO Stress test (#30630)
8955124219c is described below

commit 8955124219cf9bd6dcea74f22c70fed7940da2b8
Author: akashorabek <70...@users.noreply.github.com>
AuthorDate: Thu Mar 21 06:07:57 2024 +0600

    Add BigTableIO Stress test (#30630)
    
    * Add BigTableIO Stress test
    
    * refactor
    
    * update dependency tree
    
    * refactor
    
    * Add stress test files to grpc/protobuf exception ignore list
    
    * move exportMetrics method to IOLoadTestBase class
    
    * refactor
    
    * refactor
---
 it/google-cloud-platform/build.gradle              |   5 +-
 .../org/apache/beam/it/gcp/IOLoadTestBase.java     |  34 ++
 .../org/apache/beam/it/gcp/IOStressTestBase.java   | 123 +++++++
 .../apache/beam/it/gcp/bigquery/BigQueryIOST.java  | 105 +-----
 .../apache/beam/it/gcp/bigtable/BigTableIOST.java  | 389 +++++++++++++++++++++
 it/kafka/build.gradle                              |   4 +-
 .../java/org/apache/beam/it/kafka/KafkaIOST.java   | 156 ++-------
 .../resources/beam/checkstyle/suppressions.xml     |   1 +
 8 files changed, 578 insertions(+), 239 deletions(-)

diff --git a/it/google-cloud-platform/build.gradle b/it/google-cloud-platform/build.gradle
index 9717b5f8c84..3353a9692cb 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -32,6 +32,7 @@ dependencies {
     implementation project(path: ":runners:google-cloud-dataflow-java")
     implementation project(path: ":it:conditions", configuration: "shadow")
     implementation project(path: ":it:truthmatchers", configuration: "shadow")
+    implementation project(path: ":sdks:java:testing:test-utils")
     implementation library.java.slf4j_api
     implementation library.java.vendored_guava_32_1_2_jre
     implementation library.java.jackson_core
@@ -48,6 +49,7 @@ dependencies {
     implementation library.java.protobuf_java
     implementation library.java.threetenbp
     implementation 'org.awaitility:awaitility:4.2.0'
+    implementation 'joda-time:joda-time:2.10.10'
     // Google Cloud Dependencies
     implementation library.java.google_api_services_bigquery
     implementation library.java.google_cloud_core
@@ -71,7 +73,6 @@ dependencies {
     implementation 'com.google.cloud:google-cloud-secretmanager'
     provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1'
 
-    testImplementation project(path: ":sdks:java:testing:test-utils")
     testImplementation project(path: ":sdks:java:io:google-cloud-platform")
     testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
     testImplementation project(path: ":sdks:java:io:synthetic")
@@ -83,6 +84,8 @@ dependencies {
 
 tasks.register("GCSPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'FileBasedIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigTablePerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("BigTableStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("BigTableStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigQueryPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigQueryStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
 tasks.register("BigQueryStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
index 6b728a6a60d..e5f20c07c01 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
@@ -17,12 +17,21 @@
  */
 package org.apache.beam.it.gcp;
 
+import com.google.cloud.Timestamp;
 import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.gcp.dataflow.DefaultPipelineLauncher;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.junit.After;
 import org.junit.Before;
@@ -101,4 +110,29 @@ public class IOLoadTestBase extends LoadTestBase {
   public static String getBeamMetricsName(PipelineMetricsType metricstype, String metricsName) {
     return BEAM_METRICS_NAMESPACE + ":" + metricstype + ":" + metricsName;
   }
+
+  /** Exports test metrics to InfluxDB or BigQuery depending on the configuration. */
+  protected void exportMetrics(
+      PipelineLauncher.LaunchInfo launchInfo,
+      MetricsConfiguration metricsConfig,
+      boolean exportToInfluxDB,
+      InfluxDBSettings influxDBSettings)
+      throws IOException, ParseException, InterruptedException {
+
+    Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
+    String testId = UUID.randomUUID().toString();
+    String testTimestamp = Timestamp.now().toString();
+
+    if (exportToInfluxDB) {
+      Collection<NamedTestResult> namedTestResults = new ArrayList<>();
+      for (Map.Entry<String, Double> entry : metrics.entrySet()) {
+        NamedTestResult metricResult =
+            NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
+        namedTestResults.add(metricResult);
+      }
+      IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
+    } else {
+      exportMetricsToBigQuery(launchInfo, metrics);
+    }
+  }
 }
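
The shared exportMetrics helper replaces the per-test copy that previously lived in KafkaIOST; a stress test now just builds a MetricsConfiguration and delegates. A minimal sketch of a call site, assuming launchInfo and influxDBSettings are already in scope in the test:

    MetricsConfiguration metricsConfig =
        MetricsConfiguration.builder()
            .setOutputPCollection("Counting element.out0")
            .build();
    // Publishes to InfluxDB when the flag is true; otherwise exports to BigQuery.
    exportMetrics(launchInfo, metricsConfig, /* exportToInfluxDB= */ true, influxDBSettings);
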
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
new file mode 100644
index 00000000000..5c2fb74cd2f
--- /dev/null
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/** Base class for IO Stress tests. */
+public class IOStressTestBase extends IOLoadTestBase {
+  /**
+   * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
+   * eventually return to 1x.
+   */
+  protected static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
+
+  protected static final int DEFAULT_ROWS_PER_SECOND = 1000;
+
+  /**
+   * Generates and returns a list of LoadPeriod instances representing periods of load increase
+   * based on the specified load increase array and total duration in minutes.
+   *
+   * @param minutesTotal The total duration in minutes for which the load periods are generated.
+   * @return A list of LoadPeriod instances defining periods of load increase.
+   */
+  protected List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
+
+    List<LoadPeriod> loadPeriods = new ArrayList<>();
+    long periodDurationMillis =
+        Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
+    long startTimeMillis = 0;
+
+    for (int loadIncreaseMultiplier : loadIncreaseArray) {
+      long endTimeMillis = startTimeMillis + periodDurationMillis;
+      loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
+
+      startTimeMillis = endTimeMillis;
+    }
+    return loadPeriods;
+  }
+
+  /**
+   * Represents a period of time with associated load increase properties for stress testing
+   * scenarios.
+   */
+  protected static class LoadPeriod implements Serializable {
+    private final int loadIncreaseMultiplier;
+    private final long periodStartMillis;
+    private final long periodEndMillis;
+
+    public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMillis) {
+      this.loadIncreaseMultiplier = loadIncreaseMultiplier;
+      this.periodStartMillis = periodStartMillis;
+      this.periodEndMillis = periodEndMillis;
+    }
+
+    public int getLoadIncreaseMultiplier() {
+      return loadIncreaseMultiplier;
+    }
+
+    public long getPeriodStartMillis() {
+      return periodStartMillis;
+    }
+
+    public long getPeriodEndMillis() {
+      return periodEndMillis;
+    }
+  }
+
+  /**
+   * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
+   * load increase over time, multiplying the input elements based on the elapsed time since the
+   * start of processing. This class aims to simulate various load levels during stress testing.
+   */
+  protected static class MultiplierDoFn<T> extends DoFn<T, T> {
+    private final int startMultiplier;
+    private final long startTimeMillis;
+    private final List<LoadPeriod> loadPeriods;
+
+    public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
+      this.startMultiplier = startMultiplier;
+      this.startTimeMillis = Instant.now().getMillis();
+      this.loadPeriods = loadPeriods;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(
+        @Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) {
+
+      int multiplier = this.startMultiplier;
+      long elapsedTimeMillis = timestamp.getMillis() - startTimeMillis;
+
+      for (LoadPeriod loadPeriod : loadPeriods) {
+        if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
+            && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
+          multiplier *= loadPeriod.getLoadIncreaseMultiplier();
+          break;
+        }
+      }
+      for (int i = 0; i < multiplier; i++) {
+        outputReceiver.output(element);
+      }
+    }
+  }
+}
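
One subtlety in the base class: getLoadPeriods divides minutesTotal by the array length with integer division, so a total that is not a multiple of six leaves a tail window matched by no LoadPeriod, during which MultiplierDoFn emits only startMultiplier copies per element. A sketch from inside a subclass, illustrating the schedule for a 40-minute run (the "medium" preset below):

    // For minutesTotal = 40 and DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1},
    // each period lasts Duration.ofMinutes(40 / 6) = 6 minutes, so the six periods
    // cover minutes 0-36 and minutes 36-40 fall outside every period.
    List<LoadPeriod> periods = getLoadPeriods(40, DEFAULT_LOAD_INCREASE_ARRAY);
    for (LoadPeriod p : periods) {
      System.out.printf(
          "x%d from %d ms to %d ms%n",
          p.getLoadIncreaseMultiplier(), p.getPeriodStartMillis(), p.getPeriodEndMillis());
    }
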
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
index 6ffe1014c8a..d0eed457b19 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
@@ -26,7 +26,6 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.Timestamp;
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.text.ParseException;
 import java.time.Duration;
@@ -46,7 +45,7 @@ import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.PipelineOperator;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
-import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.it.gcp.IOStressTestBase;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
@@ -59,7 +58,6 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.PeriodicImpulse;
@@ -70,7 +68,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
-import org.joda.time.Instant;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -87,18 +84,11 @@ import org.junit.Test;
  * - To run large-scale stress tests: {@code gradle
  * :it:google-cloud-platform:BigQueryStressTestLarge}
  */
-public final class BigQueryIOST extends IOLoadTestBase {
+public final class BigQueryIOST extends IOStressTestBase {
 
   private static final String READ_ELEMENT_METRIC_NAME = "read_count";
   private static final String TEST_ID = UUID.randomUUID().toString();
   private static final String TEST_TIMESTAMP = Timestamp.now().toString();
-  private static final int DEFAULT_ROWS_PER_SECOND = 1000;
-
-  /**
-   * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
-   * eventually return to 1x.
-   */
-  private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
 
   private static BigQueryResourceManager resourceManager;
   private static String tableQualifier;
@@ -301,7 +291,7 @@ public final class BigQueryIOST extends IOLoadTestBase {
           source
               .apply(
                   "One input to multiple outputs",
-                  ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
+                  ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
               .apply("Reshuffle fanout", Reshuffle.viaRandomKey())
               .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
     }
@@ -371,44 +361,6 @@ public final class BigQueryIOST extends IOLoadTestBase {
     }
   }
 
-  /**
-   * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
-   * load increase over time, multiplying the input elements based on the elapsed time since the
-   * start of processing. This class aims to simulate various load levels during stress testing.
-   */
-  private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
-    private final int startMultiplier;
-    private final long startTimesMillis;
-    private final List<LoadPeriod> loadPeriods;
-
-    MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
-      this.startMultiplier = startMultiplier;
-      this.startTimesMillis = Instant.now().getMillis();
-      this.loadPeriods = loadPeriods;
-    }
-
-    @ProcessElement
-    public void processElement(
-        @Element byte[] element,
-        OutputReceiver<byte[]> outputReceiver,
-        @DoFn.Timestamp Instant timestamp) {
-
-      int multiplier = this.startMultiplier;
-      long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
-
-      for (LoadPeriod loadPeriod : loadPeriods) {
-        if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
-            && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
-          multiplier *= loadPeriod.getLoadIncreaseMultiplier();
-          break;
-        }
-      }
-      for (int i = 0; i < multiplier; i++) {
-        outputReceiver.output(element);
-      }
-    }
-  }
-
   abstract static class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> {
     protected final int numColumns;
 
@@ -493,29 +445,6 @@ public final class BigQueryIOST extends IOLoadTestBase {
     }
   }
 
-  /**
-   * Generates and returns a list of LoadPeriod instances representing periods of load increase
-   * based on the specified load increase array and total duration in minutes.
-   *
-   * @param minutesTotal The total duration in minutes for which the load periods are generated.
-   * @return A list of LoadPeriod instances defining periods of load increase.
-   */
-  private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
-
-    List<LoadPeriod> loadPeriods = new ArrayList<>();
-    long periodDurationMillis =
-        Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
-    long startTimeMillis = 0;
-
-    for (int loadIncreaseMultiplier : loadIncreaseArray) {
-      long endTimeMillis = startTimeMillis + periodDurationMillis;
-      loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
-
-      startTimeMillis = endTimeMillis;
-    }
-    return loadPeriods;
-  }
-
   private enum WriteFormat {
     AVRO,
     JSON
@@ -573,32 +502,4 @@ public final class BigQueryIOST extends IOLoadTestBase {
     /** InfluxDB database to publish metrics. * */
     @JsonProperty public String influxDatabase;
   }
-
-  /**
-   * Represents a period of time with associated load increase properties for stress testing
-   * scenarios.
-   */
-  private static class LoadPeriod implements Serializable {
-    private final int loadIncreaseMultiplier;
-    private final long periodStartMillis;
-    private final long periodEndMillis;
-
-    public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
-      this.loadIncreaseMultiplier = loadIncreaseMultiplier;
-      this.periodStartMillis = periodStartMillis;
-      this.periodEndMillis = periodEndMin;
-    }
-
-    public int getLoadIncreaseMultiplier() {
-      return loadIncreaseMultiplier;
-    }
-
-    public long getPeriodStartMillis() {
-      return periodStartMillis;
-    }
-
-    public long getPeriodEndMillis() {
-      return periodEndMillis;
-    }
-  }
 }
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
new file mode 100644
index 00000000000..4821992381b
--- /dev/null
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.bigtable;
+
+import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOStressTestBase;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * BigTableIO stress test. The test is designed to assess the performance of BigTableIO under
+ * various conditions.
+ *
+ * <p>Usage: <br>
+ * - To run medium-scale stress tests: {@code gradle
+ * :it:google-cloud-platform:BigTableStressTestMedium} <br>
+ * - To run large-scale stress tests: {@code gradle :it:google-cloud-platform:BigTableStressTestLarge}
+ */
+public final class BigTableIOST extends IOStressTestBase {
+
+  private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+  private static final String COLUMN_FAMILY_NAME = "cf";
+  private static final long TABLE_MAX_AGE_MINUTES = 800L;
+
+  private BigtableResourceManager resourceManager;
+  private InfluxDBSettings influxDBSettings;
+  private Configuration configuration;
+  private String testConfigName;
+  private String tableId;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Before
+  public void setup() throws IOException {
+    resourceManager =
+        BigtableResourceManager.builder(testName, project, CREDENTIALS_PROVIDER).build();
+
+    // create table
+    tableId = generateTableId(testName);
+    resourceManager.createTable(
+        tableId,
+        ImmutableList.of(COLUMN_FAMILY_NAME),
+        org.threeten.bp.Duration.ofMinutes(TABLE_MAX_AGE_MINUTES));
+
+    // parse configuration
+    testConfigName =
+        TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY);
+    configuration = TEST_CONFIGS_PRESET.get(testConfigName);
+    if (configuration == null) {
+      try {
+        configuration = Configuration.fromJsonString(testConfigName, Configuration.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown test configuration: [%s]. Pass to a valid configuration json, or use"
+                    + " config presets: %s",
+                testConfigName, TEST_CONFIGS_PRESET.keySet()));
+      }
+    }
+
+    // tempLocation needs to be set for DataflowRunner
+    if (!Strings.isNullOrEmpty(tempBucketName)) {
+      String tempLocation = String.format("gs://%s/temp/", tempBucketName);
+      writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      writePipeline.getOptions().setTempLocation(tempLocation);
+      readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      readPipeline.getOptions().setTempLocation(tempLocation);
+    }
+    // Use streaming pipeline to write records
+    writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+  }
+
+  @After
+  public void teardown() {
+    ResourceManagerUtils.cleanResources(resourceManager);
+  }
+
+  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+
+  static {
+    try {
+      TEST_CONFIGS_PRESET =
+          ImmutableMap.of(
+              "medium",
+              Configuration.fromJsonString(
+                  "{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
+                  Configuration.class),
+              "large",
+              Configuration.fromJsonString(
+                  "{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
+                  Configuration.class));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Run stress test with configurations specified by TestProperties. */
+  @Test
+  public void runTest() throws IOException, ParseException, InterruptedException {
+    if (configuration.exportMetricsToInfluxDB) {
+      influxDBSettings =
+          InfluxDBSettings.builder()
+              .withHost(configuration.influxHost)
+              .withDatabase(configuration.influxDatabase)
+              .withMeasurement(configuration.influxMeasurement + "_" + testConfigName)
+              .get();
+    }
+
+    PipelineLauncher.LaunchInfo writeInfo = generateDataAndWrite();
+    PipelineOperator.Result writeResult =
+        pipelineOperator.waitUntilDone(
+            createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+    assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult);
+
+    PipelineLauncher.LaunchInfo readInfo = readData();
+    PipelineOperator.Result readResult =
+        pipelineOperator.waitUntilDone(
+            createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+    assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, readResult);
+
+    try {
+      double writeNumRecords =
+          pipelineLauncher.getMetric(
+              project,
+              region,
+              writeInfo.jobId(),
+              getBeamMetricsName(PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME));
+
+      double readNumRecords =
+          pipelineLauncher.getMetric(
+              project,
+              region,
+              readInfo.jobId(),
+              getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
+
+      assertEquals(writeNumRecords, readNumRecords, 0);
+    } finally {
+      // clean up write streaming pipeline
+      if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
+          == PipelineLauncher.JobState.RUNNING) {
+        pipelineLauncher.cancelJob(project, region, writeInfo.jobId());
+      }
+    }
+
+    // export metrics
+    MetricsConfiguration writeMetricsConfig =
+        MetricsConfiguration.builder()
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+            .build();
+
+    MetricsConfiguration readMetricsConfig =
+        MetricsConfiguration.builder()
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+            .build();
+
+    exportMetrics(
+        writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+    exportMetrics(
+        readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+  }
+
+  /**
+   * Creates and launches a pipeline that generates synthetic data and writes it to BigTable,
+   * based on the specified configuration parameters. The load is varied dynamically over time
+   * according to the configured load periods.
+   */
+  private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
+    // Interval, in milliseconds, at which the PeriodicImpulse source emits an element:
+    int fireInterval = 1;
+    // Each element from PeriodicImpulse will fan out to this many elements:
+    int startMultiplier =
+        Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
+    long stopAfterMillis =
+        org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
+    long totalRows = startMultiplier * stopAfterMillis / fireInterval;
+    List<LoadPeriod> loadPeriods =
+        getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
+
+    PCollection<org.joda.time.Instant> source =
+        writePipeline.apply(
+            PeriodicImpulse.create()
+                .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
+                .withInterval(org.joda.time.Duration.millis(fireInterval)));
+    if (startMultiplier > 1) {
+      source =
+          source
+              .apply(
+                  "One input to multiple outputs",
+                  ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
+              .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
+    }
+    source
+        .apply(
+            "Map records to BigTable format",
+            ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes, (int) totalRows)))
+        .apply(
+            "Write to BigTable",
+            BigtableIO.write()
+                .withProjectId(project)
+                .withInstanceId(resourceManager.getInstanceId())
+                .withTableId(tableId));
+
+    PipelineLauncher.LaunchConfig options =
+        PipelineLauncher.LaunchConfig.builder("write-bigtable")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(writePipeline)
+            .addParameter("runner", configuration.runner)
+            .addParameter(
+                "autoscalingAlgorithm",
+                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
+                    .toString())
+            .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+            .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
+            .addParameter("experiments", "use_runner_v2")
+            .build();
+
+    return pipelineLauncher.launch(project, region, options);
+  }
+
+  /** The method reads data from BigTable in batch mode. */
+  private PipelineLauncher.LaunchInfo readData() throws IOException {
+    BigtableIO.Read readIO =
+        BigtableIO.read()
+            .withoutValidation()
+            .withProjectId(project)
+            .withInstanceId(resourceManager.getInstanceId())
+            .withTableId(tableId);
+
+    readPipeline
+        .apply("Read from BigTable", readIO)
+        .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
+
+    PipelineLauncher.LaunchConfig options =
+        PipelineLauncher.LaunchConfig.builder("read-bigtable")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(readPipeline)
+            .addParameter("runner", configuration.runner)
+            .addParameter(
+                "autoscalingAlgorithm",
+                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
+                    .toString())
+            .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+            .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
+            .build();
+
+    return pipelineLauncher.launch(project, region, options);
+  }
+
+  /** Options for BigTableIO stress test. */
+  static class Configuration extends SyntheticSourceOptions {
+    /** Pipeline timeout in minutes. Must be a positive value. */
+    @JsonProperty public int pipelineTimeout = 20;
+
+    /** Runner specified to run the pipeline. */
+    @JsonProperty public String runner = "DirectRunner";
+
+    /** Number of workers for the pipeline. */
+    @JsonProperty public int numWorkers = 20;
+
+    /** Maximum number of workers for the pipeline. */
+    @JsonProperty public int maxNumWorkers = 100;
+
+    /**
+     * Rate of generated elements written to the table under test. Will run with a minimum of 1k rows per
+     * second.
+     */
+    @JsonProperty public int rowsPerSecond = DEFAULT_ROWS_PER_SECOND;
+
+    /** Rows will be generated for this many minutes. */
+    @JsonProperty public int minutes = 15;
+
+    /**
+     * Determines the destination for exporting metrics. If set to true, metrics will be exported to
+     * InfluxDB and displayed using Grafana. If set to false, metrics will be exported to BigQuery
+     * and displayed with Looker Studio.
+     */
+    @JsonProperty public boolean exportMetricsToInfluxDB = false;
+
+    /** InfluxDB measurement to publish results to. */
+    @JsonProperty public String influxMeasurement = BigTableIOST.class.getName();
+
+    /** InfluxDB host to publish metrics. */
+    @JsonProperty public String influxHost;
+
+    /** InfluxDB database to publish metrics. */
+    @JsonProperty public String influxDatabase;
+  }
+
+  /** Maps each input Instant to a BigTable write record (row key plus cell mutations). */
+  private static class MapToBigTableFormat
+      extends DoFn<org.joda.time.Instant, KV<ByteString, Iterable<Mutation>>>
+      implements Serializable {
+
+    private final int valueSizeBytes;
+    private final int totalRows;
+
+    public MapToBigTableFormat(int valueSizeBytes, int totalRows) {
+      this.valueSizeBytes = valueSizeBytes;
+      this.totalRows = totalRows;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      long index = Objects.requireNonNull(c.element()).getMillis() % totalRows;
+
+      ByteString key =
+          ByteString.copyFromUtf8(
+              String.format(
+                  "key%s",
+                  index
+                      + "-"
+                      + UUID.randomUUID()
+                      + "-"
+                      + UUID.randomUUID()
+                      + "-"
+                      + org.joda.time.Instant.now().getMillis()));
+      Random random = new Random(index);
+      byte[] valBytes = new byte[this.valueSizeBytes];
+      random.nextBytes(valBytes);
+      ByteString value = ByteString.copyFrom(valBytes);
+
+      Iterable<Mutation> mutations =
+          ImmutableList.of(
+              Mutation.newBuilder()
+                  .setSetCell(
+                      Mutation.SetCell.newBuilder()
+                          .setValue(value)
+                          .setTimestampMicros(java.time.Instant.now().toEpochMilli() * 1000L)
+                          .setFamilyName(COLUMN_FAMILY_NAME))
+                  .build());
+      c.output(KV.of(key, mutations));
+    }
+  }
+}
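
As the setup method above shows, the configuration property is first resolved against the medium/large presets and otherwise parsed as an inline JSON document into the Configuration class. A hypothetical custom configuration (field values are illustrative; omitted fields keep the defaults declared above):

    {"rowsPerSecond": 5000, "minutes": 12, "pipelineTimeout": 60,
     "valueSizeBytes": 200, "numWorkers": 10, "maxNumWorkers": 20,
     "runner": "DataflowRunner", "exportMetricsToInfluxDB": false}
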
diff --git a/it/kafka/build.gradle b/it/kafka/build.gradle
index b1b8147e72a..96f915a1d84 100644
--- a/it/kafka/build.gradle
+++ b/it/kafka/build.gradle
@@ -46,5 +46,5 @@ dependencies {
     testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
 }
 
-tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
-tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
index 2a830400a0a..4ca34328637 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -21,15 +21,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.cloud.Timestamp;
 import java.io.IOException;
-import java.io.Serializable;
 import java.text.ParseException;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -37,17 +33,14 @@ import java.util.UUID;
 import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.PipelineOperator;
 import org.apache.beam.it.common.TestProperties;
-import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.it.gcp.IOStressTestBase;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.testutils.NamedTestResult;
-import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.PeriodicImpulse;
@@ -61,7 +54,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -81,17 +73,10 @@ import org.junit.Test;
  * - To run large-scale stress tests: {@code gradle :it:kafka:KafkaStressTestLarge
  * -DbootstrapServers="0.0.0.0:32400,1.1.1.1:32400"}
  */
-public final class KafkaIOST extends IOLoadTestBase {
-  /**
-   * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
-   * eventually return to 1x.
-   */
-  private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
-
-  private static InfluxDBSettings influxDBSettings;
+public final class KafkaIOST extends IOStressTestBase {
   private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
   private static final String READ_ELEMENT_METRIC_NAME = "read_count";
-  private static final int DEFAULT_ROWS_PER_SECOND = 1000;
+  private static InfluxDBSettings influxDBSettings;
   private Configuration configuration;
   private AdminClient adminClient;
   private String testConfigName;
@@ -120,6 +105,11 @@ public final class KafkaIOST extends IOLoadTestBase {
     }
     configuration.bootstrapServers =
         TestProperties.getProperty("bootstrapServers", null, TestProperties.Type.PROPERTY);
+    String useDataflowRunnerV2FromProps =
+        TestProperties.getProperty("useDataflowRunnerV2", "true", TestProperties.Type.PROPERTY);
+    if (!useDataflowRunnerV2FromProps.isEmpty()) {
+      configuration.useDataflowRunnerV2 = Boolean.parseBoolean(useDataflowRunnerV2FromProps);
+    }
 
     adminClient =
         AdminClient.create(ImmutableMap.of("bootstrap.servers", configuration.bootstrapServers));
@@ -231,8 +221,10 @@ public final class KafkaIOST extends IOLoadTestBase {
             .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
             .build();
 
-    exportMetrics(writeInfo, writeMetricsConfig);
-    exportMetrics(readInfo, readMetricsConfig);
+    exportMetrics(
+        writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+    exportMetrics(
+        readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
   }
 
   /**
@@ -267,7 +259,7 @@ public final class KafkaIOST extends IOLoadTestBase {
           source
               .apply(
                   "One input to multiple outputs",
-                  ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
+                  ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
               .apply("Reshuffle fanout", Reshuffle.viaRandomKey())
               .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
     }
@@ -295,7 +287,7 @@ public final class KafkaIOST extends IOLoadTestBase {
                     .toString())
             .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
             .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
-            .addParameter("experiments", "use_runner_v2")
+            .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "")
             .build();
 
     return pipelineLauncher.launch(project, region, options);
@@ -310,7 +302,7 @@ public final class KafkaIOST extends IOLoadTestBase {
             .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));
 
     readPipeline
-        .apply("Read from unbounded Kafka", readFromKafka)
+        .apply("Read from Kafka", readFromKafka)
         .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
 
     PipelineLauncher.LaunchConfig options =
@@ -319,94 +311,12 @@ public final class KafkaIOST extends IOLoadTestBase {
             .setPipeline(readPipeline)
             .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
             .addParameter("runner", configuration.runner)
-            .addParameter("experiments", "use_runner_v2")
+            .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "")
             .build();
 
     return pipelineLauncher.launch(project, region, options);
   }
 
-  private void exportMetrics(
-      PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfig)
-      throws IOException, ParseException, InterruptedException {
-
-    Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
-    String testId = UUID.randomUUID().toString();
-    String testTimestamp = Timestamp.now().toString();
-
-    if (configuration.exportMetricsToInfluxDB) {
-      Collection<NamedTestResult> namedTestResults = new ArrayList<>();
-      for (Map.Entry<String, Double> entry : metrics.entrySet()) {
-        NamedTestResult metricResult =
-            NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
-        namedTestResults.add(metricResult);
-      }
-      IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
-    } else {
-      exportMetricsToBigQuery(launchInfo, metrics);
-    }
-  }
-
-  /**
-   * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
-   * load increase over time, multiplying the input elements based on the elapsed time since the
-   * start of processing. This class aims to simulate various load levels during stress testing.
-   */
-  private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
-    private final int startMultiplier;
-    private final long startTimesMillis;
-    private final List<LoadPeriod> loadPeriods;
-
-    MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
-      this.startMultiplier = startMultiplier;
-      this.startTimesMillis = Instant.now().getMillis();
-      this.loadPeriods = loadPeriods;
-    }
-
-    @DoFn.ProcessElement
-    public void processElement(
-        @Element byte[] element,
-        OutputReceiver<byte[]> outputReceiver,
-        @DoFn.Timestamp Instant timestamp) {
-
-      int multiplier = this.startMultiplier;
-      long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
-
-      for (LoadPeriod loadPeriod : loadPeriods) {
-        if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
-            && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
-          multiplier *= loadPeriod.getLoadIncreaseMultiplier();
-          break;
-        }
-      }
-      for (int i = 0; i < multiplier; i++) {
-        outputReceiver.output(element);
-      }
-    }
-  }
-
-  /**
-   * Generates and returns a list of LoadPeriod instances representing periods of load increase
-   * based on the specified load increase array and total duration in minutes.
-   *
-   * @param minutesTotal The total duration in minutes for which the load periods are generated.
-   * @return A list of LoadPeriod instances defining periods of load increase.
-   */
-  private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
-
-    List<LoadPeriod> loadPeriods = new ArrayList<>();
-    long periodDurationMillis =
-        Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
-    long startTimeMillis = 0;
-
-    for (int loadIncreaseMultiplier : loadIncreaseArray) {
-      long endTimeMillis = startTimeMillis + periodDurationMillis;
-      loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
-
-      startTimeMillis = endTimeMillis;
-    }
-    return loadPeriods;
-  }
-
   /** Options for Kafka IO stress test. */
   static class Configuration extends SyntheticSourceOptions {
     /** Pipeline timeout in minutes. Must be a positive value. */
@@ -415,6 +325,12 @@ public final class KafkaIOST extends IOLoadTestBase {
     /** Runner specified to run the pipeline. */
     @JsonProperty public String runner = "DirectRunner";
 
+    /**
+     * Determines whether to use Dataflow Runner v2. If set to true, reads from Kafka use the
+     * SDF-based implementation; otherwise the legacy UnboundedSource implementation is used.
+     */
+    @JsonProperty public boolean useDataflowRunnerV2 = true;
+
     /** Number of workers for the pipeline. */
     @JsonProperty public int numWorkers = 20;
 
@@ -449,32 +365,4 @@ public final class KafkaIOST extends IOLoadTestBase {
     /** InfluxDB database to publish metrics. * */
     @JsonProperty public String influxDatabase;
   }
-
-  /**
-   * Represents a period of time with associated load increase properties for stress testing
-   * scenarios.
-   */
-  private static class LoadPeriod implements Serializable {
-    private final int loadIncreaseMultiplier;
-    private final long periodStartMillis;
-    private final long periodEndMillis;
-
-    public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
-      this.loadIncreaseMultiplier = loadIncreaseMultiplier;
-      this.periodStartMillis = periodStartMillis;
-      this.periodEndMillis = periodEndMin;
-    }
-
-    public int getLoadIncreaseMultiplier() {
-      return loadIncreaseMultiplier;
-    }
-
-    public long getPeriodStartMillis() {
-      return periodStartMillis;
-    }
-
-    public long getPeriodEndMillis() {
-      return periodEndMillis;
-    }
-  }
 }
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index e0c53436d4e..824aa901078 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -86,6 +86,7 @@
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Base\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Client\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*LT\.java" />
+  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ST\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ResourceManagerTest\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*testinfra.*mockapis.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*requestresponse.*" />