You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/17 00:14:50 UTC

[beam] branch master updated: [BEAM-8335] Refactor IPythonLogHandler

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3714cc5  [BEAM-8335] Refactor IPythonLogHandler
     new 428a1db  Merge pull request #11138 from [BEAM-8335] Refactor IPythonLogHandler
3714cc5 is described below

commit 3714cc5b56ff6fe5a4a7d415480b6ec7fb660fc4
Author: KevinGG <ka...@gmail.com>
AuthorDate: Mon Mar 16 13:02:41 2020 -0700

    [BEAM-8335] Refactor IPythonLogHandler
    
    1. Moved IPythonLogHandler and register_ipython_log_handler from
    apache_beam.utils.interactive_utils to apache_beam.runners.interactive.utils.
    2. Fixed IPythonLogHandler.emit to apply the log record's args when
    formatting the message.
    3. Made DataFrame equality assertions ignore the ordering of index and
    columns (check_like=True).
    4. Removed the import check for jsons since it is no longer a dependency.
---
 .../runners/interactive/interactive_environment.py |  3 +-
 .../apache_beam/runners/interactive/utils.py       | 56 +++++++++++++
 .../apache_beam/runners/interactive/utils_test.py  | 75 ++++++++++++++++-
 sdks/python/apache_beam/utils/interactive_utils.py | 52 ------------
 .../apache_beam/utils/interactive_utils_test.py    | 97 ----------------------
 5 files changed, 129 insertions(+), 154 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index cca0e7b..c686b30 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -33,9 +33,9 @@ import sys
 
 import apache_beam as beam
 from apache_beam.runners import runner
+from apache_beam.runners.interactive.utils import register_ipython_log_handler
 from apache_beam.utils.interactive_utils import is_in_ipython
 from apache_beam.utils.interactive_utils import is_in_notebook
-from apache_beam.utils.interactive_utils import register_ipython_log_handler
 
 # Interactive Beam user flow is data-centric rather than pipeline-centric, so
 # there is only one global interactive environment instance that manages
@@ -169,7 +169,6 @@ class InteractiveEnvironment(object):
     # Check if [interactive] dependencies are installed.
     try:
       import IPython  # pylint: disable=unused-import
-      import jsons  # pylint: disable=unused-import
       import timeloop  # pylint: disable=unused-import
       from facets_overview.generic_feature_statistics_generator import GenericFeatureStatisticsGenerator  # pylint: disable=unused-import
       self._is_interactive_ready = True
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 046cf6e..5a6b6a3 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -20,6 +20,8 @@
 
 from __future__ import absolute_import
 
+import logging
+
 import pandas as pd
 
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
@@ -76,3 +78,57 @@ def elements_to_df(elements, include_window_info=False):
     final_df = rows_df
 
   return final_df
+
+
+def register_ipython_log_handler():
+  # type: () -> None
+
+  """Adds the IPython handler to a dummy parent logger (named
+  'apache_beam.runners.interactive') of all interactive modules' loggers so that
+  if is_in_notebook, logging displays the logs as HTML in frontends.
+  """
+
+  # 'apache_beam.runners.interactive' names a package, not a module with its
+  # own logger, so this "root" logger is a dummy created to hold the IPython
+  # log handler. When child loggers keep propagate=True and level NOTSET (both
+  # defaults, so the "root" logger's level takes effect), the IPython log
+  # handler fires at the "root"'s own level. A child logger that sets its own
+  # level takes back control of which of its records reach the handler.
+  interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
+  if any(isinstance(h, IPythonLogHandler)
+         for h in interactive_root_logger.handlers):
+    return  # Already registered; keep this idempotent.
+  interactive_root_logger.setLevel(logging.INFO)
+  interactive_root_logger.addHandler(IPythonLogHandler())
+  # Disable propagation so that logs emitted from interactive modules are only
+  # handled by loggers and handlers defined within interactive packages.
+  interactive_root_logger.propagate = False
+
+
+class IPythonLogHandler(logging.Handler):
+  """A logging handler to display logs as HTML in IPython backed frontends."""
+  log_template = """
+            <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
+            <div class="alert alert-{level}">{msg}</div>"""
+
+  logging_to_alert_level_map = {
+      logging.CRITICAL: 'danger',
+      logging.ERROR: 'danger',
+      logging.WARNING: 'warning',
+      logging.INFO: 'info',
+      logging.DEBUG: 'dark',
+      logging.NOTSET: 'light'
+  }
+
+  def emit(self, record):
+    """Displays the record as an HTML alert; unmapped levels use 'light'."""
+    try:
+      from html import escape
+      from IPython.core.display import HTML, display
+      level = self.logging_to_alert_level_map.get(record.levelno, 'light')
+      msg = escape(record.getMessage())  # Applies args only if present.
+      display(HTML(self.log_template.format(level=level, msg=msg)))
+    except ImportError:
+      pass  # NOOP when dependencies are not available.
+    except Exception:  # pylint: disable=broad-except
+      self.handleError(record)  # Report via logging; don't raise to caller.
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index f9c08c8..7d79eb7 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -17,14 +17,24 @@
 
 from __future__ import absolute_import
 
+import logging
+import sys
 import unittest
 
 import numpy as np
 import pandas as pd
 
+from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import utils
 from apache_beam.utils.windowed_value import WindowedValue
 
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+  from unittest.mock import patch
+except ImportError:
+  from mock import patch
+
 
 class ParseToDataframeTest(unittest.TestCase):
   def test_parse_windowedvalue(self):
@@ -39,7 +49,8 @@ class ParseToDataframeTest(unittest.TestCase):
 
     actual_df = utils.elements_to_df(els, include_window_info=False)
     expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
+    # check_like so that ordering of indices doesn't matter.
+    pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
 
   def test_parse_windowedvalue_with_window_info(self):
     """Tests that WindowedValues are supported and have their own columns.
@@ -56,7 +67,8 @@ class ParseToDataframeTest(unittest.TestCase):
         [['a', 2, int(1e6), els[0].windows, els[0].pane_info],
          ['b', 3, int(1e6), els[1].windows, els[1].pane_info]],
         columns=[0, 1, 'event_time', 'windows', 'pane_info'])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
+    # check_like so that ordering of indices doesn't matter.
+    pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
 
   def test_parse_windowedvalue_with_dicts(self):
     """Tests that dicts play well with WindowedValues.
@@ -77,7 +89,64 @@ class ParseToDataframeTest(unittest.TestCase):
         [[np.nan, 2, np.nan, 4, int(1e6), els[0].windows, els[0].pane_info],
          [1, 2, 3, np.nan, int(1e6), els[1].windows, els[1].pane_info]],
         columns=['a', 'b', 'c', 'd', 'event_time', 'windows', 'pane_info'])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
