Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 19:27:16 UTC

[GitHub] [airflow] bdsoha opened a new pull request, #27969: Completed D400 for multiple folders

bdsoha opened a new pull request, #27969:
URL: https://github.com/apache/airflow/pull/27969

   Completed D400 for:
   
   - `airflow/jobs/*`
   - `airflow/executors/*`
   
   Related to #10742 .
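   
   For context, D400 is the pydocstyle rule requiring the first line of a
   docstring to end with a period. A minimal before/after sketch (hypothetical
   function, not taken from this PR):
   
   ```python
   # Before: D400 flags this summary line because it lacks a trailing period.
   def get_status():
       """Return the current executor status"""
   
   # After: the trailing period satisfies D400.
   def get_status():
       """Return the current executor status."""
   ```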
   



[GitHub] [airflow] uranusjr commented on a diff in pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27969:
URL: https://github.com/apache/airflow/pull/27969#discussion_r1034345312


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -454,7 +454,7 @@ def __init__(self):
     @provide_session
     def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
-        Clear unlaunched tasks that were previously queued.
+        Clear launched tasks that were not yet launched, but previously queued.

Review Comment:
   This sentence does not make sense?




[GitHub] [airflow] bdsoha commented on pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
bdsoha commented on PR #27969:
URL: https://github.com/apache/airflow/pull/27969#issuecomment-1336742856

   > But needs rebase and conflict resolution :(
   
   I also added the periods that were removed during the conflict resolution.



[GitHub] [airflow] potiuk commented on pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27969:
URL: https://github.com/apache/airflow/pull/27969#issuecomment-1329806323

   Checks :( 



[GitHub] [airflow] uranusjr commented on a diff in pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27969:
URL: https://github.com/apache/airflow/pull/27969#discussion_r1040567143


##########
airflow/executors/celery_executor.py:
##########
@@ -540,6 +541,7 @@ def _set_celery_pending_task_timeout(
         self, key: TaskInstanceKey, timeout_type: _CeleryPendingTaskTimeoutType | None
     ) -> None:
         """
+        Set pending task timeout.

Review Comment:
   ```suggestion
           Set pending task timeout.
   
   ```
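   
   The blank line inserted by this and the similar suggestions below comes
   from the companion pydocstyle rule D205, which requires one blank line
   between the docstring summary and the description that follows it. A
   minimal sketch with a hypothetical function:
   
   ```python
   def sync_state(force: bool = False) -> None:
       """Synchronize executor state with the metadata database.
   
       The summary above ends with a period (D400) and is separated from
       this description by a single blank line (D205).
       """
   ```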



##########
airflow/executors/celery_kubernetes_executor.py:
##########
@@ -52,40 +52,44 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from celery and kubernetes executor"""
+        """Return queued tasks from celery and kubernetes executor."""
         queued_tasks = self.celery_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from celery and kubernetes executor"""
+        """Return running tasks from celery and kubernetes executor."""
         return self.celery_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> int | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.

Review Comment:
   ```suggestion
           Inherited attribute from BaseExecutor.
   
   ```



##########
airflow/executors/celery_kubernetes_executor.py:
##########
@@ -52,40 +52,44 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from celery and kubernetes executor"""
+        """Return queued tasks from celery and kubernetes executor."""
         queued_tasks = self.celery_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from celery and kubernetes executor"""
+        """Return running tasks from celery and kubernetes executor."""
         return self.celery_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> int | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.
         """
         return self._job_id
 
     @job_id.setter
     def job_id(self, value: int | None) -> None:
-        """job_id is manipulated by SchedulerJob.  We must propagate the job_id to wrapped executors."""
+        """
+        job_id is manipulated by SchedulerJob.
+        We must propagate the job_id to wrapped executors.
+        """

Review Comment:
   ```suggestion
           """Expose job ID for SchedulerJob."""
   ```
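   
   The property/setter pair under review illustrates a wrapper pattern: the
   hybrid executor is not a real executor, so job_id is exposed as a property
   whose setter fans the value out to the wrapped executors. A minimal sketch
   under assumed names (StubExecutor and WrapperExecutor are illustrative,
   not Airflow classes):
   
   ```python
   from __future__ import annotations
   
   
   class StubExecutor:
       """Stand-in for a real executor; only carries a job_id attribute."""
   
       job_id: int | None = None
   
   
   class WrapperExecutor:
       """Not an executor itself; wraps two and keeps their job_id in sync."""
   
       def __init__(self, primary: StubExecutor, secondary: StubExecutor) -> None:
           self.primary = primary
           self.secondary = secondary
           self._job_id = None
   
       @property
       def job_id(self) -> int | None:
           """Expose the job ID; a property so the setter can propagate it."""
           return self._job_id
   
       @job_id.setter
       def job_id(self, value: int | None) -> None:
           # SchedulerJob assigns job_id on whatever executor it is handed,
           # so the wrapper must forward the value to both wrapped executors.
           self._job_id = value
           self.primary.job_id = value
           self.secondary.job_id = value
   ```
   
   With this shape, a single assignment to the wrapper's job_id updates both
   wrapped executors in one step.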



##########
airflow/executors/local_executor.py:
##########
@@ -50,8 +50,8 @@
 
 class LocalWorkerBase(Process, LoggingMixin):
     """
-    LocalWorkerBase implementation to run airflow commands. Executes the given
-    command and puts the result into a result queue when done, terminating execution.
+    LocalWorkerBase implementation to run airflow commands.
+    Executes the given command and puts the result into a result queue when done, terminating execution.

Review Comment:
   ```suggestion
       LocalWorkerBase implementation to run airflow commands.
   
       Executes the given command and puts the result into a result queue when done, terminating execution.
   ```



##########
airflow/executors/local_executor.py:
##########
@@ -263,18 +262,14 @@ def sync(self) -> None:
                 self.executor.workers_active -= 1
 
         def end(self) -> None:
-            """
-            This method is called when the caller is done submitting job and
-            wants to wait synchronously for the job submitted previously to be
-            all done.
-            """
+            """Wait synchronously for the previously submitted job to complete."""
             while self.executor.workers_active > 0:
                 self.executor.sync()
 
     class LimitedParallelism:
         """
-        Implements LocalExecutor with limited parallelism using a task queue to
-        coordinate work distribution.
+        Implements LocalExecutor with limited parallelism.
+        Uses a task queue to coordinate work distribution.

Review Comment:
   ```suggestion
           Implements LocalExecutor with limited parallelism.
   
           Uses a task queue to coordinate work distribution.
   ```



##########
airflow/executors/local_kubernetes_executor.py:
##########
@@ -52,41 +52,45 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from local and kubernetes executor"""
+        """Return queued tasks from local and kubernetes executor."""
         queued_tasks = self.local_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from local and kubernetes executor"""
+        """Return running tasks from local and kubernetes executor."""
         return self.local_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> str | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.

Review Comment:
   ```suggestion
           Inherited attribute from BaseExecutor.
   
           Since this is not really an executor, but a wrapper of executors
           we implemented it as property, so we can have custom setter.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -67,13 +67,13 @@ class BackfillJob(BaseJob):
     @attr.define
     class _DagRunTaskStatus:
         """
-        Internal status of the backfill job. This class is intended to be instantiated
-        only within a BackfillJob instance and will track the execution of tasks,
-        e.g. running, skipped, succeeded, failed, etc. Information about the dag runs
-        related to the backfill job are also being tracked in this structure,
-        .e.g finished runs, etc. Any other status related information related to the
-        execution of dag runs / tasks can be included in this structure since it makes
-        it easier to pass it around.
+        Internal status of the backfill job.

Review Comment:
   ```suggestion
           Internal status of the backfill job.
   
   ```



##########
airflow/executors/local_executor.py:
##########
@@ -162,8 +162,8 @@ def do_work(self) -> None:
 
 class QueuedLocalWorker(LocalWorkerBase):
     """
-    LocalWorker implementation that is waiting for tasks from a queue and will
-    continue executing commands as they become available in the queue.
+    LocalWorker implementation that is waiting for tasks from a queue.
+    Will continue executing commands as they become available in the queue.

Review Comment:
   ```suggestion
       LocalWorker implementation that is waiting for tasks from a queue.
   
       Will continue executing commands as they become available in the queue.
   ```



##########
airflow/executors/sequential_executor.py:
##########
@@ -34,9 +34,10 @@
 
 class SequentialExecutor(BaseExecutor):
     """
-    This executor will only run one task instance at a time, can be used
-    for debugging. It is also the only executor that can be used with sqlite
-    since sqlite doesn't support multiple connections.
+    This executor will only run one task instance at a time.
+    It can be used for debugging. It is also the only executor
+    that can be used with sqlite since sqlite doesn't support
+    multiple connections.

Review Comment:
   ```suggestion
       This executor will only run one task instance at a time.
   
       It can be used for debugging. It is also the only executor
       that can be used with sqlite since sqlite doesn't support
       multiple connections.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -162,8 +163,8 @@ def __init__(
 
     def _update_counters(self, ti_status, session=None):
         """
-        Updates the counters per state of the tasks that were running. Can re-add
-        to tasks to run in case required.
+        Updates the counters per state of the tasks that were running.

Review Comment:
   ```suggestion
           Updates the counters per state of the tasks that were running.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -122,6 +122,7 @@ def __init__(
         **kwargs,
     ):
         """
+        Create a BackfillJob.

Review Comment:
   ```suggestion
           Create a BackfillJob.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -413,9 +411,9 @@ def _process_backfill_task_instances(
         session=None,
     ) -> list:
         """
-        Process a set of task instances from a set of dag runs. Special handling is done
-        to account for different task instance states that could be present when running
-        them in a backfill process.
+        Process a set of task instances from a set of dag runs.
+        Special handling is done to account for different task instance states
+        that could be present when running them in a backfill process.

Review Comment:
   ```suggestion
           Process a set of task instances from a set of DAG runs.
   
           Special handling is done to account for different task instance states
           that could be present when running them in a backfill process.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -291,9 +291,8 @@ def _iter_task_needing_expansion() -> Iterator[AbstractOperator]:
     @provide_session
     def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = None):
         """
-        Returns a dag run for the given run date, which will be matched to an existing
-        dag run if available or create a new dag run otherwise. If the max_active_runs
-        limit is reached, this function will return None.
+        Return an existing dag run for the given run date or create one.

Review Comment:
   ```suggestion
           Return an existing dag run for the given run date or create one.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -901,6 +895,7 @@ def _execute(self, session=None):
     @provide_session
     def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None) -> int | None:
         """
+        Reset state of orphaned tasks.

Review Comment:
   ```suggestion
           Reset state of orphaned tasks.
   
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -85,17 +85,18 @@
 
 def _is_parent_process() -> bool:
     """
-    Returns True if the current process is the parent process. False if the current process is a child
-    process started by multiprocessing.
+    Whether this is a parent process.
+    Return True if the current process is the parent process.
+    False if the current process is a child process started by multiprocessing.
     """
     return multiprocessing.current_process().name == "MainProcess"
 
 
 class SchedulerJob(BaseJob):
     """
-    This SchedulerJob runs for a specific time interval and schedules the jobs
-    that are ready to run. It figures out the latest runs for each
-    task and sees if the dependencies for the next schedules are met.
+    SchedulerJob runs for a specific time interval and schedules jobs that are ready to run.
+    It figures out the latest runs for each task and sees if the dependencies
+    for the next schedules are met.

Review Comment:
   ```suggestion
       SchedulerJob runs for a specific time interval and schedules jobs that are ready to run.
   
       It figures out the latest runs for each task and sees if the dependencies
       for the next schedules are met.
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -241,8 +242,12 @@ def __get_concurrency_maps(
 
     def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -> list[TI]:
         """
-        Finds TIs that are ready for execution with respect to pool limits,
-        dag max_active_tasks, executor state, and priority.
+        Find TIs that are ready for execution based on conditions.
+        Conditions include:
+        - pool limits
+        - dag max_active_tasks
+        - executor state
+        - priority

Review Comment:
   ```suggestion
           Find TIs that are ready for execution based on conditions.
   
           Conditions include:
           - pool limits
           - DAG max_active_tasks
           - executor state
           - priority
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -85,17 +85,18 @@
 
 def _is_parent_process() -> bool:
     """
-    Returns True if the current process is the parent process. False if the current process is a child
-    process started by multiprocessing.
+    Whether this is a parent process.
+    Return True if the current process is the parent process.
+    False if the current process is a child process started by multiprocessing.

Review Comment:
   ```suggestion
       Whether this is a parent process.
   
       Return True if the current process is the parent process.
       False if the current process is a child process started by multiprocessing.
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -816,7 +820,8 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
 
     def _run_scheduler_loop(self) -> None:
         """
-        The actual scheduler loop. The main steps in the loop are:
+        The actual scheduler loop.
+        The main steps in the loop are:

Review Comment:
   ```suggestion
           The actual scheduler loop.
   
           The main steps in the loop are:
   ```



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -447,6 +448,7 @@ def __init__(self):
     @provide_session
     def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
+        Clear tasks that were not yet launched, but were previously queued.

Review Comment:
   ```suggestion
           Clear tasks that were not yet launched, but were previously queued.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -55,8 +55,8 @@
 
 class BackfillJob(BaseJob):
     """
-    A backfill job consists of a dag or subdag for a specific time range. It
-    triggers a set of task instance runs, in the right order and lasts for
+    A backfill job consists of a dag or subdag for a specific time range.
+    It triggers a set of task instance runs, in the right order and lasts for

Review Comment:
   ```suggestion
       A backfill job consists of a dag or subdag for a specific time range.
   
       It triggers a set of task instance runs, in the right order and lasts for
   ```



##########
airflow/executors/local_kubernetes_executor.py:
##########
@@ -52,41 +52,45 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from local and kubernetes executor"""
+        """Return queued tasks from local and kubernetes executor."""
         queued_tasks = self.local_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from local and kubernetes executor"""
+        """Return running tasks from local and kubernetes executor."""
         return self.local_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> str | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.
         """
         return self._job_id
 
     @job_id.setter
     def job_id(self, value: str | None) -> None:
-        """job_id is manipulated by SchedulerJob.  We must propagate the job_id to wrapped executors."""
+        """
+        job_id is manipulated by SchedulerJob.
+        We must propagate the job_id to wrapped executors.
+        """

Review Comment:
   ```suggestion
           """Expose job ID for SchedulerJob."""
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -733,8 +731,7 @@ def _get_dag_with_subdags(self) -> list[DAG]:
     @provide_session
     def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
         """
-        Computes the dag runs and their respective task instances for
-        the given run dates and executes the task instances.
+        Compute and execute dag runs and their respective task instances for the given dates.

Review Comment:
   ```suggestion
           Compute and execute dag runs and their respective task instances for the given dates.
   
   ```



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -413,6 +413,7 @@ def terminate(self) -> None:
 
 def get_base_pod_from_template(pod_template_file: str | None, kube_config: Any) -> k8s.V1Pod:
     """
+    Get base pod from template.

Review Comment:
   ```suggestion
       Get base pod from template.
   
   ```



##########
airflow/jobs/base_job.py:
##########
@@ -49,10 +49,10 @@ def _resolve_dagrun_model():
 
 class BaseJob(Base, LoggingMixin):
     """
-    Abstract class to be derived for jobs. Jobs are processing items with state
-    and duration that aren't task instances. For instance a BackfillJob is
-    a collection of task instance runs, but should have its own state, start
-    and end time.
+    Abstract class to be derived for jobs.
+    Jobs are processing items with state and duration that aren't task instances.
+    For instance a BackfillJob is a collection of task instance runs,
+    but should have its own state, start and end time.

Review Comment:
   ```suggestion
       Abstract class to be derived for jobs.
   
       Jobs are processing items with state and duration that aren't task instances.
       For instance a BackfillJob is a collection of task instance runs,
       but should have its own state, start and end time.
   ```




[GitHub] [airflow] potiuk merged pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #27969:
URL: https://github.com/apache/airflow/pull/27969



[GitHub] [airflow] bdsoha commented on pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
bdsoha commented on PR #27969:
URL: https://github.com/apache/airflow/pull/27969#issuecomment-1333602671

   @potiuk All green!



[GitHub] [airflow] potiuk commented on pull request #27969: Completed D400 for multiple folders

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27969:
URL: https://github.com/apache/airflow/pull/27969#issuecomment-1336593061

   But needs rebase and conflict resolution :(

