You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ms...@apache.org on 2020/05/08 08:40:02 UTC
[airflow] branch master updated: Patch Pool.DEFAULT_POOL_NAME in
BaseOperator (#8587)
This is an automated email from the ASF dual-hosted git repository.
msumit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b37ce29 Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
b37ce29 is described below
commit b37ce294b938fa1f591e526f8ee326f3dcea3e24
Author: vshshjn7 <vs...@gmail.com>
AuthorDate: Fri May 8 14:09:30 2020 +0530
Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
Co-authored-by: Vishesh Jain <vi...@twitter.com>
---
UPDATING.md | 10 ++++++++++
airflow/models/baseoperator.py | 4 ++--
tests/serialization/test_dag_serialization.py | 2 ++
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index db50c55..b10cbfb 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -62,6 +62,16 @@ https://developers.google.com/style/inclusive-documentation
-->
+### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator
+It was not possible to patch pool in BaseOperator as the signature sets the default value of pool
+as Pool.DEFAULT_POOL_NAME.
+While using subdagoperator in unittest(without initializing the sqlite db), it was throwing the
+following error:
+```
+sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool.
+```
+Fix for this, https://github.com/apache/airflow/pull/8587
+
### Change signature of BigQueryGetDatasetTablesOperator
Was:
```python
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2e431ca..a9576a1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -316,7 +316,7 @@ class BaseOperator(Operator, LoggingMixin):
priority_weight: int = 1,
weight_rule: str = WeightRule.DOWNSTREAM,
queue: str = conf.get('celery', 'default_queue'),
- pool: str = Pool.DEFAULT_POOL_NAME,
+ pool: Optional[str] = None,
pool_slots: int = 1,
sla: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = None,
@@ -385,7 +385,7 @@ class BaseOperator(Operator, LoggingMixin):
self.retries = retries
self.queue = queue
- self.pool = pool
+ self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
self.pool_slots = pool_slots
if self.pool_slots < 1:
raise AirflowException("pool slots for %s in dag %s cannot be less than 1"
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 9dda878..6580744 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -70,6 +70,7 @@ serialized_simple_dag_ground_truth = {
"bash_command": "echo {{ task.task_id }}",
"_task_type": "BashOperator",
"_task_module": "airflow.operators.bash",
+ "pool": "default_pool",
},
{
"task_id": "custom_task",
@@ -84,6 +85,7 @@ serialized_simple_dag_ground_truth = {
"template_fields": ['bash_command'],
"_task_type": "CustomOperator",
"_task_module": "tests.test_utils.mock_operators",
+ "pool": "default_pool",
},
],
"timezone": "UTC",