Posted to github@beam.apache.org by "akashorabek (via GitHub)" <gi...@apache.org> on 2024/03/14 00:24:25 UTC

[PR] Add BigTableIO Stress test [beam]

akashorabek opened a new pull request, #30630:
URL: https://github.com/apache/beam/pull/30630

   This pull request introduces stress tests for BigTableIO, designed to assess its performance under varying load. The stress tests ramp the load up and down over time and evaluate how BigTableIO behaves.
   
   Changes:
   
   - Added stress tests for BigTableIO.
   - Implemented dynamic load increases over time to simulate varying workloads.
   - Added support for exporting metrics to InfluxDB or BigQuery based on the configuration parameter.
   - Added the ability for users to configure the Dataflow version in the KafkaIO stress test.
   - Refactored the BigQueryIO and KafkaIO stress test code.
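
As a rough illustration of the load schedule, the sketch below computes the target output rate for each period from the defaults in `IOStressTestBase` (`DEFAULT_ROWS_PER_SECOND = 1000` and the multipliers `{1, 2, 2, 4, 2, 1}`). It assumes that the source emits at the base rate, that `MultiplierDoFn` is created with a start multiplier of 1, and that the run lasts 30 minutes. The class name `LoadProfileSketch` and the run length are hypothetical and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch (hypothetical class): target output rate per load period. */
public class LoadProfileSketch {
  // Defaults copied from IOStressTestBase in this PR.
  static final int[] LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
  static final int ROWS_PER_SECOND = 1000;

  /** Returns one line per period with its start/end minute and target rows per second. */
  static List<String> describe(int minutesTotal, int startMultiplier) {
    // Integer division, as in getLoadPeriods; leftover minutes are not assigned to any period.
    int periodMinutes = minutesTotal / LOAD_INCREASE_ARRAY.length;
    List<String> lines = new ArrayList<>();
    for (int i = 0; i < LOAD_INCREASE_ARRAY.length; i++) {
      // MultiplierDoFn emits each element startMultiplier * periodMultiplier times.
      int rate = ROWS_PER_SECOND * startMultiplier * LOAD_INCREASE_ARRAY[i];
      lines.add(
          String.format("min %d-%d: %d rows/s", i * periodMinutes, (i + 1) * periodMinutes, rate));
    }
    return lines;
  }

  public static void main(String[] args) {
    // Assumed 30-minute run with a start multiplier of 1.
    for (String line : describe(30, 1)) {
      System.out.println(line);
    }
  }
}
```

Under these assumptions, each period lasts 5 minutes and the rate peaks at 4000 rows/s between minutes 15 and 20.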
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008707627

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2010893498

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008707523

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2009164781

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008682755

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2009407880

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008998653

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2009051676

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-1998041877

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @Abacn for label java.
   R: @damccorm for label build.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2009165137

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #30630:
URL: https://github.com/apache/beam/pull/30630


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008651373

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2009061574

   Run Kotlin_Examples PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #30630:
URL: https://github.com/apache/beam/pull/30630#discussion_r1530769586


##########
it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp;
+
+import com.google.cloud.Timestamp;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/** Base class for IO Stress tests. */
+public class IOStressTestBase extends IOLoadTestBase {
+  /**
+   * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
+   * eventually return to 1x.
+   */
+  protected static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};
+
+  protected static final int DEFAULT_ROWS_PER_SECOND = 1000;
+
+  /**
+   * Generates and returns a list of LoadPeriod instances representing periods of load increase
+   * based on the specified load increase array and total duration in minutes.
+   *
+   * @param minutesTotal The total duration in minutes for which the load periods are generated.
+   * @return A list of LoadPeriod instances defining periods of load increase.
+   */
+  protected List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {
+
+    List<LoadPeriod> loadPeriods = new ArrayList<>();
+    long periodDurationMillis =
+        Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
+    long startTimeMillis = 0;
+
+    for (int loadIncreaseMultiplier : loadIncreaseArray) {
+      long endTimeMillis = startTimeMillis + periodDurationMillis;
+      loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));
+
+      startTimeMillis = endTimeMillis;
+    }
+    return loadPeriods;
+  }
+
+  /**
+   * Represents a period of time with associated load increase properties for stress testing
+   * scenarios.
+   */
+  protected static class LoadPeriod implements Serializable {
+    private final int loadIncreaseMultiplier;
+    private final long periodStartMillis;
+    private final long periodEndMillis;
+
+    public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMillis) {
+      this.loadIncreaseMultiplier = loadIncreaseMultiplier;
+      this.periodStartMillis = periodStartMillis;
+      this.periodEndMillis = periodEndMillis;
+    }
+
+    public int getLoadIncreaseMultiplier() {
+      return loadIncreaseMultiplier;
+    }
+
+    public long getPeriodStartMillis() {
+      return periodStartMillis;
+    }
+
+    public long getPeriodEndMillis() {
+      return periodEndMillis;
+    }
+  }
+
+  /**
+   * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
+   * load increase over time, multiplying the input elements based on the elapsed time since the
+   * start of processing. This class aims to simulate various load levels during stress testing.
+   */
+  protected static class MultiplierDoFn<T> extends DoFn<T, T> {
+    private final int startMultiplier;
+    private final long startTimesMillis;
+    private final List<LoadPeriod> loadPeriods;
+
+    public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
+      this.startMultiplier = startMultiplier;
+      this.startTimesMillis = Instant.now().getMillis();
+      this.loadPeriods = loadPeriods;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(
+        @Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) {
+
+      int multiplier = this.startMultiplier;
+      long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
+
+      for (LoadPeriod loadPeriod : loadPeriods) {
+        if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
+            && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
+          multiplier *= loadPeriod.getLoadIncreaseMultiplier();
+          break;
+        }
+      }
+      for (int i = 0; i < multiplier; i++) {
+        outputReceiver.output(element);
+      }
+    }
+  }
+
+  /** Exports test metrics to InfluxDB or BigQuery depending on the configuration. */
+  protected void exportMetrics(

Review Comment:
   This seems useful for both load tests and stress tests; in fact, we should have been using it for streaming load tests. Is it possible to move this to IOLoadTestBase?
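
The period lookup performed by `getLoadPeriods` and `MultiplierDoFn` can be exercised without a runner. The following dependency-free sketch mirrors that logic under a hypothetical class name (`LoadPeriodSketch`); it is not part of the PR. It highlights two behaviors that follow from the code above. First, `minutesTotal / loadIncreaseArray.length` is integer division, so leftover minutes are not covered by any period. Second, an element whose elapsed time matches no period is emitted only `startMultiplier` times.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Dependency-free re-implementation of the period lookup (hypothetical, for illustration). */
public class LoadPeriodSketch {
  /** Mirrors LoadPeriod: a multiplier plus [start, end) offsets in milliseconds. */
  static final class Period {
    final int multiplier;
    final long startMillis;
    final long endMillis;

    Period(int multiplier, long startMillis, long endMillis) {
      this.multiplier = multiplier;
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /** Mirrors getLoadPeriods: equal-length, back-to-back periods starting at offset 0. */
  static List<Period> periods(int minutesTotal, int[] multipliers) {
    long periodMillis = Duration.ofMinutes(minutesTotal / multipliers.length).toMillis();
    List<Period> result = new ArrayList<>();
    long start = 0;
    for (int m : multipliers) {
      result.add(new Period(m, start, start + periodMillis));
      start += periodMillis;
    }
    return result;
  }

  /** Mirrors MultiplierDoFn: base multiplier times the matching period's multiplier, if any. */
  static int multiplierAt(long elapsedMillis, int startMultiplier, List<Period> periods) {
    for (Period p : periods) {
      if (elapsedMillis >= p.startMillis && elapsedMillis < p.endMillis) {
        return startMultiplier * p.multiplier;
      }
    }
    return startMultiplier; // no period matches: only the base multiplier applies
  }

  public static void main(String[] args) {
    List<Period> ps = periods(20, new int[] {1, 2, 2, 4, 2, 1});
    System.out.println(ps.get(ps.size() - 1).endMillis); // 1080000 (18 of 20 minutes)
    System.out.println(multiplierAt(10 * 60_000L, 1, ps)); // 4: minute 10 is in the 4x period
    System.out.println(multiplierAt(19 * 60_000L, 3, ps)); // 3: past the last period
  }
}
```

For example, a 20-minute run with the default array yields six 3-minute periods, so the last 2 minutes fall back to the base multiplier.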



Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on code in PR #30630:
URL: https://github.com/apache/beam/pull/30630#discussion_r1531474427


##########
it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java:
##########
+  protected void exportMetrics(

Review Comment:
   Done



Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-2008998407

   Run Java_Examples_Dataflow PreCommit


Re: [PR] Add BigTableIO Stress test [beam]

Posted by "akashorabek (via GitHub)" <gi...@apache.org>.
akashorabek commented on PR #30630:
URL: https://github.com/apache/beam/pull/30630#issuecomment-1997784152

   Run Java_Examples_Dataflow PreCommit

