You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/12/10 11:56:33 UTC

[GitHub] [airflow] ashb commented on a change in pull request #6760: [AIRFLOW-6157] Separate out common protocol for executors.

ashb commented on a change in pull request #6760: [AIRFLOW-6157] Separate out common protocol for executors.
URL: https://github.com/apache/airflow/pull/6760#discussion_r355995069
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -42,7 +43,124 @@
 QueuedTaskInstanceType = Tuple[CommandType, int, Optional[str], SimpleTaskInstance]
 
 
-class BaseExecutor(LoggingMixin):
+class BaseExecutorProtocol(LoggingMixin):
+    """
+    Base Protocol implemented by all executors including multiple executors.
+
+    Every method declared here (other than ``__init__``) raises
+    ``NotImplementedError``; implementations are expected to override them.
+    """
+
+    def __init__(self):
+        """Initialise the parent class; no additional state is set up here."""
+        super().__init__()
+
+    def start(self):  # pragma: no cover
+        """
+        Executors may need to get things started.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def has_task(self, task_instance: TaskInstance) -> bool:
+        """
+        Checks if a task is either queued or running in this executor.
+
+        :param task_instance: TaskInstance
+        :return: True if the task is known to this executor
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def sync(self) -> None:
+        """
+        Sync will get called periodically by the heartbeat method.
+        Executors should override this to gather statuses.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def queue_command(self,
+                      simple_task_instance: SimpleTaskInstance,
+                      command: CommandType,
+                      priority: int = 1,
+                      queue: Optional[str] = None) -> None:
+        """
+        Queues a command for the given task instance.
+
+        :param simple_task_instance: task instance the command belongs to
+        :param command: command to queue
+        :param priority: priority of the command (defaults to 1)
+        :param queue: presumably the name of the queue, as in
+            ``execute_async`` -- confirm against implementations
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def queue_task_instance(self,
+                            task_instance: TaskInstance,
+                            mark_success: bool = False,
+                            pickle_id: Optional[str] = None,
+                            ignore_all_deps: bool = False,
+                            ignore_depends_on_past: bool = False,
+                            ignore_task_deps: bool = False,
+                            ignore_ti_state: bool = False,
+                            pool: Optional[str] = None,
+                            cfg_path: Optional[str] = None) -> None:
+        """
+        Queues task instance.
+
+        The keyword arguments are not interpreted at this level; their
+        meaning is defined by the overriding implementation.
+
+        :param task_instance: task instance to queue
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def heartbeat(self) -> None:
+        """
+        Heartbeat sent to trigger new jobs.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def execute_async(self,
+                      key: TaskInstanceKeyType,
+                      command: CommandType,
+                      queue: Optional[str] = None,
+                      executor_config: Optional[Any] = None) -> None:  # pragma: no cover
+        """
+        This method will execute the command asynchronously.
+
+        :param key: Unique key for the task instance
+        :param command: Command to run
+        :param queue: name of the queue
+        :param executor_config: Configuration passed to the executor.
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def end(self) -> None:  # pragma: no cover
+        """
+        This method is called when the caller is done submitting jobs and
+        wants to wait synchronously for all previously submitted jobs to
+        finish.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def terminate(self):
+        """
+        This method is called when the daemon receives a SIGTERM.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def is_task_queued(self, task_instance_key: TaskInstanceKeyType) -> bool:
+        """
+        Return True if task instance is queued.
+
+        :param task_instance_key: key identifying the task instance
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def is_task_running(self, task_instance_key: TaskInstanceKeyType) -> bool:
+        """
+        Return True if task instance is running.
+
+        :param task_instance_key: key identifying the task instance
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    @property
+    def queued_tasks_keys(self) -> Iterable[TaskInstanceKeyType]:
+        """
+        Returns task keys in iterable form.
+
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+    def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[str]]:
+        """
+        Returns and flushes the event buffer. If dag_ids is specified,
+        only events for the given dag_ids are returned and flushed.
+        Otherwise all events are returned and flushed.
+
+        :param dag_ids: the dag_ids to return events for; if None, returns all
+        :return: a dict of events
+        :raises NotImplementedError: always, unless overridden
+        """
+        raise NotImplementedError()
+
+
+class BaseExecutor(BaseExecutorProtocol, metaclass=ABCMeta):
 
 Review comment:
   Is there any point in separating out this Protocol? I don't quite see what purpose it serves.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services