You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/08 13:55:41 UTC

[GitHub] [beam] pabloem commented on a change in pull request #12709: add more options and monitoring to nexmark launcher

pabloem commented on a change in pull request #12709:
URL: https://github.com/apache/beam/pull/12709#discussion_r484928699



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))

Review comment:
       Do you need to timestamp as well?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
-      else:
+      if not self.streaming:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    logging.info('starting to monitor the job')
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time.time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.has_progress(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\
+           curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY:

Review comment:
       @leiyiz the logic in this method is a little difficult to follow (the spurious codecov annotations don't help either) - do you think you could add a pydoc at the top roughly describing what the method does?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\

Review comment:
       Let's use parentheses instead of `\` for line continuation.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\
+       self.event_count != previous_perf.event_count or\
+       self.result_count != previous_perf.result_count:

Review comment:
       Also, is progress in `runtime_sec` an indicator of progress in the pipeline? Could you perhaps add a pydoc for this class describing what each attribute represents?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -200,46 +223,126 @@ def generate_events(self):
 
     logging.info('Finished event generation.')
 
+  def read_from_file(self):
+    return (
+        self.pipeline
+        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
+        | 'timestamping' >>
+        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
+
+  def read_from_pubsub(self):
     # Read from PubSub into a PCollection.
-    if self.args.subscription_name:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          subscription=sub.full_name)
+    if self.subscription_name:
+      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
+          subscription=self.subscription_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
     else:
-      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
-          topic=topic.full_name)
-    raw_events = (
+      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
+          topic=self.topic_name,
+          with_attributes=True,
+          timestamp_attribute='timestamp')
+    events = (
         raw_events
-        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn())
-        | 'timestamping' >>
-        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))
-    return raw_events
+        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
+        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
+    return events
 
   def run_query(self, query, query_args, query_errors):
     try:
-      self.parse_args()
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
 
       event_monitor = Monitor('.events', 'event')
       result_monitor = Monitor('.results', 'result')
 
-      events = self.generate_events()
+      if self.streaming:
+        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
+          self.generate_events()
+        if self.pubsub_mode == 'PUBLISH_ONLY':
+          return
+        events = self.read_from_pubsub()
+      else:
+        events = self.read_from_file()
+
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
       output = query.load(events, query_args)
       output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
-      job_duration = (
-          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
-      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
-        result.wait_until_finish(duration=job_duration)
-        result.cancel()
-      else:
+      if not self.streaming:
         result.wait_until_finish()
+      perf = self.monitor(result, event_monitor, result_monitor)
+      self.log_performance(perf)
+
     except Exception as exc:
       query_errors.append(str(exc))
       raise
 
+  def monitor(self, job, event_monitor, result_monitor):
+    logging.info('starting to monitor the job')
+    last_active_ms = -1
+    perf = None
+    cancel_job = False
+    waiting_for_shutdown = False
+
+    while True:
+      now = int(time.time() * 1000)  # current time in ms
+      logging.debug('now is %d', now)
+
+      curr_perf = NexmarkLauncher.get_performance(
+          job, event_monitor, result_monitor)
+      if perf is None or curr_perf.has_progress(perf):
+        last_active_ms = now
+      if self.streaming and not waiting_for_shutdown:
+        quiet_duration = (now - last_active_ms) // 1000
+        if curr_perf.event_count >= self.args.num_events and\

Review comment:
       We don't usually rely on `\` to continue a statement across lines. We use parentheses instead (e.g. `if (curr_perf.event_count >=.... \n\t something something):`).

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -96,24 +94,41 @@
 
 
 class NexmarkLauncher(object):
+
+  # how long after some result is seen and no activity seen do we cancel job
+  DONE_DELAY = 5 * 60
+  # delay in seconds between sample perf data
+  PERF_DELAY = 20
+  # delay before cancelling the job when pipeline appears to be stuck
+  TERMINATE_DELAY = 1 * 60 * 60
+  # delay before warning when pipeline appears to be stuck
+  WARNING_DELAY = 10 * 60
+
   def __init__(self):
     self.parse_args()
-    self.uuid = str(uuid.uuid4())
-    self.topic_name = self.args.topic_name + self.uuid
-    self.subscription_name = self.args.subscription_name + self.uuid
-    publish_client = pubsub.Client(project=self.project)
-    topic = publish_client.topic(self.topic_name)
-    if topic.exists():
-      logging.info('deleting topic %s', self.topic_name)
-      topic.delete()
-    logging.info('creating topic %s', self.topic_name)
-    topic.create()
-    sub = topic.subscription(self.subscription_name)
-    if sub.exists():
-      logging.info('deleting sub %s', self.topic_name)
-      sub.delete()
-    logging.info('creating sub %s', self.topic_name)
-    sub.create()
+    self.manage_resources = self.args.manage_resources
+    self.uuid = str(uuid.uuid4()) if self.manage_resources else ''
+    self.topic_name = (
+        self.args.topic_name + self.uuid if self.args.topic_name else None)
+    self.subscription_name = (
+        self.args.subscription_name +
+        self.uuid if self.args.subscription_name else None)
+    self.pubsub_mode = self.args.pubsub_mode
+    if self.manage_resources:
+      from google.cloud import pubsub
+      publish_client = pubsub.Client(project=self.project)
+      topic = publish_client.topic(self.topic_name)
+      if topic.exists():
+        logging.info('deleting topic %s', self.topic_name)
+        topic.delete()
+      logging.info('creating topic %s', self.topic_name)
+      topic.create()
+      sub = topic.subscription(self.subscription_name)
+      if sub.exists():
+        logging.info('deleting sub %s', self.topic_name)
+        sub.delete()

Review comment:
       It looks like you can call `self.cleanup` for most of these, and save a few lines? : )

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_perf.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+performance summary for a run of nexmark query
+"""
+
+
+class NexmarkPerf(object):
+  def __init__(
+      self,
+      runtime_sec=None,
+      event_count=None,
+      event_per_sec=None,
+      result_count=None):
+    self.runtime_sec = runtime_sec if runtime_sec else -1.0
+    self.event_count = event_count if event_count else -1
+    self.event_per_sec = event_per_sec if event_per_sec else -1.0
+    self.result_count = result_count if result_count else -1
+
+  def has_progress(self, previous_perf):
+    # type: (NexmarkPerf) -> bool
+
+    """
+    Args:
+      previous_perf: a NexmarkPerf object to be compared to self
+
+    Returns:
+      True if there are activity between self and other NexmarkPerf values
+    """
+    if self.runtime_sec != previous_perf.runtime_sec or\

Review comment:
       e.g.
   ```
   if (self.r_s != ... or
       self.abc != self.def or ...):
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org