Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/09 11:32:14 UTC

[GitHub] [airflow] ashb commented on a diff in pull request #28440: Propagate logs to stdout when in k8s executor pod

ashb commented on code in PR #28440:
URL: https://github.com/apache/airflow/pull/28440#discussion_r1064530937


##########
airflow/cli/commands/task_command.py:
##########
@@ -390,11 +400,19 @@ def task_run(args, dag=None):
 
     log.info("Running %s on host %s", ti, hostname)
 
+    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
+    # behind multiple open sleeping connections while heartbeating, which could
+    # easily exceed the database connection limit when
+    # processing hundreds of simultaneous tasks.
+    # this should be last thing before running, to reduce likelihood of an open session
+    # which can cause trouble if running process in a fork.
+    settings.reconfigure_orm(disable_connection_pool=True)

Review Comment:
   Is this related to this logging PR in any way?



##########
tests/cli/commands/test_task_command.py:
##########
@@ -69,6 +72,15 @@ def move_back(old_path, new_path):
     shutil.move(new_path, old_path)
 
 
+@pytest.fixture()
+def freeze_time():
+    timestamp = "2022-06-10T12:02:44+00:00"
+    freezer = time_machine.travel(timestamp, tick=False)
+    freezer.start()
+    yield
+    freezer.stop()

Review Comment:
   This fixture doesn't seem to be used anywhere?



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -62,19 +62,29 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
                 # handler, not the one that calls super()__init__.
                 stacklevel=(2 if type(self) == FileTaskHandler else 3),
             )
+        self.maintain_propagate: bool = False
+        """If true, overrides default behavior of setting propagate=False

Review Comment:
   ```suggestion
           """
           If true, overrides default behavior of setting propagate=False
           
   ```



##########
tests/cli/commands/test_task_command.py:
##########
@@ -600,6 +612,48 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job)
                 external_executor_id="ABCD12345",
             )
 
+    @pytest.mark.parametrize("is_k8s", ["true", ""])
+    def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
+        """
+        When running task --local as k8l executor pod, all logging should make it to stdout.

Review Comment:
   ```suggestion
           When running task --local as k8s executor pod, all logging should make it to stdout.
   ```
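
   For context on what this test exercises: whether records from a task logger reach a handler attached higher up (such as a stdout handler) depends on the logger's `propagate` flag. The block below is a minimal standard-library sketch of that mechanism, not Airflow's implementation. The logger names `demo` and `demo.task` and the helper `emit_and_capture` are hypothetical, and a `StringIO` stands in for stdout so the result can be inspected.

   ```python
   import io
   import logging


   def emit_and_capture(propagate: bool) -> str:
       """Log one record on a child logger and return what the parent's handler received."""
       stream = io.StringIO()  # stand-in for stdout
       parent = logging.getLogger("demo")
       handler = logging.StreamHandler(stream)
       parent.addHandler(handler)
       parent.setLevel(logging.INFO)

       child = logging.getLogger("demo.task")  # hypothetical task logger
       child.propagate = propagate
       try:
           child.info("hello from task")
       finally:
           # Undo the changes so repeated calls do not stack handlers.
           parent.removeHandler(handler)
           child.propagate = True
       return stream.getvalue()
   ```

   With `propagate=True` the record travels up to the parent's handler; with `propagate=False` it stops at the child logger, which has no handlers of its own, so nothing is written to the stream.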



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -62,19 +62,29 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
                 # handler, not the one that calls super()__init__.
                 stacklevel=(2 if type(self) == FileTaskHandler else 3),
             )
+        self.maintain_propagate: bool = False
+        """If true, overrides default behavior of setting propagate=False

Review Comment:
   (The blank line is the important bit here)



##########
tests/task/task_runner/test_standard_task_runner.py:
##########
@@ -131,38 +141,34 @@ def test_notifies_about_start_and_stop(self):
         )
         dag = dagbag.dags.get("test_example_bash_operator")
         task = dag.get_task("runme_1")
+        dag.create_dagrun(
+            run_id="test",
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            state=State.RUNNING,
+            start_date=DEFAULT_DATE,
+        )
+        ti = TaskInstance(task=task, run_id="test")
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
+        runner = StandardTaskRunner(job1)
+        runner.start()
+
+        # Wait until process sets its pgid to be equal to pid

Review Comment:
   ```suggestion
           # Wait until process makes itself the leader of its own process group
   ```



##########
tests/task/task_runner/test_standard_task_runner.py:
##########
@@ -131,38 +141,34 @@ def test_notifies_about_start_and_stop(self):
         )
         dag = dagbag.dags.get("test_example_bash_operator")
         task = dag.get_task("runme_1")
+        dag.create_dagrun(
+            run_id="test",
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            state=State.RUNNING,
+            start_date=DEFAULT_DATE,
+        )
+        ti = TaskInstance(task=task, run_id="test")
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
+        runner = StandardTaskRunner(job1)
+        runner.start()
+
+        # Wait until process sets its pgid to be equal to pid

Review Comment:
   Though why do we have this loop, and then a 10s timeout directly below it?
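
   One way to avoid having both a wait loop and a separate timeout is to put the deadline inside the loop. The block below is a rough sketch under the assumption that the loop polls the process group id; the test's actual loop is not shown in this diff, and the helper name `wait_until_group_leader` and its parameters are hypothetical. It is POSIX-only.

   ```python
   import os
   import time


   def wait_until_group_leader(pid: int, timeout: float = 10.0, interval: float = 0.05) -> bool:
       """Poll until `pid` leads its own process group, or give up after `timeout` seconds."""
       deadline = time.monotonic() + timeout
       while time.monotonic() < deadline:
           # A process is a group leader when its process group id equals its pid.
           # os.getpgid raises ProcessLookupError if the process has already exited.
           if os.getpgid(pid) == pid:
               return True
           time.sleep(interval)
       return False
   ```

   A test could then assert on the return value directly, so a single timeout governs the wait.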


