You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/27 22:11:18 UTC

[airflow] branch main updated: Allow Legacy SqlSensor to use the common.sql providers (#25293)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b0fd105f4a Allow Legacy SqlSensor to use the common.sql providers (#25293)
b0fd105f4a is described below

commit b0fd105f4ade9933476470f6e247dd5fa518ffc9
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Thu Jul 28 00:11:12 2022 +0200

    Allow Legacy SqlSensor to use the common.sql providers (#25293)
    
    The legacy Airflow SqlSensor, rejects working wiht comon.sql providers
    eve if they are perfectly fine to use.
    
    While users could switch to the common.sql sensor (and it
    should work fine). we should not force the users to switch to it.
    
    We are implementing "fake" class hierarchy in case the provider
    is installed on Airflow 2.3 and below.
    
    In case of Airflow 2.4+ importing the old DbApiHook will fail,
    because it will cause a circular import - in such case our
    new DbApiHook will derive (as it was originally planned) from
    BaseHook.
    
    But In case of Airflow 2.3 and below such import will succeed
    and we are using the original DbApiHook from airflow.hooks.dbapi
    as base class - this way any "common.sql" hook on Airflow 2.3
    and below will also derive from the airlfow.hooks.dbapi.DbApiHook
    - thus it will be possible to use it by the original SqlSensor.
    
    Fixes: #25274
---
 airflow/providers/common/sql/hooks/sql.py      | 20 ++++++++++++++++++--
 tests/providers/common/sql/hooks/test_dbapi.py | 20 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py
index 76d7980850..bd2259802d 100644
--- a/airflow/providers/common/sql/hooks/sql.py
+++ b/airflow/providers/common/sql/hooks/sql.py
@@ -17,9 +17,10 @@
 import warnings
 from contextlib import closing
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Mapping, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Mapping, Optional, Tuple, Type, Union
 
 import sqlparse
+from packaging.version import Version
 from sqlalchemy import create_engine
 from typing_extensions import Protocol
 
@@ -27,6 +28,7 @@ from airflow import AirflowException
 from airflow.hooks.base import BaseHook
 from airflow.providers_manager import ProvidersManager
 from airflow.utils.module_loading import import_string
+from airflow.version import version
 
 if TYPE_CHECKING:
     from sqlalchemy.engine import CursorResult
@@ -76,7 +78,21 @@ class ConnectorProtocol(Protocol):
         """
 
 
-class DbApiHook(BaseHook):
+# In case we are running it on Airflow 2.4+, we should use BaseHook, but on Airflow 2.3 and below
+# We want the DbApiHook to derive from the original DbApiHook from airflow, because otherwise
+# SqlSensor and BaseSqlOperator from "airflow.operators" and "airflow.sensors" will refuse to
+# accept the new Hooks as not derived from the original DbApiHook
+if Version(version) < Version('2.4'):
+    try:
+        from airflow.hooks.dbapi import DbApiHook as BaseForDbApiHook
+    except ImportError:
+        # just in case we have a problem with circular import
+        BaseForDbApiHook: Type[BaseHook] = BaseHook  # type: ignore[no-redef]
+else:
+    BaseForDbApiHook: Type[BaseHook] = BaseHook  # type: ignore[no-redef]
+
+
+class DbApiHook(BaseForDbApiHook):
     """
     Abstract base class for sql hooks.
 
diff --git a/tests/providers/common/sql/hooks/test_dbapi.py b/tests/providers/common/sql/hooks/test_dbapi.py
index a44fa57e07..e957e264f3 100644
--- a/tests/providers/common/sql/hooks/test_dbapi.py
+++ b/tests/providers/common/sql/hooks/test_dbapi.py
@@ -23,10 +23,19 @@ from unittest import mock
 
 import pytest
 
+from airflow.hooks.base import BaseHook
 from airflow.models import Connection
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 
 
+class DbApiHookInProvider(DbApiHook):
+    conn_name_attr = 'test_conn_id'
+
+
+class NonDbApiHook(BaseHook):
+    pass
+
+
 class TestDbApiHook(unittest.TestCase):
     def setUp(self):
         super().setUp()
@@ -390,3 +399,14 @@ class TestDbApiHook(unittest.TestCase):
         with pytest.raises(ValueError) as err:
             self.db_hook.run(sql=[])
         assert err.value.args[0] == "List of SQL statements is empty"
+
+    def test_instance_check_works_for_provider_derived_hook(self):
+        assert isinstance(DbApiHookInProvider(), DbApiHook)
+
+    def test_instance_check_works_for_non_db_api_hook(self):
+        assert not isinstance(NonDbApiHook(), DbApiHook)
+
+    def test_instance_check_works_for_legacy_db_api_hook(self):
+        from airflow.hooks.dbapi import DbApiHook as LegacyDbApiHook
+
+        assert isinstance(DbApiHookInProvider(), LegacyDbApiHook)