You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/05 17:02:29 UTC

[beam] branch master updated: [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0af670d4616 [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)
0af670d4616 is described below

commit 0af670d46168c5cb182fed1f7e56d3fde1841cd1
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu May 5 10:02:22 2022 -0700

    [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554)
---
 .test-infra/jenkins/README.md                      |  2 +-
 .../jenkins/job_LoadTests_Combine_Go.groovy        |  8 ++++++++
 .test-infra/jenkins/job_LoadTests_GBK_Go.groovy    | 16 ++++++++++++++++
 .test-infra/jenkins/job_LoadTests_ParDo_Go.groovy  | 10 ++++++++++
 .../jenkins/job_LoadTests_SideInput_Go.groovy      |  6 ++++++
 .test-infra/jenkins/job_LoadTests_coGBK_Go.groovy  | 10 ++++++++++
 CHANGES.md                                         |  7 +++----
 sdks/go/pkg/beam/io/synthetic/source.go            | 22 +++++++++++-----------
 sdks/go/test/load/sideinput/sideinput.go           |  9 ++++-----
 9 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 131d2dde0f5..02a616950f9 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -166,7 +166,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
 | beam_LoadTests_Go_GBK_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch_PR/) | `Run Load Tests Go GBK Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/) |
 | beam_LoadTests_Go_ParDo_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Go ParDo Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/) |
 | beam_LoadTests_Go_ParDo_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch_PR/) | `Run Load Tests Go ParDo Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/) |
-| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch suite` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
+| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
 | beam_LoadTests_Go_SideInput_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch_PR/) | `Run Load Tests Go SideInput Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/) |
 | beam_Java_LoadTests_Smoke | [phrase](https://ci-beam.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` |  |
 | beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
index d1204a50f48..7e4ca7b284e 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
 
 String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
 
@@ -47,6 +49,8 @@ def batchScenarios = {
         top_count            : 20,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -69,6 +73,8 @@ def batchScenarios = {
         top_count            : 20,
         num_workers          : 16,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -91,6 +97,8 @@ def batchScenarios = {
         top_count            : 20,
         num_workers          : 16,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
   ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
index 8e3f26c5cda..559baf6c899 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
 String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
 
 def batchScenarios = {
@@ -46,6 +48,8 @@ def batchScenarios = {
         fanout               : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -68,6 +72,8 @@ def batchScenarios = {
         fanout               : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -90,6 +96,8 @@ def batchScenarios = {
         fanout               : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -112,6 +120,8 @@ def batchScenarios = {
         fanout               : 4,
         num_workers          : 16,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -134,6 +144,8 @@ def batchScenarios = {
         fanout               : 8,
         num_workers          : 16,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -158,6 +170,8 @@ def batchScenarios = {
         fanout               : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -182,6 +196,8 @@ def batchScenarios = {
         fanout               : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
   ]
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
index a03b9bb0b08..f1f6b40be74 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
 
@@ -48,6 +50,8 @@ def batchScenarios = {
         number_of_counters   : 0,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -71,6 +75,8 @@ def batchScenarios = {
         number_of_counters   : 0,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -94,6 +100,8 @@ def batchScenarios = {
         number_of_counters   : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -117,6 +125,8 @@ def batchScenarios = {
         number_of_counters   : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
   ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
index 16e8618b430..225bbc79998 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
 String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
 
 def batchScenarios = {
@@ -45,6 +47,8 @@ def batchScenarios = {
         access_percentage: 1,
         num_workers          : 10,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -65,6 +69,8 @@ def batchScenarios = {
         '"value_size": 900}\'',
         num_workers          : 10,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ]
   ]
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
index 09072b6197d..6218fa416d8 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
@@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
 def batchScenarios = {
@@ -53,6 +55,8 @@ def batchScenarios = {
         iterations           : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -82,6 +86,8 @@ def batchScenarios = {
         iterations           : 1,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -111,6 +117,8 @@ def batchScenarios = {
         iterations           : 4,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
     [
@@ -140,6 +148,8 @@ def batchScenarios = {
         iterations           : 4,
         num_workers          : 5,
         autoscaling_algorithm: 'NONE',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
       ]
     ],
   ]
diff --git a/CHANGES.md b/CHANGES.md
index 5f7aea3f86b..605ca0e87ab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,12 +44,11 @@
 ## Bugfixes
 
 * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* Fixed Java expansion service to allow specific files to stage ([BEAM-14160](https://issues.apache.org/jira/browse/BEAM-14160)).
-
 ## Known Issues
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+
 # [2.40.0] - Unreleased
 
 ## Highlights
@@ -68,6 +67,7 @@
 ## Breaking Changes
 
 * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* synthetic.SourceConfig field types have changed from int to int64 for better compatibility with Flink's use of logical types in schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173)).
 
 ## Deprecations
 
@@ -82,8 +82,7 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
-
-# [2.39.0] - Unreleased
+# [2.39.0] - Unreleased, Cut
 
 ## Highlights
 
diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go
index 75304091d27..822c416f696 100644
--- a/sdks/go/pkg/beam/io/synthetic/source.go
+++ b/sdks/go/pkg/beam/io/synthetic/source.go
@@ -191,7 +191,7 @@ func DefaultSourceConfig() *SourceConfigBuilder {
 // Valid values are in the range of [1, ...] and the default value is 1. Values
 // of 0 (and below) are invalid as they result in sources that emit no elements.
 func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
-	b.cfg.NumElements = val
+	b.cfg.NumElements = int64(val)
 	return b
 }
 
@@ -210,7 +210,7 @@ func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
 // of 0 (and below) are invalid as they would result in dropping elements that
 // are expected to be emitted.
 func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
-	b.cfg.InitialSplits = val
+	b.cfg.InitialSplits = int64(val)
 	return b
 }
 
@@ -219,7 +219,7 @@ func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
 //
 // Valid values are in the range of [1, ...] and the default value is 8.
 func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
-	b.cfg.KeySize = val
+	b.cfg.KeySize = int64(val)
 	return b
 }
 
@@ -228,7 +228,7 @@ func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
 //
 // Valid values are in the range of [1, ...] and the default value is 8.
 func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
-	b.cfg.ValueSize = val
+	b.cfg.ValueSize = int64(val)
 	return b
 }
 
@@ -237,7 +237,7 @@ func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
 //
 // Valid values are in the range of [0, ...] and the default value is 0.
 func (b *SourceConfigBuilder) NumHotKeys(val int) *SourceConfigBuilder {
-	b.cfg.NumHotKeys = val
+	b.cfg.NumHotKeys = int64(val)
 	return b
 }
 
@@ -299,10 +299,10 @@ func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig {
 // synthetic source. It should be created via a SourceConfigBuilder, not by
 // directly initializing it (the fields are public to allow encoding).
 type SourceConfig struct {
-	NumElements    int     `json:"num_records"`
-	InitialSplits  int     `json:"initial_splits"`
-	KeySize        int     `json:"key_size"`
-	ValueSize      int     `json:"value_size"`
-	NumHotKeys     int     `json:"num_hot_keys"`
-	HotKeyFraction float64 `json:"hot_key_fraction"`
+	NumElements    int64   `json:"num_records" beam:"num_records"`
+	InitialSplits  int64   `json:"initial_splits" beam:"initial_splits"`
+	KeySize        int64   `json:"key_size" beam:"key_size"`
+	ValueSize      int64   `json:"value_size" beam:"value_size"`
+	NumHotKeys     int64   `json:"num_hot_keys" beam:"num_hot_keys"`
+	HotKeyFraction float64 `json:"hot_key_fraction" beam:"hot_key_fraction"`
 }
diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go
index fe4f4527075..6f7cb6f2d41 100644
--- a/sdks/go/test/load/sideinput/sideinput.go
+++ b/sdks/go/test/load/sideinput/sideinput.go
@@ -52,13 +52,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
 }
 
 type doFn struct {
-	ElementsToAccess int
+	ElementsToAccess int64
 }
 
 func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
-	var key []byte
-	var value []byte
-	i := 0
+	var key, value []byte
+	var i int64
 	for values(&key, &value) {
 		if i >= fn.ElementsToAccess {
 			break
@@ -75,7 +74,7 @@ func main() {
 	p, s := beam.NewPipelineWithRoot()
 
 	syntheticConfig := parseSyntheticConfig()
-	elementsToAccess := syntheticConfig.NumElements * *accessPercentage / 100
+	elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage) / 100 // multiply before dividing: pct/100 alone truncates to 0 for any pct < 100
 
 	src := synthetic.SourceSingle(s, syntheticConfig)
 	src = beam.ParDo(s, &load.RuntimeMonitor{}, src)