Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/27 23:28:16 UTC

[GitHub] [beam] leiyiz opened a new pull request #12709: add more options to nexmark launcher

leiyiz opened a new pull request #12709:
URL: https://github.com/apache/beam/pull/12709


   Added a batch mode option to the launcher and incorporated changes from Yichi's PR.
   Different running modes have different requirements.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483289536



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       Consider `import time` instead, since then you wouldn't have to import each function you want to use from the time module. It's not a big deal, but I believe calling time.sleep() and time.time() will be concise enough.
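
A minimal sketch of the module-level import style suggested here. The `elapsed_ms` helper is hypothetical and is not part of the PR; it only illustrates calling `time.sleep()` and `time.time()` through the module.

```python
import time


def elapsed_ms(start_sec):
  """Returns whole milliseconds elapsed since start_sec (a time.time() value)."""
  return int((time.time() - start_sec) * 1000)


start = time.time()
time.sleep(0.01)  # module-qualified call, no `from time import sleep` needed
print('elapsed: %d ms' % elapsed_ms(start))
```

With a single `import time`, any later use of another function from the module needs no change to the import block.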







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.18%   -0.04%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53803     +133     
   ==========================================
   + Hits        21587    21619      +32     
   - Misses      32083    32184     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...1b2f8e9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] pabloem commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r484928699



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))

Review comment:
       Do you need to timestamp as well?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
-      else:
+      if not self.streaming:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    logging.info('starting to monitor the job')
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time.time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.has_progress(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\
+           curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY:

Review comment:
       @leiyiz the logic in this method is a little difficult to follow (the spurious codecov annotations don't help either) - do you think you could add a pydoc at the top roughly describing what the method does?
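
The kind of pydoc requested here might look like the sketch below. It restates only the streaming completion check visible in the hunk above as a standalone function. The name `is_streaming_job_done` and its parameters are hypothetical, and the rest of `monitor` is not shown in this excerpt, so it is left out.

```python
DONE_DELAY = 5 * 60  # seconds; same value as DONE_DELAY in the PR


def is_streaming_job_done(
    event_count,
    result_count,
    num_events,
    quiet_duration_sec,
    done_delay_sec=DONE_DELAY):
  """Decides whether a streaming job appears to have finished.

  Args:
    event_count: events observed so far by the event monitor.
    result_count: results observed so far (negative when none are known).
    num_events: number of events the job is expected to process.
    quiet_duration_sec: seconds since the last observed progress.
    done_delay_sec: quiet period required before the job counts as done.

  Returns:
    True if all expected events were seen, a result count is available,
    and the job has been quiet for longer than done_delay_sec.
  """
  return (
      event_count >= num_events and result_count >= 0 and
      quiet_duration_sec > done_delay_sec)


print(is_streaming_job_done(100, 5, 100, 301))
```

Pulling the condition into a named, documented function is one way to make the loop body easier to read; a docstring on `monitor` itself would serve the same purpose.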

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\

Review comment:
       let's use parentheses instead of `\`

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\
+       self.event_count != previous_perf.event_count or\
+       self.result_count != previous_perf.result_count:

Review comment:
       Also, is progress in `runtime_sec` an indicator of progress in the pipeline? Perhaps you could add a pydoc for this class describing what each attribute represents?
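
A sketch of what such a class-level pydoc could look like. The attribute descriptions are inferred from the attribute names and are assumptions, not confirmed by the PR. The constructor defaults and the comparison are copied from the hunk above, with the condition wrapped in parentheses instead of `\`.

```python
class NexmarkPerf(object):
  """Performance summary for one run of a Nexmark query.

  Attribute meanings are assumptions inferred from the names.

  Attributes:
    runtime_sec: elapsed run time in seconds, or -1.0 if unknown.
    event_count: number of events seen so far, or -1 if unknown.
    event_per_sec: event throughput, or -1.0 if unknown.
    result_count: number of results seen so far, or -1 if unknown.
  """
  def __init__(
      self,
      runtime_sec=None,
      event_count=None,
      event_per_sec=None,
      result_count=None):
    # Same defaulting as the PR: any falsy value (None or 0) becomes -1.
    self.runtime_sec = runtime_sec if runtime_sec else -1.0
    self.event_count = event_count if event_count else -1
    self.event_per_sec = event_per_sec if event_per_sec else -1.0
    self.result_count = result_count if result_count else -1

  def has_progress(self, previous_perf):
    """Returns True if any compared value differs from previous_perf."""
    return (
        self.runtime_sec != previous_perf.runtime_sec or
        self.event_count != previous_perf.event_count or
        self.result_count != previous_perf.result_count)


before = NexmarkPerf(runtime_sec=10.0, event_count=100, result_count=3)
after = NexmarkPerf(runtime_sec=10.0, event_count=150, result_count=3)
print(after.has_progress(before))  # True: event_count changed
```

Whether `runtime_sec` belongs in the comparison is the open question raised in this comment; the sketch keeps it only to mirror the PR.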

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
-      else:
+      if not self.streaming:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    logging.info('starting to monitor the job')
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time.time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.has_progress(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\

Review comment:
       We don't usually rely on `\` to continue lines. We use parentheses (e.g. `if (curr_perf.event_count >=.... \n\t something something):`)

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -96,24 +94,41 @@
 
 
 class NexmarkLauncher(object):
+
+  # how long after some result is seen and no activity seen do we cancel job
+  DONE_DELAY = 5 * 60
+  # delay in seconds between sample perf data
+  PERF_DELAY = 20
+  # delay before cancelling the job when pipeline appears to be stuck
+  TERMINATE_DELAY = 1 * 60 * 60
+  # delay before warning when pipeline appears to be stuck
+  WARNING_DELAY = 10 * 60
+
   def __init__(self):
     self.parse_args()
-    self.uuid = str(uuid.uuid4())
-    self.topic_name = self.args.topic_name + self.uuid
-    self.subscription_name = self.args.subscription_name + self.uuid
-    publish_client = pubsub.Client(project=self.project)
-    topic = publish_client.topic(self.topic_name)
-    if topic.exists():
-      logging.info('deleting topic %s', self.topic_name)
-      topic.delete()
-    logging.info('creating topic %s', self.topic_name)
-    topic.create()
-    sub = topic.subscription(self.subscription_name)
-    if sub.exists():
-      logging.info('deleting sub %s', self.topic_name)
-      sub.delete()
-    logging.info('creating sub %s', self.topic_name)
-    sub.create()
+    self.manage_resources = self.args.manage_resources
+    self.uuid = str(uuid.uuid4()) if self.manage_resources else ''
+    self.topic_name = (
+        self.args.topic_name + self.uuid if self.args.topic_name else None)
+    self.subscription_name = (
+        self.args.subscription_name +
+        self.uuid if self.args.subscription_name else None)
+    self.pubsub_mode = self.args.pubsub_mode
+    if self.manage_resources:
+      from google.cloud import pubsub
+      publish_client = pubsub.Client(project=self.project)
+      topic = publish_client.topic(self.topic_name)
+      if topic.exists():
+        logging.info('deleting topic %s', self.topic_name)
+        topic.delete()
+      logging.info('creating topic %s', self.topic_name)
+      topic.create()
+      sub = topic.subscription(self.subscription_name)
+      if sub.exists():
+        logging.info('deleting sub %s', self.topic_name)
+        sub.delete()

Review comment:
       It looks like you can call `self.cleanup` for most of these, and save a few lines? : )
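
The `cleanup` method is not shown in this excerpt, so the following is only a sketch of the idea, assuming it performs the same exists-then-delete step for the topic and the subscription. `delete_if_exists` and `FakeResource` are hypothetical names; `FakeResource` is an in-memory stand-in so the example runs without Pub/Sub.

```python
import logging


def delete_if_exists(resource, kind, name):
  """Deletes a topic or subscription handle if it already exists."""
  if resource.exists():
    logging.info('deleting %s %s', kind, name)
    resource.delete()


class FakeResource(object):
  """Stand-in exposing the exists/delete/create calls used in the hunk."""
  def __init__(self, present):
    self.present = present

  def exists(self):
    return self.present

  def delete(self):
    self.present = False

  def create(self):
    self.present = True


topic = FakeResource(present=True)
delete_if_exists(topic, 'topic', 'nexmark-topic')  # removes the stale topic
topic.create()  # then recreates it, as the constructor in the PR does
```

Sharing one helper between setup and teardown keeps the delete logic and its log messages in a single place.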

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\

Review comment:
       e.g. wrap the condition in parentheses instead of ending the line with a backslash:
   ```
   if (self.r_s != ... or
       self.abc != self.def or ...):
   ```
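
Spelled out against the fields in the diff, the parenthesized form could look like the sketch below. Which fields the real `has_progress` compares beyond `runtime_sec` is not visible in this thread, so the three comparisons here are an assumption. The constructor also uses explicit `is None` checks (a deviation from the diff) so that a legitimate zero is not replaced by the `-1` sentinel.

```python
class NexmarkPerf(object):
  def __init__(
      self,
      runtime_sec=None,
      event_count=None,
      event_per_sec=None,
      result_count=None):
    # -1 marks "no value yet"; explicit None checks keep a real 0 intact.
    self.runtime_sec = runtime_sec if runtime_sec is not None else -1.0
    self.event_count = event_count if event_count is not None else -1
    self.event_per_sec = event_per_sec if event_per_sec is not None else -1.0
    self.result_count = result_count if result_count is not None else -1

  def has_progress(self, previous_perf):
    # type: (NexmarkPerf) -> bool
    # Parentheses allow the condition to span lines without a backslash.
    # The set of compared fields is assumed for illustration.
    return (
        self.runtime_sec != previous_perf.runtime_sec or
        self.event_count != previous_perf.event_count or
        self.result_count != previous_perf.result_count)
```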




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/faf76d59a5b24bf7f0daed5a503a07f0fff70ec1?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.29%   -0.01%     
   ==========================================
     Files         451      451              
     Lines       53168    53175       +7     
   ==========================================
     Hits        21429    21429              
   - Misses      31739    31746       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...83c38b9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483274611



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)

Review comment:
       Don't we need to wait a little for the Python pipeline to spin up?
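
One possible way to tolerate a slow start-up is to poll for progress and only warn, then give up, after the pipeline has shown no activity for a while. The sketch below reuses the `WARNING_DELAY` and `TERMINATE_DELAY` values from the diff; the `monitor` function and its four callables (`poll_progress`, `is_done`, `now`, `sleep`) are hypothetical hooks introduced for illustration, not the launcher's actual monitoring code.

```python
import logging

TERMINATE_DELAY = 1 * 60 * 60  # seconds without progress before giving up
WARNING_DELAY = 10 * 60  # seconds without progress before warning


def monitor(poll_progress, is_done, now, sleep, poll_interval=15):
  """Polls until is_done() is true or the pipeline looks stuck for too long.

  Args:
    poll_progress: callable returning True if there was activity since the
      previous poll (hypothetical hook).
    is_done: callable returning True once the pipeline has finished.
    now: callable returning the current time in seconds.
    sleep: callable that waits for the given number of seconds.
    poll_interval: seconds between polls.

  Returns:
    True if the pipeline finished, False if monitoring gave up.
  """
  last_activity = now()
  while not is_done():
    sleep(poll_interval)
    current = now()
    if poll_progress():
      last_activity = current
      continue
    stuck_for = current - last_activity
    if stuck_for >= TERMINATE_DELAY:
      logging.error('no activity for %d sec, giving up', stuck_for)
      return False
    if stuck_for >= WARNING_DELAY:
      logging.warning('no activity for %d sec', stuck_for)
  return True
```

Passing `time.time` and `time.sleep` as `now` and `sleep` would give real-time behaviour; injecting them keeps the loop easy to exercise with a fake clock.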







[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483286761



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(self):
+    self.runtime_sec = -1.0
+    self.event_count = -1
+    self.event_per_sec = -1.0
+    self.result_count = -1
+
+  def is_active(self, other):

Review comment:
       Sorry for any confusion. I meant: rename the parameter `other` to `previous_perf` and rename the method `is_active` to `has_progress`.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.13%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.16%   -0.14%     
   ==========================================
     Files         451      455       +4     
     Lines       53169    53742     +573     
   ==========================================
   + Hits        21429    21587     +158     
   - Misses      31740    32155     +415     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.60% <0.00%> (-0.98%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.77% <0.00%> (-0.85%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbXMucHk=) | `57.50% <0.00%> (-0.26%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/environments.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9lbnZpcm9ubWVudHMucHk=) | `52.42% <0.00%> (-0.18%)` | :arrow_down: |
   | [setup.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2V0dXAucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...apache\_beam/runners/portability/portable\_runner.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9wb3J0YWJsZV9ydW5uZXIucHk=) | `27.37% <0.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...21e48ab](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] pabloem merged pull request #12709: [BEAM-8258] add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12709:
URL: https://github.com/apache/beam/pull/12709


   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.05%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.16%   -0.06%     
   ==========================================
     Files         454      455       +1     
     Lines       53670    53750      +80     
   ==========================================
     Hits        21587    21587              
   - Misses      32083    32163      +80     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...d3bb694](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483275456



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       Yeah, I think so.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53840     +170     
   ==========================================
   + Hits        21587    21655      +68     
   - Misses      32083    32185     +102     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `28.92% <0.00%> (+0.24%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...1b2f8e9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53907     +237     
   ==========================================
   + Hits        21587    21682      +95     
   - Misses      32083    32225     +142     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.05% <0.00%> (-0.55%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...b0b6250](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.29%   -0.01%     
   ==========================================
     Files         451      451              
     Lines       53169    53175       +6     
   ==========================================
     Hits        21429    21429              
   - Misses      31740    31746       +6     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...beam/runners/interactive/background\_caching\_job.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9iYWNrZ3JvdW5kX2NhY2hpbmdfam9iLnB5) | `25.24% <0.00%> (+0.24%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...21e48ab](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] commented on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/faf76d59a5b24bf7f0daed5a503a07f0fff70ec1?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.29%   -0.01%     
   ==========================================
     Files         451      451              
     Lines       53168    53175       +7     
   ==========================================
     Hits        21429    21429              
   - Misses      31739    31746       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...83c38b9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.18%   -0.04%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53803     +133     
   ==========================================
   + Hits        21587    21619      +32     
   - Misses      32083    32184     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...d3bb694](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.14%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.16%   -0.15%     
   ==========================================
     Files         451      455       +4     
     Lines       53169    53750     +581     
   ==========================================
   + Hits        21429    21587     +158     
   - Misses      31740    32163     +423     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-1.01%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.60% <0.00%> (-0.98%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbXMucHk=) | `57.50% <0.00%> (-0.26%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/environments.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9lbnZpcm9ubWVudHMucHk=) | `52.42% <0.00%> (-0.18%)` | :arrow_down: |
   | [setup.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2V0dXAucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...apache\_beam/runners/portability/portable\_runner.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9wb3J0YWJsZV9ydW5uZXIucHk=) | `27.37% <0.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...fea6af2](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483307298



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +

Review comment:
       thanks
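
The hunk above replaces three unconditional checks with one that applies only when `--streaming` is set. A minimal sketch of that kind of mode-dependent validation follows. The function name and error strings are hypothetical, and the batch-mode rule (an `--input` file is required) is an assumption based on `self.args.input` appearing elsewhere in this PR, not something the hunk confirms.

```python
def validate_launcher_options(
    streaming, runner, project, wait_until_finish_duration, input_path):
  """Return a list of error messages; an empty list means the options pass."""
  errors = []
  if streaming:
    # Streaming mode in the hunk requires all three of these together.
    if wait_until_finish_duration is None:
      errors.append('--wait_until_finish_duration is required with --streaming')
    if project is None:
      errors.append('--project is required with --streaming')
    if runner != 'DataflowRunner':
      errors.append('--runner=DataflowRunner is required with --streaming')
  else:
    # Assumption: batch mode only needs a file to read events from.
    if not input_path:
      errors.append('--input is required in batch mode')
  return errors
```

Collecting every problem before exiting lets the launcher report all missing arguments in one pass rather than one per run.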







[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483288419



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)

Review comment:
       The wait_until_finish duration is an overall timeout; we don't need to wait for the Python pipeline to spin up, and I don't think wait_until_finish was meant for that.
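
To illustrate the point about the overall timeout, here is a minimal sketch of "wait up to a fixed duration, then cancel if still running". `FakeResult`, `TERMINAL_STATES`, and `run_with_overall_timeout` are hypothetical stand-ins so the snippet runs without Beam installed; only the `wait_until_finish(duration=...)` and `cancel()` method names mirror the calls visible in the diff, and the duration is assumed to be in milliseconds.

```python
# Hypothetical set of states treated as "finished" for this sketch.
TERMINAL_STATES = {'DONE', 'FAILED', 'CANCELLED'}


class FakeResult(object):
  """Stand-in for a pipeline result that needs `runtime_ms` to finish."""
  def __init__(self, runtime_ms):
    self.runtime_ms = runtime_ms
    self.state = 'RUNNING'

  def wait_until_finish(self, duration=None):
    # Pretend to wait: the job finishes only if the timeout covers its runtime.
    if duration is None or duration >= self.runtime_ms:
      self.state = 'DONE'
    return self.state

  def cancel(self):
    self.state = 'CANCELLED'
    return self.state


def run_with_overall_timeout(result, timeout_ms):
  """Wait at most timeout_ms in total, then cancel a job that is still running."""
  state = result.wait_until_finish(duration=timeout_ms)
  if state not in TERMINAL_STATES:
    state = result.cancel()
  return state
```

The timeout is applied once to the whole run, so startup time counts against it and no separate wait for the pipeline to spin up is involved.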







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.13%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.16%   -0.14%     
   ==========================================
     Files         451      455       +4     
     Lines       53169    53742     +573     
   ==========================================
   + Hits        21429    21587     +158     
   - Misses      31740    32155     +415     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.60% <0.00%> (-0.98%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.77% <0.00%> (-0.85%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbXMucHk=) | `57.50% <0.00%> (-0.26%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/environments.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9lbnZpcm9ubWVudHMucHk=) | `52.42% <0.00%> (-0.18%)` | :arrow_down: |
   | [setup.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2V0dXAucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...apache\_beam/runners/portability/portable\_runner.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9wb3J0YWJsZV9ydW5uZXIucHk=) | `27.37% <0.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...fea6af2](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r485125023



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))

Review comment:
       No, because when we read from Pub/Sub the timestamp is already taken from the message's timestamp attribute.
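
A small sketch of why the Pub/Sub path can skip the explicit 'timestamping' step: the event time is carried in a message attribute rather than recomputed from the parsed event. The helper names below are hypothetical and this is plain Python, not Beam's own code. It assumes the attribute holds either milliseconds since the Unix epoch or an RFC 3339 UTC string, which to my understanding are the two formats `ReadFromPubSub` accepts for `timestamp_attribute`; `fallback_seconds` stands in for the publish time.

```python
from datetime import datetime


def attribute_to_seconds(value):
  """Convert a timestamp attribute string to seconds since the Unix epoch."""
  try:
    # Numeric form: milliseconds since the epoch.
    return int(value) / 1000.0
  except ValueError:
    # RFC 3339 form; fromisoformat() wants an explicit offset instead of 'Z'.
    return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()


def element_timestamp(attributes, fallback_seconds):
  """Pick the event time for a message given its attribute dict."""
  if 'timestamp' in attributes:
    return attribute_to_seconds(attributes['timestamp'])
  return fallback_seconds
```

In the file-based path there is no such attribute, which is why `read_from_file` in the hunk still wraps each event in `window.TimestampedValue`.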







[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483273962



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(self):
+    self.runtime_sec = -1.0
+    self.event_count = -1
+    self.event_per_sec = -1.0
+    self.result_count = -1
+
+  def is_active(self, other):

Review comment:
       What does this mean? Sorry, I don't quite get it.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483279172



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +

Review comment:
       RuntimeError, right? It is an unexpected error.
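
One way to read the suggestion above is to raise an exception instead of printing a message and exiting. The following is only a minimal sketch: `check_streaming_args` is a hypothetical helper that is not part of the PR, and its three conditions simply mirror the ones in the quoted diff.

```python
def check_streaming_args(
    streaming, wait_until_finish_duration, project, runner):
  """Raises RuntimeError if a streaming run lacks what the quoted diff requires.

  Hypothetical helper for illustration; not the launcher's actual code.
  """
  if not streaming:
    return  # This sketch only checks the streaming branch.
  problems = []
  if wait_until_finish_duration is None:
    problems.append('--wait_until_finish_duration is required')
  if project is None:
    problems.append('--project is required')
  if runner != 'DataflowRunner':
    problems.append('--runner must be DataflowRunner')
  if problems:
    # One exception that names every unmet requirement at once.
    raise RuntimeError('invalid streaming options: ' + '; '.join(problems))
```

Collecting all problems before raising lets the caller see every missing option in a single message rather than fixing them one at a time.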







[GitHub] [beam] leiyiz commented on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-686846203


   R: @pabloem 
   R: @y1chi 





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483288351



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':

Review comment:
       ReadFromPubSub on DirectRunner does not support the `id_label` argument; that is the only reason.
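
If `id_label` is the only obstacle, the restriction could in principle be narrowed to that one argument. A rough sketch, assuming a hypothetical helper `pubsub_read_kwargs` (not in the PR) that builds the keyword arguments for the read and leaves out `id_label` on runners other than Dataflow:

```python
def pubsub_read_kwargs(runner, subscription, id_label=None):
  """Builds keyword arguments for a Pub/Sub read (hypothetical helper)."""
  kwargs = {'subscription': subscription}
  # Per the comment above, only pass id_label where the runner supports it.
  if id_label is not None and runner == 'DataflowRunner':
    kwargs['id_label'] = id_label
  return kwargs
```

The resulting dict could then be passed as `beam.io.ReadFromPubSub(**kwargs)`. Note that dropping `id_label` also drops the deduplication it provides, so this trade-off would need to be acceptable for the benchmark.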







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53840     +170     
   ==========================================
   + Hits        21587    21655      +68     
   - Misses      32083    32185     +102     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `28.92% <0.00%> (+0.24%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...1b2f8e9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.18%   -0.04%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53803     +133     
   ==========================================
   + Hits        21587    21619      +32     
   - Misses      32083    32184     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...1c0d677](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53907     +237     
   ==========================================
   + Hits        21587    21682      +95     
   - Misses      32083    32225     +142     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.05% <0.00%> (-0.55%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...b0b6250](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483289536



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       With `import time` you wouldn't have to import each function you want to use from the time module. It's not a big deal, but I believe calling time.sleep() and time.time() is concise enough.
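
For illustration, this is what the module-level import looks like in use. The sleep duration and the variable names are arbitrary choices for this sketch, not taken from the PR:

```python
import time

start = time.time()
time.sleep(0.05)  # pause briefly, e.g. between polls of a running job
elapsed = time.time() - start
print('waited %.3f seconds' % elapsed)
```

Both calls stay qualified with the module name, so a reader can see where `sleep` and `time` come from without checking the import block.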







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53840     +170     
   ==========================================
   + Hits        21587    21655      +68     
   - Misses      32083    32185     +102     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `28.92% <0.00%> (+0.24%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...b0b6250](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.13%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.16%   -0.14%     
   ==========================================
     Files         451      455       +4     
     Lines       53169    53742     +573     
   ==========================================
   + Hits        21429    21587     +158     
   - Misses      31740    32155     +415     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.60% <0.00%> (-0.98%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.77% <0.00%> (-0.85%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbXMucHk=) | `57.50% <0.00%> (-0.26%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/environments.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9lbnZpcm9ubWVudHMucHk=) | `52.42% <0.00%> (-0.18%)` | :arrow_down: |
   | [setup.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2V0dXAucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...apache\_beam/runners/portability/portable\_runner.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9wb3J0YWJsZV9ydW5uZXIucHk=) | `27.37% <0.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...21e48ab](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r481494611



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':

Review comment:
       Why are we limiting this to the Dataflow runner? The benchmark should be available to all runners.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -263,24 +378,27 @@ def get_performance(result, event_monitor, result_monitor):
         result_monitor.namespace,
         result_monitor.name_prefix + MonitorSuffix.EVENT_TIME)
 
+    perf = NexmarkPerf()
+    perf.event_count = event_count

Review comment:
       You can add these fields to NexmarkPerf.__init__ as keyword arguments.
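
A minimal sketch of this suggestion. The default values mirror the -1 sentinels in nexmark_perf.py from this PR; the keyword-argument signature itself is an assumption about what the reviewer has in mind, not the merged code.

```python
class NexmarkPerf(object):
  """Performance summary for one run of a Nexmark query (sketch)."""
  def __init__(
      self,
      runtime_sec=-1.0,
      event_count=-1,
      event_per_sec=-1.0,
      result_count=-1):
    # -1 marks a value that has not been measured yet.
    self.runtime_sec = runtime_sec
    self.event_count = event_count
    self.event_per_sec = event_per_sec
    self.result_count = result_count


# The call site can then build the object in a single expression.
perf = NexmarkPerf(event_count=1000, result_count=10)
```

With this shape, get_performance would not need to assign each attribute on a separate line after construction.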

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       Can we just import time?
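
For illustration, a sketch of what the module-level import would look like at the call sites. The variable names are hypothetical; only the standard-library functions time.time and time.sleep are assumed.

```python
import time

# Module-qualified calls make the origin of each function explicit.
now_ms = int(time.time() * 1000)  # current wall-clock time in milliseconds
time.sleep(0.01)  # short pause, for example between two polls
later_ms = int(time.time() * 1000)
```

The trade-off is a slightly longer call in exchange for not shadowing the module name with a bare time() function.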

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -230,6 +230,15 @@ def millis_to_timestamp(millis):
 
 def get_counter_metric(result, namespace, name):
   # type: (PipelineResult, str, str) -> int
+
+  """
+  get specific counter metric from pipeline result

Review comment:
       We should use the Google docstring (pydoc) style.
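
A sketch of how the docstring might look in Google style. The function signature comes from the diff; the body below is a hypothetical placeholder (it reads a made-up `counters` attribute) so that the example runs on its own, and the -1 fallback is an assumption.

```python
def get_counter_metric(result, namespace, name):
  # type: (object, str, str) -> int

  """Gets the value of a specific counter metric from a pipeline result.

  Args:
    result: The result object of the executed pipeline.
    namespace: The namespace the counter was declared in.
    name: The name of the counter.

  Returns:
    The counter value, or -1 if no matching counter is found (assumed).
  """
  # Placeholder lookup for illustration only; the real query lives in the PR.
  counters = getattr(result, 'counters', {})
  return counters.get((namespace, name), -1)
```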

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)
       else:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.__class__.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.is_active(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\
+           curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY:
+          logging.info('streaming query appears to have finished executing')
+          waiting_for_shutdown = True
+          cancel_job = True
+        elif quiet_duration > self.TERMINATE_DELAY:
+          logging.error(
+              'streaming query have been stuck for %d seconds', quiet_duration)
+          logging.error('canceling streaming job')
+          waiting_for_shutdown = True
+          cancel_job = True
+        elif quiet_duration > self.WARNING_DELAY:
+          logging.warning(
+              'streaming query have been stuck for %d seconds', quiet_duration)
+
+        if cancel_job:
+          job.cancel()
+
+      stopped = PipelineState.is_terminal(job.state)
+      if stopped:
+        break
+
+      perf = curr_perf
+      if not waiting_for_shutdown:
+        if last_active_ms == now:
+          logging.info('acticity seen, new performance data extracted')

Review comment:
       Typo: acticity -> activity.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +
+              '--project and --DataflowRunner required when running in streaming mode')  # pylint: disable=line-too-long
+        sys.exit(1)
+    else:
+      if self.args.input is None:
+        print('error: argument --input is required when running in batch mode')

Review comment:
       ditto

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +

Review comment:
       Can we raise an exception instead?
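
One possible shape for this, as a sketch. The helper name validate_launcher_options and its parameters are hypothetical; the messages paraphrase the ones in the diff, and the runner check is left out for brevity.

```python
def validate_launcher_options(
    streaming, wait_until_finish_duration, project, input_path):
  """Raises ValueError when a mode-specific option is missing (sketch)."""
  if streaming:
    if wait_until_finish_duration is None or project is None:
      raise ValueError(
          'arguments --wait_until_finish_duration and --project are '
          'required when running in streaming mode')
  elif input_path is None:
    raise ValueError(
        'argument --input is required when running in batch mode')


# Batch mode with an input file passes validation silently.
validate_launcher_options(False, None, None, '/tmp/events.json')
```

Raising lets callers and tests handle the failure, whereas print followed by sys.exit(1) terminates the whole process.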

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)
       else:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.__class__.log_performance(perf)

Review comment:
       __class__ is probably not needed.
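
A small sketch of why the attribute lookup is redundant, assuming log_performance is a static method (the diff does not show its definition). The Launcher class and the dict-based perf argument are hypothetical stand-ins.

```python
import logging


class Launcher(object):
  @staticmethod
  def log_performance(perf):
    logging.info('input event count: %d', perf['event_count'])
    return perf['event_count']

  def run(self, perf):
    # Static methods resolve through the instance as well,
    # so going through self.__class__ adds nothing.
    return self.log_performance(perf)
```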

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)

Review comment:
       If we are proactively terminating the job, wait_until_finish_duration is not needed.
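
To illustrate the point, a simplified sketch of a polling loop that cancels the job itself once the expected events have been seen, so no timeout has to be passed to wait_until_finish. FakeJob, the monitor signature, and the state strings are hypothetical stand-ins, not Beam classes or the PR's monitor method.

```python
import time


class FakeJob(object):
  """Hypothetical stand-in for a pipeline result."""
  def __init__(self):
    self.state = 'RUNNING'

  def cancel(self):
    self.state = 'CANCELLED'


def monitor(job, get_event_count, expected_events, poll_interval_sec=0.01):
  """Polls until the expected events are seen, then cancels the job."""
  while job.state == 'RUNNING':
    if get_event_count() >= expected_events:
      job.cancel()  # proactive termination, so no timeout is needed
      break
    time.sleep(poll_interval_sec)
  return job.state


# Simulated counter readings: 0, then 5, then 10 events.
counts = iter([0, 5, 10])
job = FakeJob()
final_state = monitor(job, lambda: next(counts), expected_events=10)
```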

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(self):
+    self.runtime_sec = -1.0
+    self.event_count = -1
+    self.event_per_sec = -1.0
+    self.result_count = -1
+
+  def is_active(self, other):

Review comment:
       nit: let's s/other/previous_perf/ and s/is_active/has_progress/; I think this will make it more understandable.
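
A sketch of the renamed method. The diff does not show the body of is_active, so the field-by-field comparison below is an assumption about its intent; the constructor is copied from the PR.

```python
class NexmarkPerf(object):
  def __init__(self):
    self.runtime_sec = -1.0
    self.event_count = -1
    self.event_per_sec = -1.0
    self.result_count = -1

  def has_progress(self, previous_perf):
    # type: (NexmarkPerf) -> bool

    """Returns True if any measurement changed since previous_perf."""
    return (
        self.runtime_sec != previous_perf.runtime_sec or
        self.event_count != previous_perf.event_count or
        self.result_count != previous_perf.result_count)
```

The call in the monitoring loop would then read curr_perf.has_progress(perf), which states directly what is being compared.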







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.18%   -0.04%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53803     +133     
   ==========================================
   + Hits        21587    21619      +32     
   - Misses      32083    32184     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...b5eb0cb](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.22%   -0.01%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53840     +170     
   ==========================================
   + Hits        21587    21655      +68     
   - Misses      32083    32185     +102     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `23.36% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `28.92% <0.00%> (+0.24%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...1b2f8e9](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] leiyiz commented on a change in pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483275719



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       But what is the reason to import time instead of only the needed methods here?







[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bc39bc52f517000b9e8b59c56ede57d49e11ef3?el=desc) will **decrease** coverage by `0.13%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.30%   40.16%   -0.14%     
   ==========================================
     Files         451      455       +4     
     Lines       53169    53742     +573     
   ==========================================
   + Hits        21429    21587     +158     
   - Misses      31740    32155     +415     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `29.60% <0.00%> (-0.98%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.77% <0.00%> (-0.85%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbXMucHk=) | `57.50% <0.00%> (-0.26%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/environments.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9lbnZpcm9ubWVudHMucHk=) | `52.42% <0.00%> (-0.18%)` | :arrow_down: |
   | [setup.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2V0dXAucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/portability/python\_urns.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvcHl0aG9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...apache\_beam/runners/portability/portable\_runner.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9wb3J0YWJsZV9ydW5uZXIucHk=) | `27.37% <0.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...21e48ab](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12709: add more options to nexmark launcher

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12709:
URL: https://github.com/apache/beam/pull/12709#issuecomment-682247488


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=h1) Report
   > Merging [#12709](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/dc3144d54c1f2adcc4f71520d5c1468a4a48e735?el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12709/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12709      +/-   ##
   ==========================================
   - Coverage   40.22%   40.18%   -0.04%     
   ==========================================
     Files         454      456       +2     
     Lines       53670    53803     +133     
   ==========================================
   + Hits        21587    21619      +32     
   - Misses      32083    32184     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/testing/benchmarks/nexmark/nexmark\_launcher.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19sYXVuY2hlci5weQ==) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_perf.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya19wZXJmLnB5) | `0.00% <0.00%> (ø)` | |
   | [...he\_beam/testing/benchmarks/nexmark/nexmark\_util.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL25leG1hcmsvbmV4bWFya191dGlsLnB5) | `0.00% <0.00%> (ø)` | |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `41.17% <0.00%> (-1.58%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `13.62% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kinesis.py](https://codecov.io/gh/apache/beam/pull/12709/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2luZXNpcy5weQ==) | `66.66% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=footer). Last update [d89bbfa...d3bb694](https://codecov.io/gh/apache/beam/pull/12709?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483292999



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':

Review comment:
       We can also ignore id_label for now if that is the case. I believe id_label is useful for deduplication, but for a benchmark it is arguably acceptable to go without it.







[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r483288885



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +

Review comment:
       This could raise a ValueError instead of printing, since the argument combination is invalid.
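
A minimal sketch of what raising instead of printing might look like, assuming a hypothetical standalone helper `validate_launcher_args` that is not part of the PR. The three conditions mirror the check in the diff above (streaming mode needs `--wait_until_finish_duration`, `--project`, and the `DataflowRunner`); the message wording is illustrative only.

```python
def validate_launcher_args(streaming, wait_until_finish_duration, project,
                           runner):
  """Hypothetical helper: reject invalid streaming-mode argument combinations.

  Batch mode (streaming falsy) is accepted without further checks here.
  """
  if not streaming:
    return

  # Collect every missing option so the caller sees all problems at once.
  missing = []
  if wait_until_finish_duration is None:
    missing.append('--wait_until_finish_duration')
  if project is None:
    missing.append('--project')
  if missing:
    raise ValueError('streaming mode requires: ' + ', '.join(missing))

  # The diff above only accepts DataflowRunner for streaming runs.
  if runner != 'DataflowRunner':
    raise ValueError(
        'streaming mode requires --runner=DataflowRunner, got %r' % (runner, ))
```

Raising `ValueError` lets the caller decide how to report the problem (for example, printing usage and exiting) and makes the check straightforward to unit test.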



