Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/18 00:29:07 UTC

[GitHub] [airflow] kaxil opened a new pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

kaxil opened a new pull request #9363:
URL: https://github.com/apache/airflow/pull/9363


   This is to fix duplicate log lines in Task logs:
   
   Previous:
   ```
   [2020-06-17 13:45:03,708] {taskinstance.py:1115} INFO - Marking task as SUCCESS.dag_id=example_bash_operator, task_id=runme_2, execution_date=20200615T000000, start_date=20200617T124502, end_date=20200617T124503
   [2020-06-17 13:45:07,598] {logging_mixin.py:91} INFO - [2020-06-17 13:45:07,598] {local_task_job.py:103} INFO - Task exited with return code 0
   ```
   
   With this change:
   
   ```
   [2020-06-18 01:23:28,204] {taskinstance.py:1115} INFO - Marking task as SUCCESS.dag_id=example_bash_operator, task_id=runme_0, execution_date=20200618T002252, start_date=20200618T002327, end_date=20200618T002328
   [2020-06-18 01:23:32,051] {local_task_job.py:104} INFO - Task exited with return code 0
   ```
   
   Related issue: https://github.com/apache/airflow/issues/8972
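
A minimal sketch of one way such a nested line can arise. It assumes (an inference from the log format, not something stated in this PR) that one logger's handler writes to `sys.stdout` while stdout is redirected into a second logger. `LogWriter`, `StdoutHandler`, `make_logger` and the logger names are hypothetical stand-ins, not Airflow code.

```python
import io
import logging
import sys
from contextlib import redirect_stdout

FORMAT = "{%(name)s} %(levelname)s - %(message)s"


class LogWriter:
    """Hypothetical file-like object that forwards every write to a logger."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message.strip():
            self.logger.log(self.level, message.rstrip())

    def flush(self):
        pass


class StdoutHandler(logging.Handler):
    """Hypothetical handler that looks up sys.stdout at emit time."""

    def emit(self, record):
        sys.stdout.write(self.format(record) + "\n")


def make_logger(name, handler):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo isolated from the root logger
    handler.setFormatter(logging.Formatter(FORMAT))
    logger.addHandler(handler)
    return logger


sink = io.StringIO()  # stands in for the task log file
task_log = make_logger("demo.task_log", logging.StreamHandler(sink))
job_log = make_logger("demo.job", StdoutHandler())

# The job logger formats the record, writes it to the redirected stdout,
# and the task logger formats that already-formatted line a second time.
with redirect_stdout(LogWriter(task_log, logging.INFO)):
    job_log.info("Task exited with return code 0")
nested = sink.getvalue().strip()

# Logging through the task logger directly yields a single prefix.
sink.seek(0)
sink.truncate()
task_log.info("Task exited with return code 0")
direct = sink.getvalue().strip()
```

Here `nested` carries two prefixes and `direct` only one, which mirrors the "Previous" and "With this change" samples above.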
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aneesh-joseph commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
aneesh-joseph commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445298308



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,11 +74,24 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
             if hasattr(args, "job_id"):
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
 
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            for handler in airflow_logger_handlers:
+                if isinstance(handler, FileTaskHandler):

Review comment:
       I'm not very sure about this, but could this be
   
   `if handler.name == 'task' or isinstance(handler, FileTaskHandler):`
   
   instead? This will help if we are using a custom task log Handler which doesn't extend from `FileTaskHandler`
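
The difference between the two checks can be sketched with the standard library alone. This assumes the task handler is registered under the key `task` in the logging config, as the suggested check implies; `logging.config.dictConfig` assigns that key to `handler.name`. `FileTaskHandlerStandIn`, `CustomTaskHandler` and the `demo.task` logger are hypothetical names, not Airflow classes.

```python
import logging
import logging.config


class FileTaskHandlerStandIn(logging.Handler):
    """Hypothetical stand-in for Airflow's FileTaskHandler."""

    def emit(self, record):
        pass


class CustomTaskHandler(logging.Handler):
    """Hypothetical custom task handler that does NOT extend the stand-in."""

    def emit(self, record):
        pass


logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    # dictConfig sets handler.name to the config key, here "task".
    "handlers": {"task": {"()": CustomTaskHandler}},
    "loggers": {"demo.task": {"handlers": ["task"], "propagate": False}},
})

handler = logging.getLogger("demo.task").handlers[0]
matches_by_type = isinstance(handler, FileTaskHandlerStandIn)  # misses the custom handler
matches_by_name = handler.name == "task"                       # still finds it
```

A name-based check depends on the config key staying `task`, while a type-based check depends on the class hierarchy; which is more robust depends on how custom handlers are configured.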







[GitHub] [airflow] kaxil commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445493107



##########
File path: airflow/config_templates/config.yml
##########
@@ -517,6 +517,17 @@
       type: string
       example: ~
       default: "task"
+    - name: donot_modify_handlers
+      description: |
+        If ``donot_modify_handlers = True``, we do not modify logging handlers in task_run command.
+        If the flag is set to False, we remove all handlers from the root logger
+        and add all handlers from 'airflow.task' logger to the root Logger. This is done
+        to get all the logs from the print & log statements in the DAG files before a task is run.
+        The handlers are restored after the task completes execution.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: "False"

Review comment:
       in https://github.com/apache/airflow/pull/9363/commits/6f8225fa5f5cd84cd94c40aefef736b8328b6529







[GitHub] [airflow] ashb commented on pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#issuecomment-645943964


   Might this also fix the issue where some logs are disappearing? #8484





[GitHub] [airflow] ashb commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r444519387



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -182,9 +184,25 @@ def task_run(args, dag=None):
     if args.interactive:
         _run_task_by_selected_method(args, dag, ti)
     else:
+        airflow_logger = None
+        airflow_logger_handlers = []
+        if CAN_FORK:
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            airflow_logger = logging.getLogger('airflow')
+            for handler in airflow_logger_handlers:
+                airflow_logger.addHandler(handler)
+            airflow_logger.setLevel(logging.getLogger('airflow.task').level)
+            airflow_logger.propagate = False

Review comment:
       Is this the best place to put this? Could it perhaps instead be put in StandardTaskRunner._start_by_fork?
   
   This all feels a little odd and out of place here. At the very least this needs loads of explanatory comments!
   
   What does this change do if someone runs `airflow task run` directly?







[GitHub] [airflow] kaxil commented on pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#issuecomment-646395583


   > Might this also fix the issue where some logs are disappearing? #8484
   
   Should fix now with the latest commit





[GitHub] [airflow] mjpieters commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
mjpieters commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r517297855



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -177,14 +177,44 @@ def task_run(args, dag=None):
     ti.init_run_context(raw=args.raw)
 
     hostname = get_hostname()
+
     print(f"Running {ti} on host {hostname}")
 
     if args.interactive:
         _run_task_by_selected_method(args, dag, ti)
     else:
-        with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
-                redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
-            _run_task_by_selected_method(args, dag, ti)
+        if settings.DONOT_MODIFY_HANDLERS:
+            with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
+                    redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
+                _run_task_by_selected_method(args, dag, ti)
+        else:
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            root_logger_handlers = root_logger.handlers

Review comment:
       This is just a reference to the list, **not a copy**. The list is emptied by `root_logger.removeHandler()` further down. So you end up with an empty list of handlers when you later try to reset.
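
The aliasing problem described here can be reproduced with the standard library alone. This is a minimal sketch; the logger name `demo.alias_vs_copy` is made up for illustration.

```python
import logging

logger = logging.getLogger("demo.alias_vs_copy")  # hypothetical logger name
logger.addHandler(logging.NullHandler())

alias = logger.handlers           # same list object that the logger mutates
snapshot = list(logger.handlers)  # independent shallow copy

# Iterate over a copy: removeHandler() mutates logger.handlers in place.
for handler in list(logger.handlers):
    logger.removeHandler(handler)

alias_len = len(alias)        # the "saved" alias is now empty
snapshot_len = len(snapshot)  # the copy still holds the original handler

# Restoring from the snapshot works; restoring from the alias would add nothing.
for handler in snapshot:
    logger.addHandler(handler)
restored_len = len(logger.handlers)
```

Taking the copy with `list(...)` (or `handlers[:]`) before removing anything is enough to make the later reset work.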







[GitHub] [airflow] kaxil commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445493620



##########
File path: tests/cli/commands/test_task_command.py
##########
@@ -245,6 +251,99 @@ def test_local_run(self):
         self.assertEqual(state, State.SUCCESS)
 
 
+class TestLogsfromTaskRunCommand(unittest.TestCase):
+
+    def setUp(self) -> None:
+        self.dag_id = "test_logging_dag"
+        self.task_id = "test_task"
+        reset(self.dag_id)
+        self.execution_date_str = timezone.make_aware(datetime(2017, 1, 1)).isoformat()
+        self.log_dir = conf.get('logging', 'base_log_folder')
+        self.log_filename = f"{self.dag_id}/{self.task_id}/{self.execution_date_str}/1.log"
+        self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
+        self.parser = cli_parser.get_parser()
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def tearDown(self) -> None:
+        reset(self.dag_id)
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def assert_log_line(self, text, logs_list):
+        """
+        Get Log Line and assert only 1 Entry exists with the given text. Also check that
+        "logging_mixin" line does not appear in that log line to avoid duplicate logging as below:
+
+        [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
+        """
+        log_lines = [log for log in logs_list if text in log]
+        self.assertEqual(len(log_lines), 1)
+        log_line = log_lines[0]
+        if "Print" not in log_line:

Review comment:
       Updated in https://github.com/apache/airflow/pull/9363/commits/aab851430817fd793fc2fb1d0cd509eecd8c25ac







[GitHub] [airflow] aneesh-joseph commented on pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
aneesh-joseph commented on pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#issuecomment-649206229


   Thank you @kaxil, I tried out this change on 1.10.10 and it fixed my logging issues. I hope this makes it into 1.10.11 :)





[GitHub] [airflow] ashb commented on pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#issuecomment-645942703


   Related or fixes?





[GitHub] [airflow] ashb commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r444833072



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -16,16 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 """Standard task runner"""
-
+import logging
 import os
 
 import psutil
 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.process_utils import reap_process_group
 
-CAN_FORK = hasattr(os, 'fork')
+CAN_FORK = True

Review comment:
       Don't think you want this change, do you?







[GitHub] [airflow] aneesh-joseph commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
aneesh-joseph commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445298308



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,11 +74,24 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
             if hasattr(args, "job_id"):
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
 
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            for handler in airflow_logger_handlers:
+                if isinstance(handler, FileTaskHandler):

Review comment:
       I'm not sure about this, but could this be
   
   `if handler.name == 'task':`
   
   instead? This will help if we are using a custom task log Handler which doesn't extend from `FileTaskHandler`







[GitHub] [airflow] kaxil commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445459956



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -185,6 +204,14 @@ def task_run(args, dag=None):
         with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
                 redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
             _run_task_by_selected_method(args, dag, ti)
+
+    for handler in airflow_logger_handlers:
+        if isinstance(handler, FileTaskHandler):
+            root_logger.removeHandler(handler)
+    for handler in root_logger_handlers:
+        if isinstance(handler, RedirectStdHandler):
+            root_logger.addHandler(handler)

Review comment:
       Done

##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,11 +74,24 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
             if hasattr(args, "job_id"):
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
 
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            for handler in airflow_logger_handlers:
+                if isinstance(handler, FileTaskHandler):

Review comment:
       Done







[GitHub] [airflow] kaxil commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445490607



##########
File path: airflow/config_templates/config.yml
##########
@@ -517,6 +517,17 @@
       type: string
       example: ~
       default: "task"
+    - name: donot_modify_handlers
+      description: |
+        If ``donot_modify_handlers = True``, we do not modify logging handlers in task_run command.
+        If the flag is set to False, we remove all handlers from the root logger
+        and add all handlers from 'airflow.task' logger to the root Logger. This is done
+        to get all the logs from the print & log statements in the DAG files before a task is run.
+        The handlers are restored after the task completes execution.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: "False"

Review comment:
       Updated







[GitHub] [airflow] ashb commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445424575



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -185,6 +204,14 @@ def task_run(args, dag=None):
         with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
                 redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
             _run_task_by_selected_method(args, dag, ti)
+
+    for handler in airflow_logger_handlers:
+        if isinstance(handler, FileTaskHandler):
+            root_logger.removeHandler(handler)
+    for handler in root_logger_handlers:
+        if isinstance(handler, RedirectStdHandler):
+            root_logger.addHandler(handler)

Review comment:
       We should add a comment saying why we're doing this _right_ before we call `logging.shutdown` otherwise it looks like it's not needed.
   
   (We need this as one (celery) worker process can now call this multiple times, so if we don't reset this then logs from next task would go to the wrong place, right?)
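
A minimal sketch of the reset being asked for, assuming the swap is wrapped so that the root logger's handlers are always put back, even if the task raises. `root_handlers_swapped` is a hypothetical helper, not part of this PR or of Airflow.

```python
import logging
from contextlib import contextmanager


@contextmanager
def root_handlers_swapped(new_handlers):
    """Hypothetical helper: temporarily replace the root logger's handlers."""
    root = logging.getLogger()
    saved = list(root.handlers)  # copy, not an alias of root.handlers
    for handler in saved:
        root.removeHandler(handler)
    for handler in new_handlers:
        root.addHandler(handler)
    try:
        yield root
    finally:
        # Put things back so the next task run in the same worker process
        # does not inherit this task's handlers.
        for handler in list(root.handlers):
            root.removeHandler(handler)
        for handler in saved:
            root.addHandler(handler)


before = list(logging.getLogger().handlers)
task_handler = logging.NullHandler()  # stands in for a task's file handler

with root_handlers_swapped([task_handler]) as root:
    during = list(root.handlers)

after = list(logging.getLogger().handlers)
```

Running the block twice in one process leaves the root logger in the same state each time, which is the property a long-lived worker needs.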







[GitHub] [airflow] ashb commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445490640



##########
File path: tests/cli/commands/test_task_command.py
##########
@@ -245,6 +251,99 @@ def test_local_run(self):
         self.assertEqual(state, State.SUCCESS)
 
 
+class TestLogsfromTaskRunCommand(unittest.TestCase):
+
+    def setUp(self) -> None:
+        self.dag_id = "test_logging_dag"
+        self.task_id = "test_task"
+        reset(self.dag_id)
+        self.execution_date_str = timezone.make_aware(datetime(2017, 1, 1)).isoformat()
+        self.log_dir = conf.get('logging', 'base_log_folder')
+        self.log_filename = f"{self.dag_id}/{self.task_id}/{self.execution_date_str}/1.log"
+        self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
+        self.parser = cli_parser.get_parser()
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def tearDown(self) -> None:
+        reset(self.dag_id)
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def assert_log_line(self, text, logs_list):
+        """
+        Get Log Line and assert only 1 Entry exists with the given text. Also check that
+        "logging_mixin" line does not appear in that log line to avoid duplicate logging as below:
+
+        [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
+        """
+        log_lines = [log for log in logs_list if text in log]
+        self.assertEqual(len(log_lines), 1)
+        log_line = log_lines[0]
+        if "Print" not in log_line:

Review comment:
       ```suggestion
       def assert_log_line(self, text, logs_list, expect_from_logging_mixin=False):
           """
           Get Log Line and assert only 1 Entry exists with the given text. Also check that
           "logging_mixin" line does not appear in that log line to avoid duplicate logging as below:
   
           [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
           """
           log_lines = [log for log in logs_list if text in log]
           self.assertEqual(len(log_lines), 1)
           log_line = log_lines[0]
           if expect_from_logging_mixin:
   ```







[GitHub] [airflow] ashb commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r442725043



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,6 +72,9 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)

Review comment:
       Was this debugging or did you mean to leave it in?







[GitHub] [airflow] kaxil merged pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #9363:
URL: https://github.com/apache/airflow/pull/9363


   





[GitHub] [airflow] ashb commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445423264



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,11 +74,24 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
             if hasattr(args, "job_id"):
                 proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
 
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            for handler in airflow_logger_handlers:
+                if isinstance(handler, FileTaskHandler):

Review comment:
       I think actually we should just copy any and all handlers assigned to airflow.task (i.e. remove the if altogether) -- the purpose here is to make all logs produced when running a task go to the task file.
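
The effect of copying every handler to the root logger can be sketched as follows. `ListHandler`, `demo_airflow.task` and `my_dag_helpers` are hypothetical names used only for illustration; the in-memory list stands in for the task log file.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that collects messages in memory."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


task_logger = logging.getLogger("demo_airflow.task")  # stands in for 'airflow.task'
task_logger.propagate = False
task_file = ListHandler()
task_logger.addHandler(task_file)

custom = logging.getLogger("my_dag_helpers")  # a custom logger defined in a DAG file
custom.setLevel(logging.INFO)

root = logging.getLogger()
custom.info("before")  # propagates to root, which lacks the task handler

# Copy all handlers, without filtering on type or name.
for handler in task_logger.handlers:
    root.addHandler(handler)
custom.info("after")   # now reaches the task handler through the root logger

captured = list(task_file.messages)

# Clean up so the demo leaves the root logger as it found it.
for handler in task_logger.handlers:
    root.removeHandler(handler)
```

Only the message logged after the copy ends up in `captured`, which is the behaviour the comment describes: everything logged while the task runs goes to the task's handlers.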







[GitHub] [airflow] kaxil commented on a change in pull request #9363: Use 'airflow.task' Logger for LocalTaskJob

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r442734482



##########
File path: airflow/task/task_runner/standard_task_runner.py
##########
@@ -73,6 +72,9 @@ def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)

Review comment:
       I meant to leave it in. Mainly to have similar log statements both when using "os.fork" and "subprocess"







[GitHub] [airflow] ashb commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r445489669



##########
File path: airflow/config_templates/config.yml
##########
@@ -517,6 +517,17 @@
       type: string
       example: ~
       default: "task"
+    - name: donot_modify_handlers
+      description: |
+        If ``donot_modify_handlers = True``, we do not modify logging handlers in task_run command.
+        If the flag is set to False, we remove all handlers from the root logger.
+        and add all handlers from 'airflow.task' logger to the root Logger. This is done
+        to get all the logs from the print & log statements in the DAG files before a task is run
+        The handlers are restored after the task completes execution.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: "False"

Review comment:
       I think this should be a hidden setting, i.e. neither documented nor listed in default_airflow.cfg.







[GitHub] [airflow] mjpieters commented on a change in pull request #9363: Fix logging issue when running tasks

Posted by GitBox <gi...@apache.org>.
mjpieters commented on a change in pull request #9363:
URL: https://github.com/apache/airflow/pull/9363#discussion_r517298025



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -177,14 +177,44 @@ def task_run(args, dag=None):
     ti.init_run_context(raw=args.raw)
 
     hostname = get_hostname()
+
     print(f"Running {ti} on host {hostname}")
 
     if args.interactive:
         _run_task_by_selected_method(args, dag, ti)
     else:
-        with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
-                redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
-            _run_task_by_selected_method(args, dag, ti)
+        if settings.DONOT_MODIFY_HANDLERS:
+            with redirect_stdout(StreamLogWriter(ti.log, logging.INFO)), \
+                    redirect_stderr(StreamLogWriter(ti.log, logging.WARN)):
+                _run_task_by_selected_method(args, dag, ti)
+        else:
+            # Get all the Handlers from 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            root_logger_handlers = root_logger.handlers
+
+            # Remove all handlers from Root Logger to avoid duplicate logs
+            for handler in root_logger_handlers:
+                root_logger.removeHandler(handler)
+
+            for handler in airflow_logger_handlers:
+                root_logger.addHandler(handler)
+            root_logger.setLevel(logging.getLogger('airflow.task').level)

Review comment:
       The root log level is never restored.



