You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/05 17:02:29 UTC
[beam] branch master updated: [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0af670d4616 [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)
0af670d4616 is described below
commit 0af670d46168c5cb182fed1f7e56d3fde1841cd1
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu May 5 10:02:22 2022 -0700
[BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)
---
.test-infra/jenkins/README.md | 2 +-
.../jenkins/job_LoadTests_Combine_Go.groovy | 8 ++++++++
.test-infra/jenkins/job_LoadTests_GBK_Go.groovy | 16 ++++++++++++++++
.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy | 10 ++++++++++
.../jenkins/job_LoadTests_SideInput_Go.groovy | 6 ++++++
.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy | 10 ++++++++++
CHANGES.md | 7 +++----
sdks/go/pkg/beam/io/synthetic/source.go | 22 +++++++++++-----------
sdks/go/test/load/sideinput/sideinput.go | 9 ++++-----
9 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 131d2dde0f5..02a616950f9 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -166,7 +166,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
| beam_LoadTests_Go_GBK_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch_PR/) | `Run Load Tests Go GBK Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/) |
| beam_LoadTests_Go_ParDo_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Go ParDo Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/) |
| beam_LoadTests_Go_ParDo_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch_PR/) | `Run Load Tests Go ParDo Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/) |
-| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch suite` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
+| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
| beam_LoadTests_Go_SideInput_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch_PR/) | `Run Load Tests Go SideInput Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/) |
| beam_Java_LoadTests_Smoke | [phrase](https://ci-beam.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` | |
| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
index d1204a50f48..7e4ca7b284e 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
@@ -47,6 +49,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -69,6 +73,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 16,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -91,6 +97,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 16,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
index 8e3f26c5cda..559baf6c899 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
def batchScenarios = {
@@ -46,6 +48,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -68,6 +72,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -90,6 +96,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -112,6 +120,8 @@ def batchScenarios = {
fanout : 4,
num_workers : 16,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -134,6 +144,8 @@ def batchScenarios = {
fanout : 8,
num_workers : 16,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -158,6 +170,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -182,6 +196,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
]
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
index a03b9bb0b08..f1f6b40be74 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -48,6 +50,8 @@ def batchScenarios = {
number_of_counters : 0,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -71,6 +75,8 @@ def batchScenarios = {
number_of_counters : 0,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -94,6 +100,8 @@ def batchScenarios = {
number_of_counters : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -117,6 +125,8 @@ def batchScenarios = {
number_of_counters : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
index 16e8618b430..225bbc79998 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
def batchScenarios = {
@@ -45,6 +47,8 @@ def batchScenarios = {
access_percentage: 1,
num_workers : 10,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -65,6 +69,8 @@ def batchScenarios = {
'"value_size": 900}\'',
num_workers : 10,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
]
]
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
index 09072b6197d..6218fa416d8 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
def batchScenarios = {
@@ -53,6 +55,8 @@ def batchScenarios = {
iterations : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -82,6 +86,8 @@ def batchScenarios = {
iterations : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -111,6 +117,8 @@ def batchScenarios = {
iterations : 4,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
@@ -140,6 +148,8 @@ def batchScenarios = {
iterations : 4,
num_workers : 5,
autoscaling_algorithm: 'NONE',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
]
diff --git a/CHANGES.md b/CHANGES.md
index 5f7aea3f86b..605ca0e87ab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,12 +44,11 @@
## Bugfixes
* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* Fixed Java expansion service to allow specific files to stage ([BEAM-14160](https://issues.apache.org/jira/browse/BEAM-14160)).
-
## Known Issues
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+
# [2.40.0] - Unreleased
## Highlights
@@ -68,6 +67,7 @@
## Breaking Changes
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173)).
## Deprecations
@@ -82,8 +82,7 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-
-# [2.39.0] - Unreleased
+# [2.39.0] - Unreleased, Cut
## Highlights
diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go
index 75304091d27..822c416f696 100644
--- a/sdks/go/pkg/beam/io/synthetic/source.go
+++ b/sdks/go/pkg/beam/io/synthetic/source.go
@@ -191,7 +191,7 @@ func DefaultSourceConfig() *SourceConfigBuilder {
// Valid values are in the range of [1, ...] and the default value is 1. Values
// of 0 (and below) are invalid as they result in sources that emit no elements.
func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
- b.cfg.NumElements = val
+ b.cfg.NumElements = int64(val)
return b
}
@@ -210,7 +210,7 @@ func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
// of 0 (and below) are invalid as they would result in dropping elements that
// are expected to be emitted.
func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
- b.cfg.InitialSplits = val
+ b.cfg.InitialSplits = int64(val)
return b
}
@@ -219,7 +219,7 @@ func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [1, ...] and the default value is 8.
func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
- b.cfg.KeySize = val
+ b.cfg.KeySize = int64(val)
return b
}
@@ -228,7 +228,7 @@ func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [1, ...] and the default value is 8.
func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
- b.cfg.ValueSize = val
+ b.cfg.ValueSize = int64(val)
return b
}
@@ -237,7 +237,7 @@ func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [0, ...] and the default value is 0.
func (b *SourceConfigBuilder) NumHotKeys(val int) *SourceConfigBuilder {
- b.cfg.NumHotKeys = val
+ b.cfg.NumHotKeys = int64(val)
return b
}
@@ -299,10 +299,10 @@ func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig {
// synthetic source. It should be created via a SourceConfigBuilder, not by
// directly initializing it (the fields are public to allow encoding).
type SourceConfig struct {
- NumElements int `json:"num_records"`
- InitialSplits int `json:"initial_splits"`
- KeySize int `json:"key_size"`
- ValueSize int `json:"value_size"`
- NumHotKeys int `json:"num_hot_keys"`
- HotKeyFraction float64 `json:"hot_key_fraction"`
+ NumElements int64 `json:"num_records" beam:"num_records"`
+ InitialSplits int64 `json:"initial_splits" beam:"initial_splits"`
+ KeySize int64 `json:"key_size" beam:"key_size"`
+ ValueSize int64 `json:"value_size" beam:"value_size"`
+ NumHotKeys int64 `json:"num_hot_keys" beam:"num_hot_keys"`
+ HotKeyFraction float64 `json:"hot_key_fraction" beam:"hot_key_fraction"`
}
diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go
index fe4f4527075..6f7cb6f2d41 100644
--- a/sdks/go/test/load/sideinput/sideinput.go
+++ b/sdks/go/test/load/sideinput/sideinput.go
@@ -52,13 +52,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
type doFn struct {
- ElementsToAccess int
+ ElementsToAccess int64
}
func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
- var key []byte
- var value []byte
- i := 0
+ var key, value []byte
+ var i int64
for values(&key, &value) {
if i >= fn.ElementsToAccess {
break
@@ -75,7 +74,7 @@ func main() {
p, s := beam.NewPipelineWithRoot()
syntheticConfig := parseSyntheticConfig()
- elementsToAccess := syntheticConfig.NumElements * *accessPercentage / 100
+ elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100)
src := synthetic.SourceSingle(s, syntheticConfig)
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)