+    # check_like so that ordering of indices doesn't matter.
+    pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
+
+
+@unittest.skipIf(
+    not ie.current_env().is_interactive_ready,
+    '[interactive] dependency is not installed.')
+@unittest.skipIf(
+    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
+class IPythonLogHandlerTest(unittest.TestCase):
+  def setUp(self):
+    utils.register_ipython_log_handler()
+    self._interactive_root_logger = logging.getLogger(
+        'apache_beam.runners.interactive')
+
+  def test_ipython_log_handler_not_double_registered(self):
+    utils.register_ipython_log_handler()
+    ipython_log_handlers = list(
+        filter(
+            lambda x: isinstance(x, utils.IPythonLogHandler),
+            [handler for handler in self._interactive_root_logger.handlers]))
+    self.assertEqual(1, len(ipython_log_handlers))
+
+  @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
+  def test_default_logging_level_is_info(self, mock_emit):
+    # Loggers and log handlers default to level NOTSET, and loggers default to
+    # propagate=True. So records from a child logger with no level of its own
+    # are filtered by the effective level inherited from the interactive
+    # "root" logger (INFO) and then handled by its sole IPythonLogHandler,
+    # whose own level is NOTSET. The net effect: everything at INFO or above
+    # is logged through IPython.core.display to all frontends connected to
+    # the current kernel.
+    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
+    dummy_logger.info('info')
+    mock_emit.assert_called_once()
+    dummy_logger.debug('debug')
+    # DEBUG is below the effective INFO level, so emit is still called once.
+    mock_emit.assert_called_once()
+
+  @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
+  def test_child_module_logger_can_override_logging_level(self, mock_emit):
+    # A child logger whose level is not NOTSET uses its own level instead of
+    # the interactive "root" logger's; records that pass it still propagate to
+    # the "root" logger's IPythonLogHandler.
+    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
+    dummy_logger.setLevel(logging.DEBUG)
+    mock_emit.assert_not_called()
+    dummy_logger.debug('debug')
+    # The dummy child logger is set to DEBUG, so its DEBUG record passes and
+    # propagates to the IPythonLogHandler on the interactive "root" logger.
+    mock_emit.assert_called_once()
+    # Once the dummy child logger is set to CRITICAL, only CRITICAL records
+    # pass its level check and propagate to the interactive "root" logger.
+    dummy_logger.setLevel(logging.CRITICAL)
+    # ERROR is below CRITICAL, so this record is dropped by the child logger.
+    dummy_logger.error('error')
+    # Emit is not called again, so it's still called once.
+    mock_emit.assert_called_once()
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/utils/interactive_utils.py b/sdks/python/apache_beam/utils/interactive_utils.py
index f7cfded..9820db9 100644
--- a/sdks/python/apache_beam/utils/interactive_utils.py
+++ b/sdks/python/apache_beam/utils/interactive_utils.py
@@ -101,55 +101,3 @@ def _extract_pipeline_of_pvalueish(pvalueish):
   if hasattr(pvalue, 'pipeline'):
     return pvalue.pipeline
   return None
-
-
-# TODO(BEAM-8335): Move this function and the IPythonLogHandler to a util class
-# under interactive package when streaming cache changes are merged.
-def register_ipython_log_handler():
-  """Adds the IPython handler to a dummy parent logger (named
-  'apache_beam.runners.interactive') of all interactive modules' loggers so that
-  if is_in_notebook, logging displays the logs as HTML in frontends."""
-  # apache_beam.runners.interactive is not a module, thus this "root" logger is
-  # a dummy one created to hold the IPython log handler. When children loggers
-  # have propagate as True (by default) and logging level as NOTSET (by default,
-  # so the "root" logger's logging level takes effect), the IPython log handler
-  # will be triggered at the "root"'s own logging level. And if a child logger
-  # sets its logging level, it can take control back.
-  interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
-  if any([isinstance(h, IPythonLogHandler)
-          for h in interactive_root_logger.handlers]):
-    return
-  interactive_root_logger.setLevel(logging.INFO)
-  interactive_root_logger.addHandler(IPythonLogHandler())
-  # Disable the propagation so that logs emitted from interactive modules should
-  # only be handled by loggers and handlers defined within interactive packages.
-  interactive_root_logger.propagate = False
-
-
-class IPythonLogHandler(logging.Handler):
-  """A logging handler to display logs as HTML in IPython backed frontends."""
-  log_template = """
-            <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
-            <div class="alert alert-{level}">{msg}</div>"""
-
-  logging_to_alert_level_map = {
-      logging.CRITICAL: 'danger',
-      logging.ERROR: 'danger',
-      logging.WARNING: 'warning',
-      logging.INFO: 'info',
-      logging.DEBUG: 'dark',
-      logging.NOTSET: 'light'
-  }
-
-  def emit(self, record):
-    try:
-      from html import escape
-      from IPython.core.display import HTML
-      from IPython.core.display import display
-      display(
-          HTML(
-              self.log_template.format(
-                  level=self.logging_to_alert_level_map[record.levelno],
-                  msg=escape(record.msg))))
-    except ImportError:
-      pass  # NOOP when dependencies are not available.
diff --git a/sdks/python/apache_beam/utils/interactive_utils_test.py b/sdks/python/apache_beam/utils/interactive_utils_test.py
deleted file mode 100644
index 936697d..0000000
--- a/sdks/python/apache_beam/utils/interactive_utils_test.py
+++ /dev/null
@@ -1,97 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# pytype: skip-file
-
-"""Tests for apache_beam.utils.interactive_utils."""
-
-from __future__ import absolute_import
-
-import logging
-import sys
-import unittest
-
-from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.utils.interactive_utils import IPythonLogHandler
-from apache_beam.utils.interactive_utils import register_ipython_log_handler
-
-# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
-# unittest.mock module.
-try:
-  from unittest.mock import patch
-except ImportError:
-  from mock import patch
-
-
-@unittest.skipIf(
-    not ie.current_env().is_interactive_ready,
-    '[interactive] dependency is not installed.')
-@unittest.skipIf(
-    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
-class InteractiveUtilsTest(unittest.TestCase):
-  def setUp(self):
-    register_ipython_log_handler()
-    self._interactive_root_logger = logging.getLogger(
-        'apache_beam.runners.interactive')
-
-  def test_ipython_log_handler_not_double_registered(self):
-    register_ipython_log_handler()
-    ipython_log_handlers = list(
-        filter(
-            lambda x: isinstance(x, IPythonLogHandler),
-            [handler for handler in self._interactive_root_logger.handlers]))
-    self.assertEqual(1, len(ipython_log_handlers))
-
-  @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
-  def test_default_logging_level_is_info(self, mock_emit):
-    # By default the logging level of loggers and log handlers are NOTSET. Also,
-    # the propagation is default to true for all loggers. In this scenario, all
-    # loggings from child loggers will be propagated to the interactive "root"
-    # logger which is set to INFO level that gets handled by the sole log
-    # handler IPythonLogHandler which is set to NOTSET. The effect will be
-    # everything >= info level will be logged through IPython.core.display to
-    # all frontends connected to current kernel.
-    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
-    dummy_logger.info('info')
-    mock_emit.assert_called_once()
-    dummy_logger.debug('debug')
-    # Emit is not called, so it's still called once.
-    mock_emit.assert_called_once()
-
-  @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
-  def test_child_module_logger_can_override_logging_level(self, mock_emit):
-    # When a child logger's logging level is configured to something that is not
-    # NOTSET, it takes back the logging control from the interactive "root"
-    # logger by not propagating anything.
-    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
-    dummy_logger.setLevel(logging.DEBUG)
-    mock_emit.assert_not_called()
-    dummy_logger.debug('debug')
-    # Because the dummy child logger is configured to log at DEBUG level, it
-    # now propagates DEBUG loggings to the interactive "root" logger.
-    mock_emit.assert_called_once()
-    # When the dummy child logger is configured to log at CRITICAL level, it
-    # will only propagate CRITICAL loggings to the interactive "root" logger.
-    dummy_logger.setLevel(logging.CRITICAL)
-    # Error loggings will not be handled now.
-    dummy_logger.error('error')
-    # Emit is not called, so it's still called once.
-    mock_emit.assert_called_once()
-
-
-if __name__ == '__main__':
-  unittest.main()