Posted to commits@airflow.apache.org by ms...@apache.org on 2020/05/08 08:40:02 UTC

[airflow] branch master updated: Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)

This is an automated email from the ASF dual-hosted git repository.

msumit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new b37ce29  Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
b37ce29 is described below

commit b37ce294b938fa1f591e526f8ee326f3dcea3e24
Author: vshshjn7 <vs...@gmail.com>
AuthorDate: Fri May 8 14:09:30 2020 +0530

    Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
    
    
    Co-authored-by: Vishesh Jain <vi...@twitter.com>
---
 UPDATING.md                                   | 10 ++++++++++
 airflow/models/baseoperator.py                |  4 ++--
 tests/serialization/test_dag_serialization.py |  2 ++
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index db50c55..b10cbfb 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -62,6 +62,16 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator
+Previously it was not possible to patch the pool in `BaseOperator`, because the signature set the
+default value of `pool` to `Pool.DEFAULT_POOL_NAME`.
+Using `SubDagOperator` in a unit test without initializing the SQLite database raised the
+following error:
+```
+sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool.
+```
+This is fixed in https://github.com/apache/airflow/pull/8587
+
 ### Change signature of BigQueryGetDatasetTablesOperator
 Was:
 ```python
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2e431ca..a9576a1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -316,7 +316,7 @@ class BaseOperator(Operator, LoggingMixin):
         priority_weight: int = 1,
         weight_rule: str = WeightRule.DOWNSTREAM,
         queue: str = conf.get('celery', 'default_queue'),
-        pool: str = Pool.DEFAULT_POOL_NAME,
+        pool: Optional[str] = None,
         pool_slots: int = 1,
         sla: Optional[timedelta] = None,
         execution_timeout: Optional[timedelta] = None,
@@ -385,7 +385,7 @@ class BaseOperator(Operator, LoggingMixin):
 
         self.retries = retries
         self.queue = queue
-        self.pool = pool
+        self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
         self.pool_slots = pool_slots
         if self.pool_slots < 1:
             raise AirflowException("pool slots for %s in dag %s cannot be less than 1"
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 9dda878..6580744 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -70,6 +70,7 @@ serialized_simple_dag_ground_truth = {
                 "bash_command": "echo {{ task.task_id }}",
                 "_task_type": "BashOperator",
                 "_task_module": "airflow.operators.bash",
+                "pool": "default_pool",
             },
             {
                 "task_id": "custom_task",
@@ -84,6 +85,7 @@ serialized_simple_dag_ground_truth = {
                 "template_fields": ['bash_command'],
                 "_task_type": "CustomOperator",
                 "_task_module": "tests.test_utils.mock_operators",
+                "pool": "default_pool",
             },
         ],
         "timezone": "UTC",
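
The change to `BaseOperator.__init__` above swaps a default that Python evaluates once, when the function is defined, for a `None` sentinel that is resolved on each call. The following is a minimal sketch of why that makes the default patchable in tests. `Pool` here is a hypothetical stand-in class with no database behind it, and `EagerOperator` and `LazyOperator` are illustrative names that are not part of Airflow.

```python
from typing import Optional
from unittest import mock


class Pool:
    """Hypothetical stand-in for airflow.models.Pool; no database involved."""
    DEFAULT_POOL_NAME = "default_pool"


class EagerOperator:
    # Old style: the default is read from Pool once, when this `def` executes.
    def __init__(self, pool: str = Pool.DEFAULT_POOL_NAME):
        self.pool = pool


class LazyOperator:
    # New style: None is a sentinel, and Pool is consulted on every call.
    def __init__(self, pool: Optional[str] = None):
        self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool


# Patch the class attribute the way a unit test might.
with mock.patch.object(Pool, "DEFAULT_POOL_NAME", "test_pool"):
    eager_pool = EagerOperator().pool                 # still the value bound at definition time
    lazy_pool = LazyOperator().pool                   # sees the patched value
    explicit_pool = LazyOperator(pool="custom").pool  # an explicit argument always wins

# Outside the context manager the original attribute is restored.
restored_pool = LazyOperator().pool
```

An explicit `pool` argument behaves the same in both styles; only the fallback differs.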