Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/28 01:16:56 UTC

[airflow] 01/02: Upgrades moto to newer version (~=2.0) (#15051)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cfb6e9fdb194f44debef18a88f2f5a7241728c6e
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Mar 27 21:24:09 2021 +0100

    Upgrades moto to newer version (~=2.0) (#15051)
    
    According to https://github.com/spulec/moto/issues/3535#issuecomment-808706939,
    version 1.3.17 of moto, which would have carried a fix for compatibility
    with mock > 4.0.3, is not going to be released because of breaking changes.
    Therefore we need to migrate to a newer version of moto.
    
    At the same time we can get rid of the old botocore limitation, which
    was apparently added to work around some test errors. We now rely fully
    on the botocore version that boto3 depends on.
    
    Upgrading the dependencies also revealed that the MySQL tests needed to
    be fixed, because the upgraded dependencies caused some test failures
    (those turned out to be badly written tests: they called execute()
    directly on the DB-API connection instead of on a cursor; see the
    sketch after this message).
    
    (cherry picked from commit e8aa3de4bbd218d9e3b899ee221a2ea8e3f3a6de)
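
A minimal sketch of the corrected MySQL test pattern applied throughout this
patch, assuming an Airflow installation with the MySQL provider and a working
default MySQL connection (the helper name drop_test_tables is illustrative):

    from contextlib import closing

    from airflow.providers.mysql.hooks.mysql import MySqlHook

    def drop_test_tables():
        # Open the DB-API connection and an explicit cursor, and close both
        # when done. The old tests ran execute() on the connection returned
        # by get_conn(), which only worked by accident.
        with closing(MySqlHook().get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                for table in ('test_mysql_to_mysql', 'test_airflow'):
                    cur.execute(f"DROP TABLE IF EXISTS {table}")
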
---
 setup.py                                           |  20 ++--
 tests/operators/test_generic_transfer.py           |   7 +-
 tests/providers/amazon/aws/.gitignore              |   1 +
 tests/providers/amazon/aws/hooks/test_s3.py        |  28 ++---
 .../amazon/aws/hooks/test_secrets_manager.py       |  26 ++++-
 .../amazon/aws/secrets/test_secrets_manager.py     |  34 +++++-
 .../apache/hive/transfers/test_mysql_to_hive.py    | 124 +++++++++++----------
 .../google/cloud/operators/test_stackdriver.py     |  38 +++++--
 tests/providers/mysql/hooks/test_mysql.py          |  31 +++---
 tests/providers/mysql/operators/test_mysql.py      |   8 +-
 10 files changed, 193 insertions(+), 124 deletions(-)

diff --git a/setup.py b/setup.py
index 7db75b8..26ad19a 100644
--- a/setup.py
+++ b/setup.py
@@ -194,8 +194,7 @@ def get_sphinx_theme_version() -> str:
 # If you change this mark you should also change ./scripts/ci/check_order_setup.py
 # Start dependencies group
 amazon = [
-    'boto3>=1.15.0,<1.16.0',
-    'botocore>=1.18.0,<1.19.0',
+    'boto3>=1.15.0,<1.18.0',
     'watchtower~=0.7.3',
 ]
 apache_beam = [
@@ -217,7 +216,7 @@ azure = [
     'azure-keyvault>=4.1.0',
     'azure-kusto-data>=0.0.43,<0.1',
     'azure-mgmt-containerinstance>=1.5.0,<2.0',
-    'azure-mgmt-datafactory>=0.13.0',
+    'azure-mgmt-datafactory>=1.0.0,<2.0',
     'azure-mgmt-datalake-store>=0.5.0',
     'azure-mgmt-resource>=2.2.0',
     'azure-storage-blob>=12.7.0',
@@ -315,6 +314,7 @@ google = [
     'grpcio-gcp>=0.2.2',
     'json-merge-patch~=0.2',
     'pandas-gbq',
+    'plyvel',
 ]
 grpc = [
     'google-auth>=1.0.0, <2.0.0dev',
@@ -364,7 +364,7 @@ mssql = [
 ]
 mysql = [
     'mysql-connector-python>=8.0.11, <=8.0.22',
-    'mysqlclient>=1.3.6,<1.4',
+    'mysqlclient>=1.3.6,<3',
 ]
 neo4j = ['neo4j>=4.2.1']
 odbc = [
@@ -390,7 +390,7 @@ pinot = [
     'pinotdb>0.1.2,<1.0.0',
 ]
 plexus = [
-    'arrow>=0.16.0',
+    'arrow>=0.16.0,<1.0.0',
 ]
 postgres = [
     'psycopg2-binary>=2.7.4',
@@ -468,6 +468,7 @@ zendesk = [
 # End dependencies group
 
 devel = [
+    'aws_xray_sdk',
     'beautifulsoup4~=4.7.1',
     'black',
     'blinker',
@@ -485,17 +486,15 @@ devel = [
     'ipdb',
     'jira',
     'jsonpath-ng',
-    # HACK: Moto is not compatible with newer versions
-    # See: https://github.com/spulec/moto/issues/3535
-    'mock<4.0.3',
+    'jsondiff',
     'mongomock',
-    'moto<2',
+    'moto~=2.0',
     'mypy==0.770',
     'parameterized',
     'paramiko',
     'pipdeptree',
     'pre-commit',
-    'pylint',
+    'pylint>=2.7.0',
     'pysftp',
     'pytest~=6.0',
     'pytest-cov',
@@ -503,6 +502,7 @@ devel = [
     'pytest-rerunfailures~=9.1',
     'pytest-timeouts',
     'pytest-xdist',
+    'python-jose',
     'pywinrm',
     'qds-sdk>=1.9.6',
     'requests_mock',
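
For reference, the compatible-release and range specifiers used in the
setup.py hunk above ('moto~=2.0', 'boto3>=1.15.0,<1.18.0') can be checked
with the packaging library; a small illustrative sketch, with example
version numbers only:

    from packaging.specifiers import SpecifierSet
    from packaging.version import Version

    # 'moto~=2.0' is equivalent to '>=2.0, ==2.*': any 2.x release, not 3.0.
    moto_spec = SpecifierSet("~=2.0")
    print(Version("2.0.5") in moto_spec)    # True
    print(Version("3.0.0") in moto_spec)    # False

    # The relaxed boto3 pin accepts anything from 1.15.0 up to,
    # but not including, 1.18.0.
    boto3_spec = SpecifierSet(">=1.15.0,<1.18.0")
    print(Version("1.17.9") in boto3_spec)  # True
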
diff --git a/tests/operators/test_generic_transfer.py b/tests/operators/test_generic_transfer.py
index a94b56f..4780b41 100644
--- a/tests/operators/test_generic_transfer.py
+++ b/tests/operators/test_generic_transfer.py
@@ -17,6 +17,7 @@
 # under the License.
 
 import unittest
+from contextlib import closing
 
 import pytest
 from parameterized import parameterized
@@ -43,9 +44,11 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
+        with closing(MySqlHook().get_conn()) as conn:
             for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+                # Previous version tried to run execute directly on dbapi call, which was accidentally working
+                with closing(conn.cursor()) as cur:
+                    cur.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [
diff --git a/tests/providers/amazon/aws/.gitignore b/tests/providers/amazon/aws/.gitignore
new file mode 100644
index 0000000..4083037
--- /dev/null
+++ b/tests/providers/amazon/aws/.gitignore
@@ -0,0 +1 @@
+local
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index d962068..e7f05137 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -36,6 +36,18 @@ except ImportError:
     mock_s3 = None
 
 
+# This class needs to be separated out because if there are earlier mocks in the same class
+# the tests will fail on teardown.
+class TestAwsS3HookNoMock:
+    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, monkeypatch):
+        monkeypatch.delenv('AWS_PROFILE', raising=False)
+        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
+        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
+        hook = S3Hook(aws_conn_id="does_not_exist")
+        with pytest.raises(NoCredentialsError):
+            hook.check_for_bucket("test-non-existing-bucket")
+
+
 @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
 class TestAwsS3Hook:
     @mock_s3
@@ -52,14 +64,6 @@ class TestAwsS3Hook:
         assert hook.check_for_bucket(s3_bucket) is True
         assert hook.check_for_bucket('not-a-bucket') is False
 
-    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, s3_bucket, monkeypatch):
-        monkeypatch.delenv('AWS_PROFILE', raising=False)
-        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
-        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
-        hook = S3Hook(aws_conn_id="does_not_exist")
-        with pytest.raises(NoCredentialsError):
-            hook.check_for_bucket(s3_bucket)
-
     @mock_s3
     def test_get_bucket(self):
         hook = S3Hook()
@@ -156,14 +160,6 @@ class TestAwsS3Hook:
         assert hook.check_for_key('b', s3_bucket) is False
         assert hook.check_for_key(f's3://{s3_bucket}//b') is False
 
-    def test_check_for_key_raises_error_with_invalid_conn_id(self, monkeypatch, s3_bucket):
-        monkeypatch.delenv('AWS_PROFILE', raising=False)
-        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
-        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
-        hook = S3Hook(aws_conn_id="does_not_exist")
-        with pytest.raises(NoCredentialsError):
-            hook.check_for_key('a', s3_bucket)
-
     def test_get_key(self, s3_bucket):
         hook = S3Hook()
         bucket = hook.get_bucket(s3_bucket)
diff --git a/tests/providers/amazon/aws/hooks/test_secrets_manager.py b/tests/providers/amazon/aws/hooks/test_secrets_manager.py
index bfcd847..b41b381 100644
--- a/tests/providers/amazon/aws/hooks/test_secrets_manager.py
+++ b/tests/providers/amazon/aws/hooks/test_secrets_manager.py
@@ -43,12 +43,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_value = '{"user": "test"}'
         hook = SecretsManagerHook(aws_conn_id='aws_default')
 
-        param = {
+        create_param = {
+            'Name': secret_name,
+        }
+
+        put_param = {
             'SecretId': secret_name,
             'SecretString': secret_value,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret(secret_name)
         assert secret == secret_value
@@ -60,12 +65,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_value = '{"user": "test"}'
         hook = SecretsManagerHook(aws_conn_id='aws_default')
 
-        param = {
+        create_param = {
+            'Name': secret_name,
+        }
+
+        put_param = {
             'SecretId': secret_name,
             'SecretString': secret_value,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret_as_dict(secret_name)
         assert secret == json.loads(secret_value)
@@ -76,13 +86,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_name = "arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
         secret_value_binary = base64.b64encode(b'{"username": "test"}')
         hook = SecretsManagerHook(aws_conn_id='aws_default')
+        create_param = {
+            'Name': secret_name,
+        }
 
-        param = {
+        put_param = {
             'SecretId': secret_name,
             'SecretBinary': secret_value_binary,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret(secret_name)
         assert secret == base64.b64decode(secret_value_binary)
diff --git a/tests/providers/amazon/aws/secrets/test_secrets_manager.py b/tests/providers/amazon/aws/secrets/test_secrets_manager.py
index d45f1aa..c84dc6a 100644
--- a/tests/providers/amazon/aws/secrets/test_secrets_manager.py
+++ b/tests/providers/amazon/aws/secrets/test_secrets_manager.py
@@ -32,12 +32,19 @@ class TestSecretsManagerBackend(TestCase):
 
     @mock_secretsmanager
     def test_get_conn_uri(self):
+
+        secret_id = 'airflow/connections/test_postgres'
+        create_param = {
+            'Name': secret_id,
+        }
+
         param = {
-            'SecretId': 'airflow/connections/test_postgres',
+            'SecretId': secret_id,
             'SecretString': 'postgresql://airflow:airflow@host:5432/airflow',
         }
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         returned_uri = secrets_manager_backend.get_conn_uri(conn_id="test_postgres")
@@ -50,12 +57,19 @@ class TestSecretsManagerBackend(TestCase):
         SecretsManagerBackend.get_connections should return None
         """
         conn_id = "test_mysql"
+
+        secret_id = 'airflow/connections/test_postgres'
+        create_param = {
+            'Name': secret_id,
+        }
+
         param = {
-            'SecretId': 'airflow/connections/test_postgres',
+            'SecretId': secret_id,
             'SecretString': 'postgresql://airflow:airflow@host:5432/airflow',
         }
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         assert secrets_manager_backend.get_conn_uri(conn_id=conn_id) is None
@@ -63,9 +77,16 @@ class TestSecretsManagerBackend(TestCase):
 
     @mock_secretsmanager
     def test_get_variable(self):
-        param = {'SecretId': 'airflow/variables/hello', 'SecretString': 'world'}
+
+        secret_id = 'airflow/variables/hello'
+        create_param = {
+            'Name': secret_id,
+        }
+
+        param = {'SecretId': secret_id, 'SecretString': 'world'}
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         returned_uri = secrets_manager_backend.get_variable('hello')
@@ -77,9 +98,14 @@ class TestSecretsManagerBackend(TestCase):
         Test that if Variable key is not present,
         SystemsManagerParameterStoreBackend.get_variables should return None
         """
-        param = {'SecretId': 'airflow/variables/hello', 'SecretString': 'world'}
+        secret_id = 'airflow/variables/hello'
+        create_param = {
+            'Name': secret_id,
+        }
+        param = {'SecretId': secret_id, 'SecretString': 'world'}
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         assert secrets_manager_backend.get_variable("test_mysql") is None
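
A standalone sketch of the Secrets Manager behaviour the test changes above
account for: with moto ~=2.0 the tests have to create the secret explicitly
before putting a value into it, matching the real API (the secret name and
region below are examples only):

    import boto3
    from moto import mock_secretsmanager

    @mock_secretsmanager
    def test_put_requires_existing_secret():
        client = boto3.client("secretsmanager", region_name="us-east-1")
        # The secret has to exist before a value can be put into it.
        client.create_secret(Name="airflow/connections/test_postgres")
        client.put_secret_value(
            SecretId="airflow/connections/test_postgres",
            SecretString="postgresql://airflow:airflow@host:5432/airflow",
        )
        value = client.get_secret_value(SecretId="airflow/connections/test_postgres")
        assert value["SecretString"].startswith("postgresql://")
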
diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
index c6f7736..436b8e8 100644
--- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
@@ -18,6 +18,7 @@
 
 import unittest
 from collections import OrderedDict
+from contextlib import closing
 from os import path
 from unittest import mock
 
@@ -129,24 +130,25 @@ class TestTransfer(unittest.TestCase):
             'AIRFLOW_CTX_DAG_EMAIL': 'test@airflow.com',
         }
 
-        with MySqlHook().get_conn() as cur:
-            cur.execute(
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cur:
+                cur.execute(
+                    '''
+                CREATE TABLE IF NOT EXISTS baby_names (
+                  org_year integer(4),
+                  baby_name VARCHAR(25),
+                  rate FLOAT(7,6),
+                  sex VARCHAR(4)
+                )
                 '''
-            CREATE TABLE IF NOT EXISTS baby_names (
-              org_year integer(4),
-              baby_name VARCHAR(25),
-              rate FLOAT(7,6),
-              sex VARCHAR(4)
-            )
-            '''
-            )
-
-        for row in rows:
-            cur.execute("INSERT INTO baby_names VALUES(%s, %s, %s, %s);", row)
+                )
+                for row in rows:
+                    cur.execute("INSERT INTO baby_names VALUES(%s, %s, %s, %s);", row)
 
     def tearDown(self):
-        with MySqlHook().get_conn() as cur:
-            cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cur:
+                cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive(self, mock_popen):
@@ -314,11 +316,12 @@ class TestTransfer(unittest.TestCase):
         hook = MySqlHook()
 
         try:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    """
-                    CREATE TABLE {} (
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        """
+                        CREATE TABLE {} (
                         c0 TINYINT,
                         c1 SMALLINT,
                         c2 MEDIUMINT,
@@ -327,9 +330,9 @@ class TestTransfer(unittest.TestCase):
                         c5 TIMESTAMP
                     )
                 """.format(
-                        mysql_table
+                            mysql_table
+                        )
                     )
-                )
 
             op = MySqlToHiveOperator(
                 task_id='test_m2h',
@@ -350,8 +353,9 @@ class TestTransfer(unittest.TestCase):
             ordered_dict["c5"] = "TIMESTAMP"
             assert mock_load_file.call_args[1]["field_dict"] == ordered_dict
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive_verify_csv_special_char(self, mock_popen):
@@ -365,27 +369,28 @@ class TestTransfer(unittest.TestCase):
 
         try:
             db_record = ('c0', '["true"]')
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    """
-                    CREATE TABLE {} (
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        """
+                        CREATE TABLE {} (
                         c0 VARCHAR(25),
                         c1 VARCHAR(25)
                     )
                 """.format(
-                        mysql_table
-                    )
-                )
-                conn.execute(
-                    """
-                    INSERT INTO {} VALUES (
-                        '{}', '{}'
+                            mysql_table
+                        )
                     )
-                """.format(
-                        mysql_table, *db_record
+                    cursor.execute(
+                        """
+                        INSERT INTO {} VALUES (
+                            '{}', '{}'
+                        )
+                    """.format(
+                            mysql_table, *db_record
+                        )
                     )
-                )
 
             with mock.patch.dict('os.environ', self.env_vars):
                 import unicodecsv as csv
@@ -445,8 +450,9 @@ class TestTransfer(unittest.TestCase):
                 close_fds=True,
             )
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive_verify_loaded_values(self, mock_popen):
@@ -472,11 +478,12 @@ class TestTransfer(unittest.TestCase):
                 -9223372036854775808,
             )
 
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    """
-                    CREATE TABLE {} (
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        """
+                        CREATE TABLE {} (
                         c0 TINYINT   UNSIGNED,
                         c1 SMALLINT  UNSIGNED,
                         c2 MEDIUMINT UNSIGNED,
@@ -489,18 +496,18 @@ class TestTransfer(unittest.TestCase):
                         c9 BIGINT
                     )
                 """.format(
-                        mysql_table
+                            mysql_table
+                        )
                     )
-                )
-                conn.execute(
-                    """
-                    INSERT INTO {} VALUES (
-                        {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
+                    cursor.execute(
+                        """
+                        INSERT INTO {} VALUES (
+                            {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
+                        )
+                    """.format(
+                            mysql_table, *minmax
+                        )
                     )
-                """.format(
-                        mysql_table, *minmax
-                    )
-                )
 
             with mock.patch.dict('os.environ', self.env_vars):
                 op = MySqlToHiveOperator(
@@ -556,5 +563,6 @@ class TestTransfer(unittest.TestCase):
                 )
 
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 50dd997..6063c92 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -185,17 +185,33 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
-        assert [
-            {
-                'description': '',
-                'display_name': '',
-                'labels': {},
-                'name': 'test-123',
-                'type_': '',
-                'user_labels': {},
-                'verification_status': 0,
-            }
-        ] == result
+        # Depending on the version of google-apitools installed we might receive the response either with or
+        # without mutation_records.
+        assert result in [
+            [
+                {
+                    'description': '',
+                    'display_name': '',
+                    'labels': {},
+                    'name': 'test-123',
+                    'type_': '',
+                    'user_labels': {},
+                    'verification_status': 0,
+                }
+            ],
+            [
+                {
+                    'description': '',
+                    'display_name': '',
+                    'labels': {},
+                    'mutation_records': [],
+                    'name': 'test-123',
+                    'type_': '',
+                    'user_labels': {},
+                    'verification_status': 0,
+                }
+            ],
+        ]
 
 
 class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py
index 538381f..9e1155d 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -21,6 +21,7 @@ import json
 import os
 import unittest
 import uuid
+from contextlib import closing
 from unittest import mock
 
 import MySQLdb.cursors
@@ -348,9 +349,10 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
-            for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cursor:
+                for table in drop_tables:
+                    cursor.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [
@@ -375,19 +377,20 @@ class TestMySql(unittest.TestCase):
                 f.flush()
 
                 hook = MySqlHook('airflow_db')
-                with hook.get_conn() as conn:
-                    conn.execute(
+                with closing(hook.get_conn()) as conn:
+                    with closing(conn.cursor()) as cursor:
+                        cursor.execute(
+                            """
+                            CREATE TABLE IF NOT EXISTS test_airflow (
+                                dummy VARCHAR(50)
+                            )
                         """
-                        CREATE TABLE IF NOT EXISTS test_airflow (
-                            dummy VARCHAR(50)
                         )
-                    """
-                    )
-                    conn.execute("TRUNCATE TABLE test_airflow")
-                    hook.bulk_load("test_airflow", f.name)
-                    conn.execute("SELECT dummy FROM test_airflow")
-                    results = tuple(result[0] for result in conn.fetchall())
-                    assert sorted(results) == sorted(records)
+                        cursor.execute("TRUNCATE TABLE test_airflow")
+                        hook.bulk_load("test_airflow", f.name)
+                        cursor.execute("SELECT dummy FROM test_airflow")
+                        results = tuple(result[0] for result in cursor.fetchall())
+                        assert sorted(results) == sorted(records)
 
     @parameterized.expand(
         [
diff --git a/tests/providers/mysql/operators/test_mysql.py b/tests/providers/mysql/operators/test_mysql.py
index af5cc25..95b42a3 100644
--- a/tests/providers/mysql/operators/test_mysql.py
+++ b/tests/providers/mysql/operators/test_mysql.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import unittest
+from contextlib import closing
 
 import pytest
 from parameterized import parameterized
@@ -41,9 +42,10 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
-            for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cursor:
+                for table in drop_tables:
+                    cursor.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [