You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/21 09:47:46 UTC

[GitHub] [beam] kamilwu commented on a change in pull request #11274: [BEAM-9633] Add PubsubIO performance test

kamilwu commented on a change in pull request #11274:
URL: https://github.com/apache/beam/pull/11274#discussion_r411986754



##########
File path: .test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def psio_read_test = [
+        title          : 'PubsubIO Read Performance Test Python 100000 messages',
+        test           : 'apache_beam.io.gcp.pubsub_read_perf_test',
+        runner         : CommonTestProperties.Runner.DATAFLOW,
+        pipelineOptions: [
+                job_name             : 'performance-tests-psio-read-python-100000-msgs' + now,
+                project              : 'apache-beam-testing',
+                temp_location        : 'gs://temp-storage-for-perf-tests/loadtests',
+                publish_to_big_query : true,
+                metrics_dataset      : 'beam_performance',
+                metrics_table        : 'psio_read_100000msg_results',
+                // Pubsub PublishRequest can have max 10'000'000 bytes
+                input_options        : '\'{"num_records": 100000}\'',
+                num_workers          : 5,
+                autoscaling_algorithm: 'NONE',  // Disable autoscale the worker pool.
+                timeout              : 3600,
+        ]
+]
+
+def psio_write_test = [
+        title          : 'PubsubIO Write Performance Test Python 100000 messages',
+        test           : 'apache_beam.io.gcp.pubsub_write_perf_test',
+        runner         : CommonTestProperties.Runner.DATAFLOW,
+        pipelineOptions: [
+                job_name             : 'performance-tests-psio-write-python-100000-msgs' + now,
+                project              : 'apache-beam-testing',
+                temp_location        : 'gs://temp-storage-for-perf-tests/loadtests',
+                publish_to_big_query : true,
+                metrics_dataset      : 'beam_performance',
+                metrics_table        : 'psio_write_100000msg_results',
+                // Pubsub PublishRequest can have max 10'000'000 bytes
+                input_options        : '\'{' +
+                        '"num_records": 100000,' +
+                        '"key_size": 1,' +
+                        '"value_size": 16}\'',
+                num_workers          : 5,
+                autoscaling_algorithm: 'NONE',  // Disable autoscale the worker pool.
+                timeout              : 3600,
+        ]
+]
+
+def executeJob = { scope, testConfig ->
+    commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)
+
+    loadTestsBuilder.loadTest(scope, testConfig.title, testConfig.runner, CommonTestProperties.SDK.PYTHON, testConfig.pipelineOptions, testConfig.test)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+        'beam_PubsubIO_Read_Performance_Test_Python',
+        'Run PubsubIO Read Performance Test Python',
+        'PubsubIO Read Performance Test Python',
+        this
+) {
+    executeJob(delegate, psio_read_test)
+}
+
+CronJobBuilder.cronJob('beam_PubsubIO_Read_Performance_Test_Python', 'H 15 * * *', this) {
+    executeJob(delegate, psio_read_test)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+        'beam_PubsubIO_Write_Performance_Test_Python',
+        'Run PubsubIO Write Performance Test Python',
+        'BigQueryIO Write Performance Test Python',

Review comment:
       I believe `BigQueryIO` should be replaced by `PubsubIO`

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_read_performance'
+
+
+class PubsubReadPerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubReadPerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+    (
+        self.pipeline
+        | 'Read from pubsub' >> ReadFromPubSub(
+            subscription=self.subscription.name,
+            with_attributes=True,
+            id_label='ack_id')
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+  def cleanup(self):
+    test_utils.cleanup_subscriptions(
+      self.sub_client, [self.subscription, self.matcher_subscription])
+    test_utils.cleanup_topics(
+      self.pub_client, [self.topic])
+
+  def _setup_env(self):
+    if not self.pipeline.get_option('timeout'):
+      logging.error('--timeout argument is required.')
+      sys.exit(1)
+
+    self.num_of_messages = int(self.input_options.get('num_records'))
+    self.timeout = int(self.pipeline.get_option('timeout'))
+
+  def _setup_pubsub(self):
+    def init_input(number_of_messages):
+      for n in range(number_of_messages):
+        message = PubsubMessage(
+          bytes(str(n), 'utf-8'),
+          {'ack_id': bytes(str(n), 'utf-8')})
+        bytes_message = WriteToPubSub.to_proto_str(message)
+        self.pub_client.publish(self.topic.name, bytes_message)
+
+    from google.cloud import pubsub

Review comment:
       Unless this import needs to stay local, let's move this line to the beginning of the file.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage

Review comment:
       You cannot import multiple objects from a module in a single line. Our pylint will complain. Split this into multiple lines or use something like this in your code: `beam.io.ReadFromPubSub`

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_read_performance'
+
+
+class PubsubReadPerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubReadPerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+    (
+        self.pipeline
+        | 'Read from pubsub' >> ReadFromPubSub(
+            subscription=self.subscription.name,
+            with_attributes=True,
+            id_label='ack_id')
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+  def cleanup(self):
+    test_utils.cleanup_subscriptions(
+      self.sub_client, [self.subscription, self.matcher_subscription])
+    test_utils.cleanup_topics(
+      self.pub_client, [self.topic])
+
+  def _setup_env(self):
+    if not self.pipeline.get_option('timeout'):
+      logging.error('--timeout argument is required.')
+      sys.exit(1)
+
+    self.num_of_messages = int(self.input_options.get('num_records'))
+    self.timeout = int(self.pipeline.get_option('timeout'))
+
+  def _setup_pubsub(self):
+    def init_input(number_of_messages):
+      for n in range(number_of_messages):
+        message = PubsubMessage(
+          bytes(str(n), 'utf-8'),
+          {'ack_id': bytes(str(n), 'utf-8')})
+        bytes_message = WriteToPubSub.to_proto_str(message)
+        self.pub_client.publish(self.topic.name, bytes_message)
+
+    from google.cloud import pubsub
+
+    self.uuid = str(uuid.uuid4())
+
+    self.pub_client = pubsub.PublisherClient()
+    self.topic = self.pub_client.create_topic(
+      self.pub_client.topic_path(self.project_id, PUBSUB_NAME + self.uuid)
+    )
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.subscription = self.sub_client.create_subscription(
+      self.sub_client.subscription_path(self.project_id, PUBSUB_NAME + self.uuid),
+      self.topic.name
+    )
+    self.matcher_subscription = self.sub_client.create_subscription(
+      self.sub_client.subscription_path(self.project_id, PUBSUB_NAME + '_matcher_' + self.uuid),
+      self.topic.name
+    )
+
+    init_input(self.num_of_messages)
+
+
+  def _setup_pipeline(self):
+    pubsub_msg_verifier = PubSubMessageMatcher(
+        self.project_id,
+        self.matcher_subscription.name,
+        expected_msg_len=self.num_of_messages,
+        timeout=self.timeout
+    )
+
+    self.extra_opts = {
+        'on_success_matcher': all_of(pubsub_msg_verifier),
+        'wait_until_finish_duration': self.timeout * 1000,
+        'streaming': True,
+        'save_main_session': True
+    }
+
+    args = self.pipeline.get_full_options_as_args(**self.extra_opts)
+
+    parser = argparse.ArgumentParser()
+    _, pipeline_args = parser.parse_known_args(args)
+
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    self.pipeline = TestPipeline(options=pipeline_options)

Review comment:
       You don't have to create PipelineOptions and ArgumentParser objects. This should be enough:
   ```
   args = self.pipeline.get_full_options_as_args(**extra_opts)
   self.pipeline = TestPipeline(argv=args)
   ```

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_read_performance'
+
+
+class PubsubReadPerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubReadPerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+    (
+        self.pipeline
+        | 'Read from pubsub' >> ReadFromPubSub(
+            subscription=self.subscription.name,
+            with_attributes=True,
+            id_label='ack_id')
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+  def cleanup(self):
+    test_utils.cleanup_subscriptions(
+      self.sub_client, [self.subscription, self.matcher_subscription])
+    test_utils.cleanup_topics(
+      self.pub_client, [self.topic])
+
+  def _setup_env(self):
+    if not self.pipeline.get_option('timeout'):
+      logging.error('--timeout argument is required.')
+      sys.exit(1)
+
+    self.num_of_messages = int(self.input_options.get('num_records'))
+    self.timeout = int(self.pipeline.get_option('timeout'))
+
+  def _setup_pubsub(self):
+    def init_input(number_of_messages):
+      for n in range(number_of_messages):
+        message = PubsubMessage(
+          bytes(str(n), 'utf-8'),

Review comment:
       This will fail in Python 2.7, because `bytes` is an alias for `str`, and `str()` takes at most 1 argument.
   If you need to convert a string to bytes, use the `encode` method, which works in both Python 2.7 and 3.x.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_read_performance'
+
+
+class PubsubReadPerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubReadPerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+    (
+        self.pipeline
+        | 'Read from pubsub' >> ReadFromPubSub(
+            subscription=self.subscription.name,
+            with_attributes=True,
+            id_label='ack_id')
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+  def cleanup(self):
+    test_utils.cleanup_subscriptions(
+      self.sub_client, [self.subscription, self.matcher_subscription])
+    test_utils.cleanup_topics(
+      self.pub_client, [self.topic])
+
+  def _setup_env(self):
+    if not self.pipeline.get_option('timeout'):
+      logging.error('--timeout argument is required.')
+      sys.exit(1)
+
+    self.num_of_messages = int(self.input_options.get('num_records'))
+    self.timeout = int(self.pipeline.get_option('timeout'))
+
+  def _setup_pubsub(self):
+    def init_input(number_of_messages):
+      for n in range(number_of_messages):
+        message = PubsubMessage(
+          bytes(str(n), 'utf-8'),
+          {'ack_id': bytes(str(n), 'utf-8')})
+        bytes_message = WriteToPubSub.to_proto_str(message)
+        self.pub_client.publish(self.topic.name, bytes_message)
+
+    from google.cloud import pubsub
+
+    self.uuid = str(uuid.uuid4())
+
+    self.pub_client = pubsub.PublisherClient()
+    self.topic = self.pub_client.create_topic(
+      self.pub_client.topic_path(self.project_id, PUBSUB_NAME + self.uuid)
+    )
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.subscription = self.sub_client.create_subscription(
+      self.sub_client.subscription_path(self.project_id, PUBSUB_NAME + self.uuid),
+      self.topic.name
+    )
+    self.matcher_subscription = self.sub_client.create_subscription(
+      self.sub_client.subscription_path(self.project_id, PUBSUB_NAME + '_matcher_' + self.uuid),
+      self.topic.name
+    )
+
+    init_input(self.num_of_messages)
+
+
+  def _setup_pipeline(self):
+    pubsub_msg_verifier = PubSubMessageMatcher(
+        self.project_id,
+        self.matcher_subscription.name,
+        expected_msg_len=self.num_of_messages,
+        timeout=self.timeout
+    )
+
+    self.extra_opts = {
+        'on_success_matcher': all_of(pubsub_msg_verifier),
+        'wait_until_finish_duration': self.timeout * 1000,
+        'streaming': True,
+        'save_main_session': True
+    }
+
+    args = self.pipeline.get_full_options_as_args(**self.extra_opts)

Review comment:
       I think since we don't use `self.extra_opts` anywhere else, the assignment to `self` is redundant.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner

Review comment:
       I would emphasize that the runner must be `TestDataflowRunner`, not `DataflowRunner`. This is very important information for anyone who runs this code.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_read_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Read operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>
+    }'"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import ReadFromPubSub, WriteToPubSub, PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_read_performance'
+
+
+class PubsubReadPerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubReadPerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+    (
+        self.pipeline
+        | 'Read from pubsub' >> ReadFromPubSub(
+            subscription=self.subscription.name,
+            with_attributes=True,
+            id_label='ack_id')
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+  def cleanup(self):
+    test_utils.cleanup_subscriptions(
+      self.sub_client, [self.subscription, self.matcher_subscription])
+    test_utils.cleanup_topics(
+      self.pub_client, [self.topic])
+
+  def _setup_env(self):
+    if not self.pipeline.get_option('timeout'):
+      logging.error('--timeout argument is required.')
+      sys.exit(1)
+
+    self.num_of_messages = int(self.input_options.get('num_records'))
+    self.timeout = int(self.pipeline.get_option('timeout'))
+
+  def _setup_pubsub(self):
+    def init_input(number_of_messages):

Review comment:
       If you don't use a closure, I think you can make this function a top-level one. It would be easier to understand the program flow.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_write_perf_test.py
##########
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Write operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>,
+    \"key_size\": 1,
+    \"value_size\": <VALUE_SIZE>,
+    }'"
+
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import WriteToPubSub, PubsubMessage, Read
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime, CountMessages
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_write_performance'
+
+
+class PubsubWritePerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubWritePerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+
+    class ToPubsubMessage(beam.DoFn):
+      counter = 0
+
+      def process(self, element):
+        self.counter += 1
+        from apache_beam.io import WriteToPubSub, PubsubMessage
+        message = PubsubMessage(
+          data=element[1],
+          attributes={'ack_id': bytes(str(self.counter), 'utf-8')})

Review comment:
       The same incompatibility error I mentioned earlier.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub_write_perf_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Performance PubsubIO streaming test for Write operation.
+
+Example for TestDataflowRunner:
+
+python -m apache_beam.io.gcp.pubsub_read_perf_test \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --project=<GCP_PROJECT_ID>
+    --temp_location=gs://<BUCKET_NAME>/tmp
+    --staging_location=gs://<BUCKET_NAME>/staging
+    --timeout=<TIME_IN_SECONDS>
+    --publish_to_big_query=<OPTIONAL><true/false>
+    --metrics_dataset=<OPTIONAL>
+    --metrics_table=<OPTIONAL>
+    --dataflow_worker_jar=<OPTIONAL>
+    --input_options='{
+    \"num_records\": <SIZE_OF_INPUT>,
+    \"key_size\": 1,
+    \"value_size\": <VALUE_SIZE>,
+    }'"
+
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import uuid
+
+from hamcrest import all_of
+
+import apache_beam as beam
+from apache_beam.io import WriteToPubSub, PubsubMessage, Read
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime, CountMessages
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+
+PUBSUB_NAME = 'pubsub_write_performance'
+
+
+class PubsubWritePerfTest(LoadTest):
+
+  def __init__(self):
+    super(PubsubWritePerfTest, self).__init__()
+    self._setup_env()
+    self._setup_pubsub()
+    self._setup_pipeline()
+
+  def test(self):
+
+    class ToPubsubMessage(beam.DoFn):

Review comment:
       For trivial `DoFn`s, you can also define a simple function instead of a `DoFn`. Then, if it's a generator function, you can use `beam.FlatMap` in the pipeline. Here are some examples: https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org