You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/06 05:55:15 UTC

[beam] branch master updated: Merge pull request #11032 from [BEAM-8335] Display rather than logging when is_in_notebook.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 457d4dc  Merge pull request #11032 from [BEAM-8335] Display rather than logging when is_in_notebook.
457d4dc is described below

commit 457d4dce27a854b1b42f97565056e4e766b39015
Author: Ning Kang <ka...@gmail.com>
AuthorDate: Thu Mar 5 21:54:58 2020 -0800

    Merge pull request #11032 from [BEAM-8335] Display rather than logging when is_in_notebook.
    
    * [BEAM-8335] Display rather than logging when is_in_notebook.
    
    1. Added an IPythonLogHandler to display logs as styled HTML in
       frontends connected to the current IPython kernel. The
       implementation works with all IPython notebooks.
    2. The user can control the logging with native Python logging APIs.
    3. The log handler is only registered when the current environment is
       in a notebook, and it only affects module loggers under the
       interactive package.
    
    Change-Id: Iaec9272e0ee61d091979822680615791f1a57cd6
    
    * Run isort and make the tests independent of their execution order
    
    Change-Id: Iaab96e178f1ee286cb064d88563459503e60ec9d
    
    * Added a TODO item to reorganize the code
    
    Change-Id: I29aa356a5310de42f43d6610081f2378ea16db9d
    
    Co-authored-by: Ning Kang <ni...@ningk-macbookpro.roam.corp.google.com>
