You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/02 01:10:41 UTC

[GitHub] [beam] y1chi commented on a change in pull request #12709: add more options to nexmark launcher

y1chi commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r481494611



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':

Review comment:
       Why are we limiting this to the Dataflow runner? The benchmark should be made available to all runners.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -263,24 +378,27 @@ def get_performance(result, event_monitor, result_monitor):
         result_monitor.namespace,
         result_monitor.name_prefix + MonitorSuffix.EVENT_TIME)
 
+    perf = NexmarkPerf()
+    perf.event_count = event_count

Review comment:
       You can add these fields to NexmarkPerf's __init__ as keyword arguments.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -67,18 +67,20 @@
 import logging
 import sys
 import uuid
-
-from google.cloud import pubsub
+from time import sleep

Review comment:
       Can we just `import time`?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -230,6 +230,15 @@ def millis_to_timestamp(millis):
 
 def get_counter_metric(result, namespace, name):
   # type: (PipelineResult, str, str) -> int
+
+  """
+  get specific counter metric from pipeline result

Review comment:
       We should use Google-style docstrings.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)
       else:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.__class__.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.is_active(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\
+           curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY:
+          logging.info('streaming query appears to have finished executing')
+          waiting_for_shutdown = True
+          cancel_job = True
+        elif quiet_duration > self.TERMINATE_DELAY:
+          logging.error(
+              'streaming query have been stuck for %d seconds', quiet_duration)
+          logging.error('canceling streaming job')
+          waiting_for_shutdown = True
+          cancel_job = True
+        elif quiet_duration > self.WARNING_DELAY:
+          logging.warning(
+              'streaming query have been stuck for %d seconds', quiet_duration)
+
+        if cancel_job:
+          job.cancel()
+
+      stopped = PipelineState.is_terminal(job.state)
+      if stopped:
+        break
+
+      perf = curr_perf
+      if not waiting_for_shutdown:
+        if last_active_ms == now:
+          logging.info('acticity seen, new performance data extracted')

Review comment:
       typo acticity -> activity

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +
+              '--project and --DataflowRunner required when running in streaming mode')  # pylint: disable=line-too-long
+        sys.exit(1)
+    else:
+      if self.args.input is None:
+        print('error: argument --input is required when running in batch mode')

Review comment:
       Ditto: can we raise an exception here instead of printing an error?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -156,34 +191,32 @@ def parse_args(self):
 
     # Usage with Dataflow requires a project to be supplied.
     self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --project is required')
-      sys.exit(1)
-
-    # Pub/Sub is currently available for use only in streaming pipelines.
     self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
-    if self.streaming is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --streaming is required')
-      sys.exit(1)
 
     # wait_until_finish ensures that the streaming job is canceled.
     self.wait_until_finish_duration = (
         self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-    if self.wait_until_finish_duration is None:
-      parser.print_usage()
-      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
-      sys.exit(1)
+    self.runner = self.pipeline_options.view_as(StandardOptions).runner
+
+    if self.streaming:
+      if self.wait_until_finish_duration is None\
+          or self.project is None or self.runner != 'DataflowRunner':
+        print('error: argument --wait_until_finish_duration\n' +

Review comment:
       Can we raise an exception instead?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)
       else:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.__class__.log_performance(perf)

Review comment:
       __class__ is probably not needed.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +233,128 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          id_label='id',
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
+      if self.runner == 'DataflowRunner':
+        result.wait_until_finish(duration=self.wait_until_finish_duration)

Review comment:
       If we are proactively terminating the job, then wait_until_finish_duration is not needed.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(self):
+    self.runtime_sec = -1.0
+    self.event_count = -1
+    self.event_per_sec = -1.0
+    self.result_count = -1
+
+  def is_active(self, other):

Review comment:
       nit: let's s/other/previous_perf/ and s/is_active/has_progress/; I think this will make it more understandable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org