You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/17 00:14:50 UTC
[beam] branch master updated: [BEAM-8335] Refactor IPythonLogHandler
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3714cc5 [BEAM-8335] Refactor IPythonLogHandler
new 428a1db Merge pull request #11138 from [BEAM-8335] Refactor IPythonLogHandler
3714cc5 is described below
commit 3714cc5b56ff6fe5a4a7d415480b6ec7fb660fc4
Author: KevinGG <ka...@gmail.com>
AuthorDate: Mon Mar 16 13:02:41 2020 -0700
[BEAM-8335] Refactor IPythonLogHandler
1. Moved IPythonLogHandler (and register_ipython_log_handler) from apache_beam.utils.interactive_utils to apache_beam.runners.interactive.utils.
2. Fixed the message formatting in the emit function of IPythonLogHandler (record.args is now interpolated into record.msg).
3. Ignored index/column ordering in dataframe equivalence
assertions.
4. Removed the check for jsons since it is no longer a dependency.
---
.../runners/interactive/interactive_environment.py | 3 +-
.../apache_beam/runners/interactive/utils.py | 56 +++++++++++++
.../apache_beam/runners/interactive/utils_test.py | 75 ++++++++++++++++-
sdks/python/apache_beam/utils/interactive_utils.py | 52 ------------
.../apache_beam/utils/interactive_utils_test.py | 97 ----------------------
5 files changed, 129 insertions(+), 154 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index cca0e7b..c686b30 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -33,9 +33,9 @@ import sys
import apache_beam as beam
from apache_beam.runners import runner
+from apache_beam.runners.interactive.utils import register_ipython_log_handler
from apache_beam.utils.interactive_utils import is_in_ipython
from apache_beam.utils.interactive_utils import is_in_notebook
-from apache_beam.utils.interactive_utils import register_ipython_log_handler
# Interactive Beam user flow is data-centric rather than pipeline-centric, so
# there is only one global interactive environment instance that manages
@@ -169,7 +169,6 @@ class InteractiveEnvironment(object):
# Check if [interactive] dependencies are installed.
try:
import IPython # pylint: disable=unused-import
- import jsons # pylint: disable=unused-import
import timeloop # pylint: disable=unused-import
from facets_overview.generic_feature_statistics_generator import GenericFeatureStatisticsGenerator # pylint: disable=unused-import
self._is_interactive_ready = True
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 046cf6e..5a6b6a3 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -20,6 +20,8 @@
from __future__ import absolute_import
+import logging
+
import pandas as pd
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
@@ -76,3 +78,57 @@ def elements_to_df(elements, include_window_info=False):
final_df = rows_df
return final_df
+
+
+def register_ipython_log_handler():
+ # type: () -> None
+
+ """Adds the IPython handler to a dummy parent logger (named
+ 'apache_beam.runners.interactive') of all interactive modules' loggers so that
+ if is_in_notebook, logging displays the logs as HTML in frontends.
+ """
+
+ # apache_beam.runners.interactive is not a module, thus this "root" logger is
+ # a dummy one created to hold the IPython log handler. When children loggers
+ # have propagate as True (by default) and logging level as NOTSET (by default,
+ # so the "root" logger's logging level takes effect), the IPython log handler
+ # will be triggered at the "root"'s own logging level. And if a child logger
+ # sets its logging level, it can take control back.
+ interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
+ if any([isinstance(h, IPythonLogHandler)
+ for h in interactive_root_logger.handlers]):
+ return
+ interactive_root_logger.setLevel(logging.INFO)
+ interactive_root_logger.addHandler(IPythonLogHandler())
+ # Disable the propagation so that logs emitted from interactive modules should
+ # only be handled by loggers and handlers defined within interactive packages.
+ interactive_root_logger.propagate = False
+
+
+class IPythonLogHandler(logging.Handler):
+ """A logging handler to display logs as HTML in IPython backed frontends."""
+ log_template = """
+ <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
+ <div class="alert alert-{level}">{msg}</div>"""
+
+ logging_to_alert_level_map = {
+ logging.CRITICAL: 'danger',
+ logging.ERROR: 'danger',
+ logging.WARNING: 'warning',
+ logging.INFO: 'info',
+ logging.DEBUG: 'dark',
+ logging.NOTSET: 'light'
+ }
+
+ def emit(self, record):
+ try:
+ from html import escape
+ from IPython.core.display import HTML
+ from IPython.core.display import display
+ display(
+ HTML(
+ self.log_template.format(
+ level=self.logging_to_alert_level_map[record.levelno],
+ msg=escape(record.msg % record.args))))
+ except ImportError:
+ pass # NOOP when dependencies are not available.
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index f9c08c8..7d79eb7 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -17,14 +17,24 @@
from __future__ import absolute_import
+import logging
+import sys
import unittest
import numpy as np
import pandas as pd
+from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
from apache_beam.utils.windowed_value import WindowedValue
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+ from unittest.mock import patch
+except ImportError:
+ from mock import patch
+
class ParseToDataframeTest(unittest.TestCase):
def test_parse_windowedvalue(self):
@@ -39,7 +49,8 @@ class ParseToDataframeTest(unittest.TestCase):
actual_df = utils.elements_to_df(els, include_window_info=False)
expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
- pd.testing.assert_frame_equal(actual_df, expected_df)
+ # check_like so that ordering of indices doesn't matter.
+ pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
def test_parse_windowedvalue_with_window_info(self):
"""Tests that WindowedValues are supported and have their own columns.
@@ -56,7 +67,8 @@ class ParseToDataframeTest(unittest.TestCase):
[['a', 2, int(1e6), els[0].windows, els[0].pane_info],
['b', 3, int(1e6), els[1].windows, els[1].pane_info]],
columns=[0, 1, 'event_time', 'windows', 'pane_info'])
- pd.testing.assert_frame_equal(actual_df, expected_df)
+ # check_like so that ordering of indices doesn't matter.
+ pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
def test_parse_windowedvalue_with_dicts(self):
"""Tests that dicts play well with WindowedValues.
@@ -77,7 +89,64 @@ class ParseToDataframeTest(unittest.TestCase):
[[np.nan, 2, np.nan, 4, int(1e6), els[0].windows, els[0].pane_info],
[1, 2, 3, np.nan, int(1e6), els[1].windows, els[1].pane_info]],
columns=['a', 'b', 'c', 'd', 'event_time', 'windows', 'pane_info'])
- pd.testing.assert_frame_equal(actual_df, expected_df)
+ # check_like so that ordering of indices doesn't matter.
+ pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
+
+
+@unittest.skipIf(
+ not ie.current_env().is_interactive_ready,
+ '[interactive] dependency is not installed.')
+@unittest.skipIf(
+ sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
+class IPythonLogHandlerTest(unittest.TestCase):
+ def setUp(self):
+ utils.register_ipython_log_handler()
+ self._interactive_root_logger = logging.getLogger(
+ 'apache_beam.runners.interactive')
+
+ def test_ipython_log_handler_not_double_registered(self):
+ utils.register_ipython_log_handler()
+ ipython_log_handlers = list(
+ filter(
+ lambda x: isinstance(x, utils.IPythonLogHandler),
+ [handler for handler in self._interactive_root_logger.handlers]))
+ self.assertEqual(1, len(ipython_log_handlers))
+
+ @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
+ def test_default_logging_level_is_info(self, mock_emit):
+ # By default the logging level of loggers and log handlers are NOTSET. Also,
+ # the propagation is default to true for all loggers. In this scenario, all
+ # loggings from child loggers will be propagated to the interactive "root"
+ # logger which is set to INFO level that gets handled by the sole log
+ # handler IPythonLogHandler which is set to NOTSET. The effect will be
+ # everything >= info level will be logged through IPython.core.display to
+ # all frontends connected to current kernel.
+ dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
+ dummy_logger.info('info')
+ mock_emit.assert_called_once()
+ dummy_logger.debug('debug')
+ # Emit is not called, so it's still called once.
+ mock_emit.assert_called_once()
+
+ @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
+ def test_child_module_logger_can_override_logging_level(self, mock_emit):
+ # When a child logger's logging level is configured to something that is not
+ # NOTSET, it takes back the logging control from the interactive "root"
+ # logger by not propagating anything.
+ dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
+ dummy_logger.setLevel(logging.DEBUG)
+ mock_emit.assert_not_called()
+ dummy_logger.debug('debug')
+ # Because the dummy child logger is configured to log at DEBUG level, it
+ # now propagates DEBUG loggings to the interactive "root" logger.
+ mock_emit.assert_called_once()
+ # When the dummy child logger is configured to log at CRITICAL level, it
+ # will only propagate CRITICAL loggings to the interactive "root" logger.
+ dummy_logger.setLevel(logging.CRITICAL)
+ # Error loggings will not be handled now.
+ dummy_logger.error('error')
+ # Emit is not called, so it's still called once.
+ mock_emit.assert_called_once()
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/utils/interactive_utils.py b/sdks/python/apache_beam/utils/interactive_utils.py
index f7cfded..9820db9 100644
--- a/sdks/python/apache_beam/utils/interactive_utils.py
+++ b/sdks/python/apache_beam/utils/interactive_utils.py
@@ -101,55 +101,3 @@ def _extract_pipeline_of_pvalueish(pvalueish):
if hasattr(pvalue, 'pipeline'):
return pvalue.pipeline
return None
-
-
-# TODO(BEAM-8335): Move this function and the IPythonLogHandler to a util class
-# under interactive package when streaming cache changes are merged.
-def register_ipython_log_handler():
- """Adds the IPython handler to a dummy parent logger (named
- 'apache_beam.runners.interactive') of all interactive modules' loggers so that
- if is_in_notebook, logging displays the logs as HTML in frontends."""
- # apache_beam.runners.interactive is not a module, thus this "root" logger is
- # a dummy one created to hold the IPython log handler. When children loggers
- # have propagate as True (by default) and logging level as NOTSET (by default,
- # so the "root" logger's logging level takes effect), the IPython log handler
- # will be triggered at the "root"'s own logging level. And if a child logger
- # sets its logging level, it can take control back.
- interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
- if any([isinstance(h, IPythonLogHandler)
- for h in interactive_root_logger.handlers]):
- return
- interactive_root_logger.setLevel(logging.INFO)
- interactive_root_logger.addHandler(IPythonLogHandler())
- # Disable the propagation so that logs emitted from interactive modules should
- # only be handled by loggers and handlers defined within interactive packages.
- interactive_root_logger.propagate = False
-
-
-class IPythonLogHandler(logging.Handler):
- """A logging handler to display logs as HTML in IPython backed frontends."""
- log_template = """
- <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
- <div class="alert alert-{level}">{msg}</div>"""
-
- logging_to_alert_level_map = {
- logging.CRITICAL: 'danger',
- logging.ERROR: 'danger',
- logging.WARNING: 'warning',
- logging.INFO: 'info',
- logging.DEBUG: 'dark',
- logging.NOTSET: 'light'
- }
-
- def emit(self, record):
- try:
- from html import escape
- from IPython.core.display import HTML
- from IPython.core.display import display
- display(
- HTML(
- self.log_template.format(
- level=self.logging_to_alert_level_map[record.levelno],
- msg=escape(record.msg))))
- except ImportError:
- pass # NOOP when dependencies are not available.
diff --git a/sdks/python/apache_beam/utils/interactive_utils_test.py b/sdks/python/apache_beam/utils/interactive_utils_test.py
deleted file mode 100644
index 936697d..0000000
--- a/sdks/python/apache_beam/utils/interactive_utils_test.py
+++ /dev/null
@@ -1,97 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# pytype: skip-file
-
-"""Tests for apache_beam.utils.interactive_utils."""
-
-from __future__ import absolute_import
-
-import logging
-import sys
-import unittest
-
-from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.utils.interactive_utils import IPythonLogHandler
-from apache_beam.utils.interactive_utils import register_ipython_log_handler
-
-# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
-# unittest.mock module.
-try:
- from unittest.mock import patch
-except ImportError:
- from mock import patch
-
-
-@unittest.skipIf(
- not ie.current_env().is_interactive_ready,
- '[interactive] dependency is not installed.')
-@unittest.skipIf(
- sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
-class InteractiveUtilsTest(unittest.TestCase):
- def setUp(self):
- register_ipython_log_handler()
- self._interactive_root_logger = logging.getLogger(
- 'apache_beam.runners.interactive')
-
- def test_ipython_log_handler_not_double_registered(self):
- register_ipython_log_handler()
- ipython_log_handlers = list(
- filter(
- lambda x: isinstance(x, IPythonLogHandler),
- [handler for handler in self._interactive_root_logger.handlers]))
- self.assertEqual(1, len(ipython_log_handlers))
-
- @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
- def test_default_logging_level_is_info(self, mock_emit):
- # By default the logging level of loggers and log handlers are NOTSET. Also,
- # the propagation is default to true for all loggers. In this scenario, all
- # loggings from child loggers will be propagated to the interactive "root"
- # logger which is set to INFO level that gets handled by the sole log
- # handler IPythonLogHandler which is set to NOTSET. The effect will be
- # everything >= info level will be logged through IPython.core.display to
- # all frontends connected to current kernel.
- dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
- dummy_logger.info('info')
- mock_emit.assert_called_once()
- dummy_logger.debug('debug')
- # Emit is not called, so it's still called once.
- mock_emit.assert_called_once()
-
- @patch('apache_beam.utils.interactive_utils.IPythonLogHandler.emit')
- def test_child_module_logger_can_override_logging_level(self, mock_emit):
- # When a child logger's logging level is configured to something that is not
- # NOTSET, it takes back the logging control from the interactive "root"
- # logger by not propagating anything.
- dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
- dummy_logger.setLevel(logging.DEBUG)
- mock_emit.assert_not_called()
- dummy_logger.debug('debug')
- # Because the dummy child logger is configured to log at DEBUG level, it
- # now propagates DEBUG loggings to the interactive "root" logger.
- mock_emit.assert_called_once()
- # When the dummy child logger is configured to log at CRITICAL level, it
- # will only propagate CRITICAL loggings to the interactive "root" logger.
- dummy_logger.setLevel(logging.CRITICAL)
- # Error loggings will not be handled now.
- dummy_logger.error('error')
- # Emit is not called, so it's still called once.
- mock_emit.assert_called_once()
-
-
-if __name__ == '__main__':
- unittest.main()