---
 .../runners/interactive/background_caching_job.py  |  1 -
 .../runners/interactive/interactive_environment.py |  3 +
 .../runners/interactive/options/capture_control.py |  1 -
 sdks/python/apache_beam/utils/interactive_utils.py | 52 ++++++++++++
 .../apache_beam/utils/interactive_utils_test.py    | 97 ++++++++++++++++++++++
 5 files changed, 152 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 6b43a29..63dd885 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -260,7 +260,6 @@ def is_source_to_cache_changed(
   # change by default.
   if is_changed and update_cached_source_signature:
     if ie.current_env().options.enable_capture_replay:
-      # TODO(BEAM-8335): display rather than logging when is_in_notebook.
       if not recorded_signature:
         _LOGGER.info(
             'Interactive Beam has detected you have unbounded sources '
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 08fed2c..5d9f716 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -35,6 +35,7 @@ import apache_beam as beam
 from apache_beam.runners import runner
 from apache_beam.utils.interactive_utils import is_in_ipython
 from apache_beam.utils.interactive_utils import is_in_notebook
+from apache_beam.utils.interactive_utils import register_ipython_log_handler
 
 # Interactive Beam user flow is data-centric rather than pipeline-centric, so
 # there is only one global interactive environment instance that manages
@@ -138,6 +139,8 @@ class InteractiveEnvironment(object):
       _LOGGER.warning(
           'You have limited Interactive Beam features since your '
           'ipython kernel is not connected any notebook frontend.')
+    if self._is_in_notebook:
+      register_ipython_log_handler()
 
   @property
   def options(self):
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index b2a444e..ae6049f 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -68,7 +68,6 @@ def evict_captured_data():
   Interactive Beam. In future PCollection evaluation/visualization and pipeline
   runs, Interactive Beam will capture fresh data."""
   if ie.current_env().options.enable_capture_replay:
-    # TODO(BEAM-8335): display rather than logging when is_in_notebook.
     _LOGGER.info(
         'You have requested Interactive Beam to evict all captured '
         'data that could be deterministically replayed among multiple '
diff --git a/sdks/python/apache_beam/utils/interactive_utils.py b/sdks/python/apache_beam/utils/interactive_utils.py
index 9820db9..f7cfded 100644
--- a/sdks/python/apache_beam/utils/interactive_utils.py
+++ b/sdks/python/apache_beam/utils/interactive_utils.py
@@ -101,3 +101,55 @@ def _extract_pipeline_of_pvalueish(pvalueish):
   if hasattr(pvalue, 'pipeline'):
     return pvalue.pipeline
   return None
+
+
+# TODO(BEAM-8335): Move this function and the IPythonLogHandler to a util class
+# under interactive package when streaming cache changes are merged.
+def register_ipython_log_handler():
+  """Adds the IPython handler to a dummy parent logger (named
+  'apache_beam.runners.interactive') of all interactive modules' loggers so that
+  if is_in_notebook, logging displays the logs as HTML in frontends."""
+  # apache_beam.runners.interactive is not a module, thus this "root" logger is
+  # a dummy one created to hold the IPython log handler. When children loggers
+  # have propagate as True (by default) and logging level as NOTSET (by default,
+  # so the "root" logger's logging level takes effect), the IPython log handler
+  # will be triggered at the "root"'s own logging level. And if a child logger
+  # sets its logging level, it can take control back.
+  interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
+  if any([isinstance(h, IPythonLogHandler)
+          for h in interactive_root_logger.handlers]):
+    return
+  interactive_root_logger.setLevel(logging.INFO)
+  interactive_root_logger.addHandler(IPythonLogHandler())
+  # Disable the propagation so that logs emitted from interactive modules should
+  # only be handled by loggers and handlers defined within interactive packages.
+  interactive_root_logger.propagate = False
+
+
+class IPythonLogHandler(logging.Handler):
+  """A logging handler to display logs as HTML in IPython backed frontends."""
+  log_template = """
+            <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
+            <div class="alert alert-{level}">{msg}</div>"""
+
+  logging_to_alert_level_map = {
+      logging.CRITICAL: 'danger',
+      logging.ERROR: 'danger',
+      logging.WARNING: 'warning',
+      logging.INFO: 'info',
+      logging.DEBUG: 'dark',
+      logging.NOTSET: 'light'
+  }
+
+  def emit(self, record):
+    try:
+      from html import escape
+      from IPython.core.display import HTML
+      from IPython.core.display import display
+      display(
+          HTML(
+              self.log_template.format(
+                  level=self.logging_to_alert_level_map[record.levelno],
+                  msg=escape(record.msg))))
+    except ImportError:
+      pass  # NOOP when dependencies are not available.
diff --git a/sdks/python/apache_beam/utils/interactive_utils_test.py b/sdks/python/apache_beam/utils/interactive_utils_test.py
new file mode 100644
index 0000000..936697d
--- /dev/null
+++ b/sdks/python/apache_beam/utils/interactive_utils_test.py
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Tests for apache_beam.utils.interactive_utils."""
+
+from __future__ import absolute_import
+
+import logging
+import sys
+import unittest
+
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.utils.interactive_utils import IPythonLogHandler
+from apache_beam.utils.interactive_utils import register_ipython_log_handler
+
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+  from unittest.mock import patch
+except ImportError:
+  from mock import patch
+
+
+@unittest.skipIf(
+    not ie.current_env().is_interactive_ready,
+    '[interactive] dependency is not installed.')
+@unittest.skipIf(
+    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
+class InteractiveUtilsTest(unittest.TestCase):
+  def setUp(self):
+    register_ipython_log_handler()
+    self._interactive_root_logger = logging.getLogger(
+        'apache_beam.runners.interactive')
+
+  def test_ipython_log_handler_not_double_registered(self):
+    register_ipython_log_handler()
+    ipython_log_handlers = list(
+        filter(
+            lambda x: isinstance(x, IPythonLogHandler),
+            [handler for handler in self._interactive_root_logger.handlers]))
+    self.assertEqual(1, len(ipython_log_handlers))
+
+  @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
+  def test_default_logging_level_is_info(self, mock_emit):
+    # By default the logging level of loggers and log handlers are NOTSET. Also,
+    # the propagation is default to true for all loggers. In this scenario, all
+    # loggings from child loggers will be propagated to the interactive "root"
+    # logger which is set to INFO level that gets handled by the sole log
+    # handler IPythonLogHandler which is set to NOTSET. The effect will be
+    # everything >= info level will be logged through IPython.core.display to
+    # all frontends connected to current kernel.
+    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
+    dummy_logger.info('info')
+    mock_emit.assert_called_once()
+    dummy_logger.debug('debug')
+    # Emit is not called, so it's still called once.
+    mock_emit.assert_called_once()
+
+  @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
+  def test_child_module_logger_can_override_logging_level(self, mock_emit):
+    # When a child logger's logging level is configured to something that is not
+    # NOTSET, it takes back the logging control from the interactive "root"
+    # logger by not propagating anything.
+    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
+    dummy_logger.setLevel(logging.DEBUG)
+    mock_emit.assert_not_called()
+    dummy_logger.debug('debug')
+    # Because the dummy child logger is configured to log at DEBUG level, it
+    # now propagates DEBUG loggings to the interactive "root" logger.
+    mock_emit.assert_called_once()
+    # When the dummy child logger is configured to log at CRITICAL level, it
+    # will only propagate CRITICAL loggings to the interactive "root" logger.
+    dummy_logger.setLevel(logging.CRITICAL)
+    # Error loggings will not be handled now.
+    dummy_logger.error('error')
+    # Emit is not called, so it's still called once.
+    mock_emit.assert_called_once()
+
+
+if __name__ == '__main__':
+  unittest.main()