You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/11/25 18:48:54 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]

kaxil commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010]
URL: https://github.com/apache/airflow/pull/6596#discussion_r350359901
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -16,67 +14,82 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base executor."""
 from collections import OrderedDict
+from queue import Queue
+from typing import Any, Dict, Optional, Set, Tuple
 
-# To avoid circular imports
-import airflow.utils.dag_processing
-from airflow.configuration import conf
+from airflow import AirflowException, LoggingMixin, conf
+from airflow.executors.executor_type_aliases import CommandType, ExecutorKeyType
+from airflow.models import TaskInstance
 from airflow.stats import Stats
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.state import State
 
-PARALLELISM = conf.getint('core', 'PARALLELISM')
+PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 
 
 class BaseExecutor(LoggingMixin):
-
-    def __init__(self, parallelism=PARALLELISM):
-        """
-        Class to derive in order to interface with executor-type systems
-        like Celery, Yarn and the likes.
-
-        :param parallelism: how many jobs should run at one time. Set to
-            ``0`` for infinity
-        :type parallelism: int
-        """
-        self.parallelism = parallelism
-        self.queued_tasks = OrderedDict()
-        self.running = {}
-        self.event_buffer = {}
+    """
+    Class to derive in order to interface with executor-type systems
+    like Celery, Yarn and the likes.
+
+    :param parallelism: how many jobs should run at one time. Set to
+        ``0`` for infinity
+    :type parallelism: int
+    """
+
+    def __init__(self, parallelism: int = PARALLELISM):
+        super().__init__()
+        self.parallelism: int = parallelism
+        self.queued_tasks: OrderedDict[
+            ExecutorKeyType,
+            Tuple[CommandType, int, Queue, SimpleTaskInstance]] \
+            = OrderedDict()
+        self.running: Set[ExecutorKeyType] = set()
+        self.event_buffer: Dict[ExecutorKeyType, str] = {}
 
     def start(self):  # pragma: no cover
         """
-        Executors may need to get things started. For example LocalExecutor
-        starts N workers.
+        Executors may need to get things started.
         """
 
-    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
+    def queue_command(self,
+                      simple_task_instance: SimpleTaskInstance,
+                      command_to_run: CommandType,
+                      priority: int = 1,
+                      queue: Optional[Queue] = None):
+        """Queues command to task"""
         key = simple_task_instance.key
-        if key not in self.queued_tasks and key not in self.running:
-            self.log.info("Adding to queue: %s", command)
-            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
+        if key not in self.queued_tasks.keys() and key not in self.running:
+            self.log.info("Adding to queue: %s", command_to_run)
+            if not queue:
 
 Review comment:
   `queue` is an optional parameter of this method, so if it is `None` we should not raise an error.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services