Posted to commits@beam.apache.org by yh...@apache.org on 2024/03/21 00:08:04 UTC
(beam) branch master updated: Add BigTableIO Stress test (#30630)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8955124219c Add BigTableIO Stress test (#30630)
8955124219c is described below
commit 8955124219cf9bd6dcea74f22c70fed7940da2b8
Author: akashorabek <70...@users.noreply.github.com>
AuthorDate: Thu Mar 21 06:07:57 2024 +0600
Add BigTableIO Stress test (#30630)
* Add BigTableIO Stress test
* refactor
* update dependency tree
* refactor
* Add stress test files to grpc/protobuf exception ignore list
* move exportMetrics method to IOLoadTestBase class
* refactor
* refactor
---
it/google-cloud-platform/build.gradle | 5 +-
.../org/apache/beam/it/gcp/IOLoadTestBase.java | 34 ++
.../org/apache/beam/it/gcp/IOStressTestBase.java | 123 +++++++
.../apache/beam/it/gcp/bigquery/BigQueryIOST.java | 105 +-----
.../apache/beam/it/gcp/bigtable/BigTableIOST.java | 389 +++++++++++++++++++++
it/kafka/build.gradle | 4 +-
.../java/org/apache/beam/it/kafka/KafkaIOST.java | 156 ++-------
.../resources/beam/checkstyle/suppressions.xml | 1 +
8 files changed, 578 insertions(+), 239 deletions(-)
diff --git a/it/google-cloud-platform/build.gradle b/it/google-cloud-platform/build.gradle
index 9717b5f8c84..3353a9692cb 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -32,6 +32,7 @@ dependencies {
implementation project(path: ":runners:google-cloud-dataflow-java")
implementation project(path: ":it:conditions", configuration: "shadow")
implementation project(path: ":it:truthmatchers", configuration: "shadow")
+ implementation project(path: ":sdks:java:testing:test-utils")
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.jackson_core
@@ -48,6 +49,7 @@ dependencies {
implementation library.java.protobuf_java
implementation library.java.threetenbp
implementation 'org.awaitility:awaitility:4.2.0'
+ implementation 'joda-time:joda-time:2.10.10'
// Google Cloud Dependencies
implementation library.java.google_api_services_bigquery
implementation library.java.google_cloud_core
@@ -71,7 +73,6 @@ dependencies {
implementation 'com.google.cloud:google-cloud-secretmanager'
provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1'
- testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:io:synthetic")
@@ -83,6 +84,8 @@ dependencies {
tasks.register("GCSPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'FileBasedIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigTablePerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("BigTableStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("BigTableStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
index 6b728a6a60d..e5f20c07c01 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
@@ -17,12 +17,21 @@
*/
package org.apache.beam.it.gcp;
+import com.google.cloud.Timestamp;
import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.dataflow.DefaultPipelineLauncher;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.junit.After;
import org.junit.Before;
@@ -101,4 +110,29 @@ public class IOLoadTestBase extends LoadTestBase {
public static String getBeamMetricsName(PipelineMetricsType metricstype, String metricsName) {
return BEAM_METRICS_NAMESPACE + ":" + metricstype + ":" + metricsName;
}
+
+ /** Exports test metrics to InfluxDB or BigQuery depending on the configuration. */
+ protected void exportMetrics(
+ PipelineLauncher.LaunchInfo launchInfo,
+ MetricsConfiguration metricsConfig,
+ boolean exportToInfluxDB,
+ InfluxDBSettings influxDBSettings)
+ throws IOException, ParseException, InterruptedException {
+
+ Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
+ String testId = UUID.randomUUID().toString();
+ String testTimestamp = Timestamp.now().toString();
+
+ if (exportToInfluxDB) {
+ Collection<NamedTestResult> namedTestResults = new ArrayList<>();
+ for (Map.Entry<String, Double> entry : metrics.entrySet()) {
+ NamedTestResult metricResult =
+ NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
+ namedTestResults.add(metricResult);
+ }
+ IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
+ } else {
+ exportMetricsToBigQuery(launchInfo, metrics);
+ }
+ }
}
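With exportMetrics now shared on IOLoadTestBase, each IO test supplies only its launch info, metrics configuration, and (optionally) InfluxDB settings. A minimal sketch of a call site, assuming launchInfo holds the PipelineLauncher.LaunchInfo of a finished job; the host, database, and measurement names below are illustrative, not from this commit:

    // Builder calls mirror the ones used by the stress tests in this commit.
    MetricsConfiguration metricsConfig =
        MetricsConfiguration.builder()
            .setOutputPCollection("Counting element.out0")
            .build();
    InfluxDBSettings settings =
        InfluxDBSettings.builder()
            .withHost("http://localhost:8086")   // assumed InfluxDB endpoint
            .withDatabase("beam_io_metrics")     // illustrative database name
            .withMeasurement("my_io_stress_test")
            .get();
    // With exportToInfluxDB=false, the helper falls back to exportMetricsToBigQuery.
    exportMetrics(launchInfo, metricsConfig, true, settings);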
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
new file mode 100644
index 00000000000..5c2fb74cd2f
--- /dev/null
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/** Base class for IO Stress tests. */
+public class IOStressTestBase extends IOLoadTestBase {
+ /**
+ * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
+ * eventually return to 1x.
+ */
+ protected static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
+
+ protected static final int DEFAULT_ROWS_PER_SECOND = 1000;
+
+ /**
+ * Generates and returns a list of LoadPeriod instances representing periods of load increase
+ * based on the specified load increase array and total duration in minutes.
+ *
+ * @param minutesTotal The total duration in minutes for which the load periods are generated.
+ * @return A list of LoadPeriod instances defining periods of load increase.
+ */
+ protected List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
+
+ List<LoadPeriod> loadPeriods = new ArrayList<>();
+ long periodDurationMillis =
+ Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
+ long startTimeMillis = 0;
+
+ for (int loadIncreaseMultiplier : loadIncreaseArray) {
+ long endTimeMillis = startTimeMillis + periodDurationMillis;
+ loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
+
+ startTimeMillis = endTimeMillis;
+ }
+ return loadPeriods;
+ }
+
+ /**
+ * Represents a period of time with associated load increase properties for stress testing
+ * scenarios.
+ */
+ protected static class LoadPeriod implements Serializable {
+ private final int loadIncreaseMultiplier;
+ private final long periodStartMillis;
+ private final long periodEndMillis;
+
+ public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMillis) {
+ this.loadIncreaseMultiplier = loadIncreaseMultiplier;
+ this.periodStartMillis = periodStartMillis;
+ this.periodEndMillis = periodEndMillis;
+ }
+
+ public int getLoadIncreaseMultiplier() {
+ return loadIncreaseMultiplier;
+ }
+
+ public long getPeriodStartMillis() {
+ return periodStartMillis;
+ }
+
+ public long getPeriodEndMillis() {
+ return periodEndMillis;
+ }
+ }
+
+ /**
+ * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
+ * load increase over time, multiplying the input elements based on the elapsed time since the
+ * start of processing. This class aims to simulate various load levels during stress testing.
+ */
+ protected static class MultiplierDoFn<T> extends DoFn<T, T> {
+ private final int startMultiplier;
+ private final long startTimeMillis;
+ private final List<LoadPeriod> loadPeriods;
+
+ public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
+ this.startMultiplier = startMultiplier;
+ this.startTimeMillis = Instant.now().getMillis();
+ this.loadPeriods = loadPeriods;
+ }
+
+ @DoFn.ProcessElement
+ public void processElement(
+ @Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) {
+
+ int multiplier = this.startMultiplier;
+ long elapsedTimeMillis = timestamp.getMillis() - startTimeMillis;
+
+ for (LoadPeriod loadPeriod : loadPeriods) {
+ if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
+ && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
+ multiplier *= loadPeriod.getLoadIncreaseMultiplier();
+ break;
+ }
+ }
+ for (int i = 0; i < multiplier; i++) {
+ outputReceiver.output(element);
+ }
+ }
+ }
+}
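To make the default schedule concrete: for a 60-minute run, getLoadPeriods splits the time into six equal 10-minute periods, one per entry of {1, 2, 2, 4, 2, 1}. Note the integer division in Duration.ofMinutes(minutesTotal / loadIncreaseArray.length): a total that is not a multiple of the array length leaves a short tail outside every period, which MultiplierDoFn simply runs at the unscaled base rate. A sketch from inside a subclass:

    // Illustrative only: prints the schedule for a 60-minute run with the
    // default multipliers.
    List<LoadPeriod> periods = getLoadPeriods(60, DEFAULT_LOAD_INCREASE_ARRAY);
    for (LoadPeriod p : periods) {
      System.out.printf(
          "%dx load from minute %d to minute %d%n",
          p.getLoadIncreaseMultiplier(),
          p.getPeriodStartMillis() / 60_000,
          p.getPeriodEndMillis() / 60_000);
    }
    // Prints: 1x 0-10, 2x 10-20, 2x 20-30, 4x 30-40, 2x 40-50, 1x 50-60.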
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
index 6ffe1014c8a..d0eed457b19 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
@@ -26,7 +26,6 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.Timestamp;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.time.Duration;
@@ -46,7 +45,7 @@ import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
-import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
@@ -59,7 +58,6 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
@@ -70,7 +68,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
-import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -87,18 +84,11 @@ import org.junit.Test;
* - To run large-scale stress tests: {@code gradle
* :it:google-cloud-platform:BigQueryStressTestLarge}
*/
-public final class BigQueryIOST extends IOLoadTestBase {
+public final class BigQueryIOST extends IOStressTestBase {
private static final String READ_ELEMENT_METRIC_NAME = "read_count";
private static final String TEST_ID = UUID.randomUUID().toString();
private static final String TEST_TIMESTAMP = Timestamp.now().toString();
- private static final int DEFAULT_ROWS_PER_SECOND = 1000;
-
- /**
- * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
- * eventually return to 1x.
- */
- private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
private static BigQueryResourceManager resourceManager;
private static String tableQualifier;
@@ -301,7 +291,7 @@ public final class BigQueryIOST extends IOLoadTestBase {
source
.apply(
"One input to multiple outputs",
- ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
+ ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
.apply("Reshuffle fanout", Reshuffle.viaRandomKey())
.apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
}
@@ -371,44 +361,6 @@ public final class BigQueryIOST extends IOLoadTestBase {
}
}
- /**
- * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
- * load increase over time, multiplying the input elements based on the elapsed time since the
- * start of processing. This class aims to simulate various load levels during stress testing.
- */
- private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
- private final int startMultiplier;
- private final long startTimesMillis;
- private final List<LoadPeriod> loadPeriods;
-
- MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
- this.startMultiplier = startMultiplier;
- this.startTimesMillis = Instant.now().getMillis();
- this.loadPeriods = loadPeriods;
- }
-
- @ProcessElement
- public void processElement(
- @Element byte[] element,
- OutputReceiver<byte[]> outputReceiver,
- @DoFn.Timestamp Instant timestamp) {
-
- int multiplier = this.startMultiplier;
- long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
-
- for (LoadPeriod loadPeriod : loadPeriods) {
- if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
- && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
- multiplier *= loadPeriod.getLoadIncreaseMultiplier();
- break;
- }
- }
- for (int i = 0; i < multiplier; i++) {
- outputReceiver.output(element);
- }
- }
- }
-
abstract static class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> {
protected final int numColumns;
@@ -493,29 +445,6 @@ public final class BigQueryIOST extends IOLoadTestBase {
}
}
- /**
- * Generates and returns a list of LoadPeriod instances representing periods of load increase
- * based on the specified load increase array and total duration in minutes.
- *
- * @param minutesTotal The total duration in minutes for which the load periods are generated.
- * @return A list of LoadPeriod instances defining periods of load increase.
- */
- private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
-
- List<LoadPeriod> loadPeriods = new ArrayList<>();
- long periodDurationMillis =
- Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
- long startTimeMillis = 0;
-
- for (int loadIncreaseMultiplier : loadIncreaseArray) {
- long endTimeMillis = startTimeMillis + periodDurationMillis;
- loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
-
- startTimeMillis = endTimeMillis;
- }
- return loadPeriods;
- }
-
private enum WriteFormat {
AVRO,
JSON
@@ -573,32 +502,4 @@ public final class BigQueryIOST extends IOLoadTestBase {
/** InfluxDB database to publish metrics. * */
@JsonProperty public String influxDatabase;
}
-
- /**
- * Represents a period of time with associated load increase properties for stress testing
- * scenarios.
- */
- private static class LoadPeriod implements Serializable {
- private final int loadIncreaseMultiplier;
- private final long periodStartMillis;
- private final long periodEndMillis;
-
- public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
- this.loadIncreaseMultiplier = loadIncreaseMultiplier;
- this.periodStartMillis = periodStartMillis;
- this.periodEndMillis = periodEndMin;
- }
-
- public int getLoadIncreaseMultiplier() {
- return loadIncreaseMultiplier;
- }
-
- public long getPeriodStartMillis() {
- return periodStartMillis;
- }
-
- public long getPeriodEndMillis() {
- return periodEndMillis;
- }
- }
}
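The refactor above also makes MultiplierDoFn generic, so each test instantiates it at its own element type via the diamond operator rather than keeping a byte[]-only copy. A sketch of the shared usage, with pipeline variables assumed in scope:

    // byte[] for the BigQuery and Kafka tests; BigTableIOST applies the same
    // DoFn to the joda-time Instant elements produced by PeriodicImpulse.
    PCollection<byte[]> fannedOut =
        source.apply(
            "One input to multiple outputs",
            ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)));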
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
new file mode 100644
index 00000000000..4821992381b
--- /dev/null
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.bigtable;
+
+import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOStressTestBase;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * BigTableIO stress test. The test is designed to assess the performance of BigTableIO under
+ * various conditions.
+ *
+ * <p>Usage: <br>
+ * - To run medium-scale stress tests: {@code gradle
+ * :it:google-cloud-platform:BigTableStressTestMedium} <br>
+ * - To run large-scale stress tests: {@code gradle :it:google-cloud-platform:BigTableStressTestLarge}
+ */
+public final class BigTableIOST extends IOStressTestBase {
+
+ private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
+ private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+ private static final String COLUMN_FAMILY_NAME = "cf";
+ private static final long TABLE_MAX_AGE_MINUTES = 800L;
+
+ private BigtableResourceManager resourceManager;
+ private InfluxDBSettings influxDBSettings;
+ private Configuration configuration;
+ private String testConfigName;
+ private String tableId;
+
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ @Before
+ public void setup() throws IOException {
+ resourceManager =
+ BigtableResourceManager.builder(testName, project, CREDENTIALS_PROVIDER).build();
+
+ // create table
+ tableId = generateTableId(testName);
+ resourceManager.createTable(
+ tableId,
+ ImmutableList.of(COLUMN_FAMILY_NAME),
+ org.threeten.bp.Duration.ofMinutes(TABLE_MAX_AGE_MINUTES));
+
+ // parse configuration
+ testConfigName =
+ TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY);
+ configuration = TEST_CONFIGS_PRESET.get(testConfigName);
+ if (configuration == null) {
+ try {
+ configuration = Configuration.fromJsonString(testConfigName, Configuration.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown test configuration: [%s]. Pass to a valid configuration json, or use"
+ + " config presets: %s",
+ testConfigName, TEST_CONFIGS_PRESET.keySet()));
+ }
+ }
+
+ // tempLocation needs to be set for DataflowRunner
+ if (!Strings.isNullOrEmpty(tempBucketName)) {
+ String tempLocation = String.format("gs://%s/temp/", tempBucketName);
+ writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+ writePipeline.getOptions().setTempLocation(tempLocation);
+ readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+ readPipeline.getOptions().setTempLocation(tempLocation);
+ }
+ // Use streaming pipeline to write records
+ writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+ }
+
+ @After
+ public void teardown() {
+ ResourceManagerUtils.cleanResources(resourceManager);
+ }
+
+ private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+
+ static {
+ try {
+ TEST_CONFIGS_PRESET =
+ ImmutableMap.of(
+ "medium",
+ Configuration.fromJsonString(
+ "{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
+ Configuration.class),
+ "large",
+ Configuration.fromJsonString(
+ "{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
+ Configuration.class));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Run stress test with configurations specified by TestProperties. */
+ @Test
+ public void runTest() throws IOException, ParseException, InterruptedException {
+ if (configuration.exportMetricsToInfluxDB) {
+ influxDBSettings =
+ InfluxDBSettings.builder()
+ .withHost(configuration.influxHost)
+ .withDatabase(configuration.influxDatabase)
+ .withMeasurement(configuration.influxMeasurement + "_" + testConfigName)
+ .get();
+ }
+
+ PipelineLauncher.LaunchInfo writeInfo = generateDataAndWrite();
+ PipelineOperator.Result writeResult =
+ pipelineOperator.waitUntilDone(
+ createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+ assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult);
+
+ PipelineLauncher.LaunchInfo readInfo = readData();
+ PipelineOperator.Result readResult =
+ pipelineOperator.waitUntilDone(
+ createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+ assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, readResult);
+
+ try {
+ double writeNumRecords =
+ pipelineLauncher.getMetric(
+ project,
+ region,
+ writeInfo.jobId(),
+ getBeamMetricsName(PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME));
+
+ double readNumRecords =
+ pipelineLauncher.getMetric(
+ project,
+ region,
+ readInfo.jobId(),
+ getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
+
+ assertEquals(writeNumRecords, readNumRecords, 0);
+ } finally {
+ // clean up write streaming pipeline
+ if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
+ == PipelineLauncher.JobState.RUNNING) {
+ pipelineLauncher.cancelJob(project, region, writeInfo.jobId());
+ }
+ }
+
+ // export metrics
+ MetricsConfiguration writeMetricsConfig =
+ MetricsConfiguration.builder()
+ .setOutputPCollection("Counting element.out0")
+ .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+ .build();
+
+ MetricsConfiguration readMetricsConfig =
+ MetricsConfiguration.builder()
+ .setOutputPCollection("Counting element.out0")
+ .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+ .build();
+
+ exportMetrics(
+ writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+ exportMetrics(
+ readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+ }
+
+ /**
+ * Creates a pipeline that generates synthetic data and writes it to BigTable, based on the
+ * specified configuration parameters. The stress test varies the load dynamically over time
+ * using the configured load periods.
+ */
+ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
+ // The PeriodicImpulse source will generate an element every this many millis:
+ int fireInterval = 1;
+ // Each element from PeriodicImpulse will fan out to this many elements:
+ int startMultiplier =
+ Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
+ long stopAfterMillis =
+ org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
+ long totalRows = startMultiplier * stopAfterMillis / fireInterval;
+ List<LoadPeriod> loadPeriods =
+ getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
+
+ PCollection<org.joda.time.Instant> source =
+ writePipeline.apply(
+ PeriodicImpulse.create()
+ .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
+ .withInterval(org.joda.time.Duration.millis(fireInterval)));
+ if (startMultiplier > 1) {
+ source =
+ source
+ .apply(
+ "One input to multiple outputs",
+ ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
+ .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
+ }
+ source
+ .apply(
+ "Map records to BigTable format",
+ ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes, (int) totalRows)))
+ .apply(
+ "Write to BigTable",
+ BigtableIO.write()
+ .withProjectId(project)
+ .withInstanceId(resourceManager.getInstanceId())
+ .withTableId(tableId));
+
+ PipelineLauncher.LaunchConfig options =
+ PipelineLauncher.LaunchConfig.builder("write-bigtable")
+ .setSdk(PipelineLauncher.Sdk.JAVA)
+ .setPipeline(writePipeline)
+ .addParameter("runner", configuration.runner)
+ .addParameter(
+ "autoscalingAlgorithm",
+ DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
+ .toString())
+ .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+ .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
+ .addParameter("experiments", "use_runner_v2")
+ .build();
+
+ return pipelineLauncher.launch(project, region, options);
+ }
+
+ /** The method reads data from BigTable in batch mode. */
+ private PipelineLauncher.LaunchInfo readData() throws IOException {
+ BigtableIO.Read readIO =
+ BigtableIO.read()
+ .withoutValidation()
+ .withProjectId(project)
+ .withInstanceId(resourceManager.getInstanceId())
+ .withTableId(tableId);
+
+ readPipeline
+ .apply("Read from BigTable", readIO)
+ .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
+
+ PipelineLauncher.LaunchConfig options =
+ PipelineLauncher.LaunchConfig.builder("read-bigtable")
+ .setSdk(PipelineLauncher.Sdk.JAVA)
+ .setPipeline(readPipeline)
+ .addParameter("runner", configuration.runner)
+ .addParameter(
+ "autoscalingAlgorithm",
+ DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
+ .toString())
+ .addParameter("numWorkers", String.valueOf(configuration.numWorkers))
+ .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
+ .build();
+
+ return pipelineLauncher.launch(project, region, options);
+ }
+
+ /** Options for BigTableIO stress test. */
+ static class Configuration extends SyntheticSourceOptions {
+ /** Pipeline timeout in minutes. Must be a positive value. */
+ @JsonProperty public int pipelineTimeout = 20;
+
+ /** Runner specified to run the pipeline. */
+ @JsonProperty public String runner = "DirectRunner";
+
+ /** Number of workers for the pipeline. */
+ @JsonProperty public int numWorkers = 20;
+
+ /** Maximum number of workers for the pipeline. */
+ @JsonProperty public int maxNumWorkers = 100;
+
+ /**
+ * Rate of generated elements sent to the test table. Will run with a minimum of 1k rows per
+ * second.
+ */
+ @JsonProperty public int rowsPerSecond = DEFAULT_ROWS_PER_SECOND;
+
+ /** Rows will be generated for this many minutes. */
+ @JsonProperty public int minutes = 15;
+
+ /**
+ * Determines the destination for exporting metrics. If set to true, metrics will be exported to
+ * InfluxDB and displayed using Grafana. If set to false, metrics will be exported to BigQuery
+ * and displayed with Looker Studio.
+ */
+ @JsonProperty public boolean exportMetricsToInfluxDB = false;
+
+ /** InfluxDB measurement to publish results to. */
+ @JsonProperty public String influxMeasurement = BigTableIOST.class.getName();
+
+ /** InfluxDB host to publish metrics. */
+ @JsonProperty public String influxHost;
+
+ /** InfluxDB database to publish metrics. */
+ @JsonProperty public String influxDatabase;
+ }
+
+ /** Maps Instant to the BigTable format record. */
+ private static class MapToBigTableFormat
+ extends DoFn<org.joda.time.Instant, KV<ByteString, Iterable<Mutation>>>
+ implements Serializable {
+
+ private final int valueSizeBytes;
+ private final int totalRows;
+
+ public MapToBigTableFormat(int valueSizeBytes, int totalRows) {
+ this.valueSizeBytes = valueSizeBytes;
+ this.totalRows = totalRows;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ long index = Objects.requireNonNull(c.element()).getMillis() % totalRows;
+
+ ByteString key =
+ ByteString.copyFromUtf8(
+ String.format(
+ "key%s",
+ index
+ + "-"
+ + UUID.randomUUID()
+ + "-"
+ + UUID.randomUUID()
+ + "-"
+ + org.joda.time.Instant.now().getMillis()));
+ Random random = new Random(index);
+ byte[] valBytes = new byte[this.valueSizeBytes];
+ random.nextBytes(valBytes);
+ ByteString value = ByteString.copyFrom(valBytes);
+
+ Iterable<Mutation> mutations =
+ ImmutableList.of(
+ Mutation.newBuilder()
+ .setSetCell(
+ Mutation.SetCell.newBuilder()
+ .setValue(value)
+ .setTimestampMicros(java.time.Instant.now().toEpochMilli() * 1000L)
+ .setFamilyName(COLUMN_FAMILY_NAME))
+ .build());
+ c.output(KV.of(key, mutations));
+ }
+ }
+}
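Beyond the two presets, setup() above accepts a raw JSON string through the -Dconfiguration property; unknown preset names fall through to Configuration.fromJsonString. A hedged sketch of what that parsing amounts to (the values here are arbitrary, not a tested preset):

    // Mirrors the fallback branch in setup(); fromJsonString throws IOException
    // on malformed input.
    String custom =
        "{\"rowsPerSecond\":5000,\"minutes\":10,\"pipelineTimeout\":60,"
            + "\"valueSizeBytes\":200,\"runner\":\"DataflowRunner\"}";
    Configuration parsed = Configuration.fromJsonString(custom, Configuration.class);
    // Fields absent from the JSON keep their declared defaults, e.g. numWorkers = 20.

For scale: the medium preset's rowsPerSecond of 25000 gives a startMultiplier of 25 (max(25000, 1000) / 1000), so the 1 ms PeriodicImpulse fans out to roughly 25,000 base rows per second before the load-period multipliers apply.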
diff --git a/it/kafka/build.gradle b/it/kafka/build.gradle
index b1b8147e72a..96f915a1d84 100644
--- a/it/kafka/build.gradle
+++ b/it/kafka/build.gradle
@@ -46,5 +46,5 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
}
-tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
-tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
+tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
index 2a830400a0a..4ca34328637 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -21,15 +21,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.cloud.Timestamp;
import java.io.IOException;
-import java.io.Serializable;
import java.text.ParseException;
import java.time.Duration;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -37,17 +33,14 @@ import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
-import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.testutils.NamedTestResult;
-import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
@@ -61,7 +54,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -81,17 +73,10 @@ import org.junit.Test;
* - To run large-scale stress tests: {@code gradle :it:kafka:KafkaStressTestLarge
* -DbootstrapServers="0.0.0.0:32400,1.1.1.1:32400"}
*/
-public final class KafkaIOST extends IOLoadTestBase {
- /**
- * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
- * eventually return to 1x.
- */
- private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
-
- private static InfluxDBSettings influxDBSettings;
+public final class KafkaIOST extends IOStressTestBase {
private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
private static final String READ_ELEMENT_METRIC_NAME = "read_count";
- private static final int DEFAULT_ROWS_PER_SECOND = 1000;
+ private static InfluxDBSettings influxDBSettings;
private Configuration configuration;
private AdminClient adminClient;
private String testConfigName;
@@ -120,6 +105,11 @@ public final class KafkaIOST extends IOLoadTestBase {
}
configuration.bootstrapServers =
TestProperties.getProperty("bootstrapServers", null, TestProperties.Type.PROPERTY);
+ String useDataflowRunnerV2FromProps =
+ TestProperties.getProperty("useDataflowRunnerV2", "true", TestProperties.Type.PROPERTY);
+ if (!useDataflowRunnerV2FromProps.isEmpty()) {
+ configuration.useDataflowRunnerV2 = Boolean.parseBoolean(useDataflowRunnerV2FromProps);
+ }
adminClient =
AdminClient.create(ImmutableMap.of("bootstrap.servers", configuration.bootstrapServers));
@@ -231,8 +221,10 @@ public final class KafkaIOST extends IOLoadTestBase {
.setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
.build();
- exportMetrics(writeInfo, writeMetricsConfig);
- exportMetrics(readInfo, readMetricsConfig);
+ exportMetrics(
+ writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+ exportMetrics(
+ readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
}
/**
@@ -267,7 +259,7 @@ public final class KafkaIOST extends IOLoadTestBase {
source
.apply(
"One input to multiple outputs",
- ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
+ ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
.apply("Reshuffle fanout", Reshuffle.viaRandomKey())
.apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
}
@@ -295,7 +287,7 @@ public final class KafkaIOST extends IOLoadTestBase {
.toString())
.addParameter("numWorkers", String.valueOf(configuration.numWorkers))
.addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
- .addParameter("experiments", "use_runner_v2")
+ .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "")
.build();
return pipelineLauncher.launch(project, region, options);
@@ -310,7 +302,7 @@ public final class KafkaIOST extends IOLoadTestBase {
.withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));
readPipeline
- .apply("Read from unbounded Kafka", readFromKafka)
+ .apply("Read from Kafka", readFromKafka)
.apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
PipelineLauncher.LaunchConfig options =
@@ -319,94 +311,12 @@ public final class KafkaIOST extends IOLoadTestBase {
.setPipeline(readPipeline)
.addParameter("numWorkers", String.valueOf(configuration.numWorkers))
.addParameter("runner", configuration.runner)
- .addParameter("experiments", "use_runner_v2")
+ .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "")
.build();
return pipelineLauncher.launch(project, region, options);
}
- private void exportMetrics(
- PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfig)
- throws IOException, ParseException, InterruptedException {
-
- Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
- String testId = UUID.randomUUID().toString();
- String testTimestamp = Timestamp.now().toString();
-
- if (configuration.exportMetricsToInfluxDB) {
- Collection<NamedTestResult> namedTestResults = new ArrayList<>();
- for (Map.Entry<String, Double> entry : metrics.entrySet()) {
- NamedTestResult metricResult =
- NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
- namedTestResults.add(metricResult);
- }
- IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
- } else {
- exportMetricsToBigQuery(launchInfo, metrics);
- }
- }
-
- /**
- * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
- * load increase over time, multiplying the input elements based on the elapsed time since the
- * start of processing. This class aims to simulate various load levels during stress testing.
- */
- private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
- private final int startMultiplier;
- private final long startTimesMillis;
- private final List<LoadPeriod> loadPeriods;
-
- MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
- this.startMultiplier = startMultiplier;
- this.startTimesMillis = Instant.now().getMillis();
- this.loadPeriods = loadPeriods;
- }
-
- @DoFn.ProcessElement
- public void processElement(
- @Element byte[] element,
- OutputReceiver<byte[]> outputReceiver,
- @DoFn.Timestamp Instant timestamp) {
-
- int multiplier = this.startMultiplier;
- long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
-
- for (LoadPeriod loadPeriod : loadPeriods) {
- if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
- && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
- multiplier *= loadPeriod.getLoadIncreaseMultiplier();
- break;
- }
- }
- for (int i = 0; i < multiplier; i++) {
- outputReceiver.output(element);
- }
- }
- }
-
- /**
- * Generates and returns a list of LoadPeriod instances representing periods of load increase
- * based on the specified load increase array and total duration in minutes.
- *
- * @param minutesTotal The total duration in minutes for which the load periods are generated.
- * @return A list of LoadPeriod instances defining periods of load increase.
- */
- private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
-
- List<LoadPeriod> loadPeriods = new ArrayList<>();
- long periodDurationMillis =
- Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
- long startTimeMillis = 0;
-
- for (int loadIncreaseMultiplier : loadIncreaseArray) {
- long endTimeMillis = startTimeMillis + periodDurationMillis;
- loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
-
- startTimeMillis = endTimeMillis;
- }
- return loadPeriods;
- }
-
/** Options for Kafka IO stress test. */
static class Configuration extends SyntheticSourceOptions {
/** Pipeline timeout in minutes. Must be a positive value. */
@@ -415,6 +325,12 @@ public final class KafkaIOST extends IOLoadTestBase {
/** Runner specified to run the pipeline. */
@JsonProperty public String runner = "DirectRunner";
+ /**
+ * Determines whether to use Dataflow runner v2. If set to true, it uses SDF mode for reading
+ * from Kafka. Otherwise, the legacy unbounded read mode is used.
+ */
+ @JsonProperty public boolean useDataflowRunnerV2 = true;
+
/** Number of workers for the pipeline. */
@JsonProperty public int numWorkers = 20;
@@ -449,32 +365,4 @@ public final class KafkaIOST extends IOLoadTestBase {
/** InfluxDB database to publish metrics. * */
@JsonProperty public String influxDatabase;
}
-
- /**
- * Represents a period of time with associated load increase properties for stress testing
- * scenarios.
- */
- private static class LoadPeriod implements Serializable {
- private final int loadIncreaseMultiplier;
- private final long periodStartMillis;
- private final long periodEndMillis;
-
- public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
- this.loadIncreaseMultiplier = loadIncreaseMultiplier;
- this.periodStartMillis = periodStartMillis;
- this.periodEndMillis = periodEndMin;
- }
-
- public int getLoadIncreaseMultiplier() {
- return loadIncreaseMultiplier;
- }
-
- public long getPeriodStartMillis() {
- return periodStartMillis;
- }
-
- public long getPeriodEndMillis() {
- return periodEndMillis;
- }
- }
}
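The new useDataflowRunnerV2 flag defaults to true; passing -DuseDataflowRunnerV2=false clears the use_runner_v2 experiment so the read falls back to the legacy unbounded Kafka source, per the javadoc above. A sketch of the property round-trip, mirroring the setup code in this file:

    // Same parsing as in setup(): the system property overrides the default.
    String fromProps =
        TestProperties.getProperty("useDataflowRunnerV2", "true", TestProperties.Type.PROPERTY);
    boolean useRunnerV2 = Boolean.parseBoolean(fromProps);
    // In the launch config this becomes: "experiments" -> "use_runner_v2" or "".
    String experiments = useRunnerV2 ? "use_runner_v2" : "";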
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index e0c53436d4e..824aa901078 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -86,6 +86,7 @@
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Base\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Client\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*LT\.java" />
+ <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ST\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ResourceManagerTest\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*testinfra.*mockapis.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*requestresponse.*" />