You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/06/23 13:12:48 UTC

[beam] branch master updated: [BEAM-10258] Fix Dataflow-based Jenkins jobs (#12048)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c24f23  [BEAM-10258] Fix Dataflow-based Jenkins jobs (#12048)
4c24f23 is described below

commit 4c24f230d57049793218e756cb8bb213e906aaaa
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Tue Jun 23 15:12:24 2020 +0200

    [BEAM-10258] Fix Dataflow-based Jenkins jobs (#12048)
    
    [BEAM-10258] Add TestDataflowRunner as separate option to CommonTestProperties.
    
    Fix randomized UUID usage in Python PubSubIO tests.
    
    Co-authored-by: Kamil Wasilewski <ka...@polidea.com>
---
 .test-infra/jenkins/CommonTestProperties.groovy             | 10 +++++++---
 .../jenkins/job_PerformanceTests_PubsubIO_Python.groovy     |  6 ++----
 sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py       | 13 +++++++++----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/.test-infra/jenkins/CommonTestProperties.groovy b/.test-infra/jenkins/CommonTestProperties.groovy
index f46323a..b78901b 100644
--- a/.test-infra/jenkins/CommonTestProperties.groovy
+++ b/.test-infra/jenkins/CommonTestProperties.groovy
@@ -26,7 +26,8 @@ class CommonTestProperties {
     }
 
     enum Runner {
-        DATAFLOW("TestDataflowRunner"),
+        DATAFLOW("DataflowRunner"),
+        TEST_DATAFLOW("TestDataflowRunner"),
         SPARK("SparkRunner"),
         SPARK_STRUCTURED_STREAMING("SparkStructuredStreamingRunner"),
         FLINK("FlinkRunner"),
@@ -36,18 +37,21 @@ class CommonTestProperties {
         def RUNNER_DEPENDENCY_MAP = [
                 JAVA: [
                         DATAFLOW: ":runners:google-cloud-dataflow-java",
+                        TEST_DATAFLOW: ":runners:google-cloud-dataflow-java",
                         SPARK: ":runners:spark",
                         SPARK_STRUCTURED_STREAMING: ":runners:spark",
                         FLINK: ":runners:flink:1.10",
                         DIRECT: ":runners:direct-java"
                 ],
                 PYTHON: [
-                        DATAFLOW: "TestDataflowRunner",
+                        DATAFLOW: "DataflowRunner",
+                        TEST_DATAFLOW: "TestDataflowRunner",
                         DIRECT: "DirectRunner",
                         PORTABLE: "PortableRunner"
                 ],
                 PYTHON_37: [
-                        DATAFLOW: "TestDataflowRunner",
+                        DATAFLOW: "DataflowRunner",
+                        TEST_DATAFLOW: "TestDataflowRunner",
                         DIRECT: "DirectRunner",
                         PORTABLE: "PortableRunner"
                 ]
diff --git a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
index f08e53e..e0b5e2f 100644
--- a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
@@ -24,14 +24,12 @@ import static java.util.UUID.randomUUID
 
 def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
-def pubsubId = randomUUID()
-def pubsubNamespace = "pubsub_io_performance_${pubsubId}"
 def withDataflowWorkerJar = true
 
 def psio_test = [
         title          : 'PubsubIO Write Performance Test Python 2GB',
         test           : 'apache_beam.io.gcp.pubsub_io_perf_test',
-        runner         : CommonTestProperties.Runner.DATAFLOW,
+        runner         : CommonTestProperties.Runner.TEST_DATAFLOW,
         pipelineOptions: [
                 job_name                  : 'performance-tests-psio-python-2gb' + now,
                 project                   : 'apache-beam-testing',
@@ -49,7 +47,7 @@ def psio_test = [
                         '"value_size": 1024}\'',
                 num_workers               : 5,
                 autoscaling_algorithm     : 'NONE',  // Disable autoscale the worker pool.
-                pubsub_namespace          : pubsubNamespace,
+                pubsub_namespace_prefix   : 'pubsub_io_performance_',
                 wait_until_finish_duration: 1000 * 60 * 10, // in milliseconds
         ]
 ]
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py b/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py
index a0457c0..24bc9b9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py
@@ -30,7 +30,7 @@ python -m apache_beam.io.gcp.pubsub_io_perf_test \
     --temp_location=gs://<BUCKET_NAME>/tmp
     --staging_location=gs://<BUCKET_NAME>/staging
     --wait_until_finish_duration=<TIME_IN_MS>
-    --pubsub_namespace=<PUBSUB_NAMESPACE>
+    --pubsub_namespace_prefix=<PUBSUB_NAMESPACE_PREFIX>
     --publish_to_big_query=<OPTIONAL><true/false>
     --metrics_dataset=<OPTIONAL>
     --metrics_table=<OPTIONAL>
@@ -80,15 +80,17 @@ MATCHER_PULL_TIMEOUT = 60 * 5
 
 class PubsubIOPerfTest(LoadTest):
   def _setup_env(self):
-    if not self.pipeline.get_option('pubsub_namespace'):
-      logging.error('--pubsub_namespace argument is required.')
+    if not self.pipeline.get_option('pubsub_namespace_prefix'):
+      logging.error('--pubsub_namespace_prefix argument is required.')
       sys.exit(1)
     if not self.pipeline.get_option('wait_until_finish_duration'):
       logging.error('--wait_until_finish_duration argument is required.')
       sys.exit(1)
 
     self.num_of_messages = int(self.input_options.get('num_records'))
-    self.pubsub_namespace = self.pipeline.get_option('pubsub_namespace')
+    pubsub_namespace_prefix = self.pipeline.get_option(
+        'pubsub_namespace_prefix')
+    self.pubsub_namespace = pubsub_namespace_prefix + unique_id
 
   def _setup_pubsub(self):
     self.pub_client = pubsub.PublisherClient()
@@ -215,6 +217,9 @@ class PubsubReadPerfTest(PubsubIOPerfTest):
 
 
 if __name__ == '__main__':
+  import uuid
+  unique_id = str(uuid.uuid4())
+
   logging.basicConfig(level=logging.INFO)
   PubsubWritePerfTest().run()
   PubsubReadPerfTest().run()