Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/29 04:19:12 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       I think this import could go behind `if TYPE_CHECKING:`.
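Here is a minimal sketch of the suggestion. It assumes the import is needed only for the new `Optional[BaseExecutor]` annotation. `SchedulerJob` below is a trimmed-down, hypothetical stand-in for the real class:

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy; at runtime
    # TYPE_CHECKING is False, so this import never executes.
    from airflow.executors.base_executor import BaseExecutor


class SchedulerJob:
    """Hypothetical, trimmed-down stand-in for the real SchedulerJob."""

    # The annotation is quoted so the name is never looked up at runtime.
    def __init__(self, executor: Optional["BaseExecutor"] = None) -> None:
        self.executor = executor


job = SchedulerJob()
print(job.executor)  # None
```

The quotes matter. An unquoted `Optional[BaseExecutor]` is evaluated when the `def` statement runs, and it raises `NameError` unless the module uses `from __future__ import annotations`.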

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       Another option would be to make `executor` a cached property and `executor_class` a property. This would move them out of `__init__`, which I think would be enough if they are not accessed in `LocalTaskJob` anyway.
   
   ```python
   @cached_property
   def executor(self):
       # `__init__` stores its `executor` argument as the private attribute `_executor`
       return self._executor or ExecutorLoader.get_default_executor()
   ```
   
   This has two benefits: you don't have to reference the subclass name here (as in your first approach), and you don't have to make as many changes to the `executor` init params.
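The following self-contained sketch shows the lazy-loading behaviour behind this suggestion. `FakeExecutor`, `get_default_executor` and `LOAD_CALLS` are hypothetical stand-ins for a real executor and `ExecutorLoader.get_default_executor()`. The sketch also assumes `executor_class` is simply the executor's class name:

```python
from functools import cached_property

# Hypothetical call log, used to observe when an executor actually gets loaded.
LOAD_CALLS = []


class FakeExecutor:
    """Hypothetical stand-in for a real Airflow executor."""


def get_default_executor():
    """Hypothetical stand-in for ExecutorLoader.get_default_executor()."""
    LOAD_CALLS.append("load")
    return FakeExecutor()


class BaseJob:
    def __init__(self, executor=None):
        # Keep the init parameter privately; nothing is loaded here.
        self._executor = executor

    @cached_property
    def executor(self):
        # Resolved on first access, then stored on the instance.
        return self._executor or get_default_executor()

    @property
    def executor_class(self):
        # Assumed here to be the executor's class name.
        return self.executor.__class__.__name__


class LocalTaskJob(BaseJob):
    """Never reads self.executor, so it never triggers a load."""


class SchedulerJob(BaseJob):
    """Reads self.executor, so the default is loaded on first access."""


local_job = LocalTaskJob()
print(len(LOAD_CALLS))  # 0
scheduler_job = SchedulerJob()
print(scheduler_job.executor_class)  # FakeExecutor
print(scheduler_job.executor is scheduler_job.executor)  # True
print(len(LOAD_CALLS))  # 1
```

`functools.cached_property` is available in the standard library from Python 3.8 onward. It stores the computed value in the instance `__dict__`, so the class must not rely on `__slots__`.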
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
         processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
         do_pickle: bool = False,
         log: logging.Logger = None,
+        executor: Optional[BaseExecutor] = None,

Review comment:
       The new `executor` parameter is missing from the docstring.
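Here is a sketch of what the docstring addition might look like. It assumes the reST `:param:` / `:type:` field style. The class is a trimmed-down, hypothetical stand-in, and the wording is only a suggestion based on the fallback in `BaseJob.__init__`:

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from airflow.executors.base_executor import BaseExecutor


class SchedulerJob:
    """
    Hypothetical, trimmed-down stand-in for the real SchedulerJob.

    :param executor: Executor instance used to run task instances. If not
        given, ``ExecutorLoader.get_default_executor()`` is used.
    :type executor: Optional[airflow.executors.base_executor.BaseExecutor]
    """

    def __init__(self, executor: Optional["BaseExecutor"] = None) -> None:
        self.executor = executor
```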

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       Though I'm not sure it matters, e.g. if it's already imported implicitly. But just in case, I figured I'd point it out.
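One way to check this concern is sketched below, with a standard-library module standing in for `airflow.executors.base_executor`. Once a module is in `sys.modules`, any later `import` of it is a cache lookup. So guarding one import behind `TYPE_CHECKING` only saves import time if nothing else has already loaded the module. `already_imported` is a hypothetical helper:

```python
import importlib
import sys


def already_imported(module_name: str) -> bool:
    """Return True if module_name is already in the interpreter's module cache."""
    return module_name in sys.modules


# Load a stdlib module once, standing in for a module another import already pulled in.
importlib.import_module("json")
print(already_imported("json"))  # True

# A second import statement is a cache lookup: it binds the same module object.
import json
print(json is sys.modules["json"])  # True
```

On an Airflow install, you could run the same check for `"airflow.executors.base_executor"` right after `import airflow.jobs.scheduler_job`, with the import guarded. That would show whether the module is still pulled in implicitly.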




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
