Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/27 20:24:30 UTC

[airflow] branch master updated: Upgrades moto to newer version (~=2.0) (#15051)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e8aa3de  Upgrades moto to newer version (~=2.0) (#15051)
e8aa3de is described below

commit e8aa3de4bbd218d9e3b899ee221a2ea8e3f3a6de
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Mar 27 21:24:09 2021 +0100

    Upgrades moto to newer version (~=2.0) (#15051)
    
    According to https://github.com/spulec/moto/issues/3535#issuecomment-808706939,
    moto 1.3.17, which would have carried the fix for compatibility with mock > 4.0.3,
    is not going to be released because of breaking changes. Therefore we need
    to migrate to a newer version of moto.
    
    At the same time we can get rid of the old botocore limitation, which was
    apparently added to work around some test errors. We now rely fully on the
    botocore version that boto3 itself depends on.
    
    Upgrading the dependencies also revealed that the MySQL tests needed to be
    fixed, because the upgraded dependencies caused some test failures
    (those turned out to be badly written tests).
---
 setup.py                                           |   9 +-
 tests/operators/test_generic_transfer.py           |   7 +-
 tests/providers/amazon/aws/.gitignore              |   1 +
 tests/providers/amazon/aws/hooks/test_s3.py        |  28 ++--
 .../amazon/aws/hooks/test_secrets_manager.py       |  26 +++-
 .../amazon/aws/secrets/test_secrets_manager.py     |  34 ++++-
 .../apache/hive/transfers/test_mysql_to_hive.py    | 156 +++++++++++----------
 .../google/cloud/operators/test_stackdriver.py     |  38 +++--
 tests/providers/mysql/hooks/test_mysql.py          |  31 ++--
 tests/providers/mysql/operators/test_mysql.py      |   8 +-
 10 files changed, 203 insertions(+), 135 deletions(-)

diff --git a/setup.py b/setup.py
index 79c37ec..6a8aa00 100644
--- a/setup.py
+++ b/setup.py
@@ -195,7 +195,6 @@ def get_sphinx_theme_version() -> str:
 # Start dependencies group
 amazon = [
     'boto3>=1.15.0,<1.18.0',
-    'botocore>=1.18.0,<1.19.0',
     'watchtower~=0.7.3',
 ]
 apache_beam = [
@@ -469,6 +468,7 @@ zendesk = [
 # End dependencies group
 
 devel = [
+    'aws_xray_sdk',
     'beautifulsoup4~=4.7.1',
     'black',
     'blinker',
@@ -486,11 +486,9 @@ devel = [
     'ipdb',
     'jira',
     'jsonpath-ng',
-    # HACK: Moto is not compatible with newer versions
-    # See: https://github.com/spulec/moto/issues/3535
-    'mock<4.0.3',
+    'jsondiff',
     'mongomock',
-    'moto<2',
+    'moto~=2.0',
     'mypy==0.770',
     'parameterized',
     'paramiko',
@@ -504,6 +502,7 @@ devel = [
     'pytest-rerunfailures~=9.1',
     'pytest-timeouts',
     'pytest-xdist',
+    'python-jose',
     'pywinrm',
     'qds-sdk>=1.9.6',
     'requests_mock',
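
The new moto~=2.0 pin is a PEP 440 compatible-release specifier: it accepts any 2.x release while rejecting 1.x and 3.x. Below is a minimal sketch (not part of the commit) of what the pin matches, checked with the packaging library; the candidate version numbers are illustrative.

    # Illustrative check of the 'moto~=2.0' compatible-release pin.
    from packaging.specifiers import SpecifierSet

    moto_pin = SpecifierSet("~=2.0")  # equivalent to '>=2.0, ==2.*'

    for candidate in ("1.3.16", "2.0.0", "2.1.5", "3.0.0"):
        # Only the 2.x releases satisfy the pin; 1.x and 3.x are rejected.
        print(candidate, moto_pin.contains(candidate))
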
diff --git a/tests/operators/test_generic_transfer.py b/tests/operators/test_generic_transfer.py
index a94b56f..4780b41 100644
--- a/tests/operators/test_generic_transfer.py
+++ b/tests/operators/test_generic_transfer.py
@@ -17,6 +17,7 @@
 # under the License.
 
 import unittest
+from contextlib import closing
 
 import pytest
 from parameterized import parameterized
@@ -43,9 +44,11 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
+        with closing(MySqlHook().get_conn()) as conn:
             for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+                # The previous version ran execute() directly on the DB-API connection, which only worked by accident
+                with closing(conn.cursor()) as cur:
+                    cur.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [
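
The tearDown change above reflects how the tests now manage DB-API resources: instead of relying on whatever the MySQL driver's connection context manager happens to return (older drivers apparently returned a cursor, which is why conn.execute() used to work by accident), both the connection and the cursor are wrapped in contextlib.closing. A minimal sketch of the pattern follows; get_connection() is a hypothetical stand-in for MySqlHook().get_conn() and the table name is illustrative.

    from contextlib import closing


    def drop_test_tables(get_connection, tables=("test_airflow",)):
        # closing() guarantees close() is called on both objects even if
        # execute() raises, without depending on what the driver's
        # __enter__ happens to return.
        with closing(get_connection()) as conn:
            with closing(conn.cursor()) as cur:
                for table in tables:
                    cur.execute(f"DROP TABLE IF EXISTS {table}")
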
diff --git a/tests/providers/amazon/aws/.gitignore b/tests/providers/amazon/aws/.gitignore
new file mode 100644
index 0000000..4083037
--- /dev/null
+++ b/tests/providers/amazon/aws/.gitignore
@@ -0,0 +1 @@
+local
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index 837f9f0..b56d4fc 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -36,6 +36,18 @@ except ImportError:
     mock_s3 = None
 
 
+# This class needs to be separated out because if there are earlier mocks in the same class
+# the tests will fail on teardown.
+class TestAwsS3HookNoMock:
+    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, monkeypatch):
+        monkeypatch.delenv('AWS_PROFILE', raising=False)
+        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
+        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
+        hook = S3Hook(aws_conn_id="does_not_exist")
+        with pytest.raises(NoCredentialsError):
+            hook.check_for_bucket("test-non-existing-bucket")
+
+
 @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
 class TestAwsS3Hook:
     @mock_s3
@@ -62,14 +74,6 @@ class TestAwsS3Hook:
         assert hook.check_for_bucket(s3_bucket) is True
         assert hook.check_for_bucket('not-a-bucket') is False
 
-    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, s3_bucket, monkeypatch):
-        monkeypatch.delenv('AWS_PROFILE', raising=False)
-        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
-        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
-        hook = S3Hook(aws_conn_id="does_not_exist")
-        with pytest.raises(NoCredentialsError):
-            hook.check_for_bucket(s3_bucket)
-
     @mock_s3
     def test_get_bucket(self):
         hook = S3Hook()
@@ -166,14 +170,6 @@ class TestAwsS3Hook:
         assert hook.check_for_key('b', s3_bucket) is False
         assert hook.check_for_key(f's3://{s3_bucket}//b') is False
 
-    def test_check_for_key_raises_error_with_invalid_conn_id(self, monkeypatch, s3_bucket):
-        monkeypatch.delenv('AWS_PROFILE', raising=False)
-        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
-        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
-        hook = S3Hook(aws_conn_id="does_not_exist")
-        with pytest.raises(NoCredentialsError):
-            hook.check_for_key('a', s3_bucket)
-
     def test_get_key(self, s3_bucket):
         hook = S3Hook()
         bucket = hook.get_bucket(s3_bucket)
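
The credentials test above moves into its own TestAwsS3HookNoMock class so it runs with no moto mock active; as the added comment notes, once earlier mocks have run in the same class, the test fails on teardown. The following is a hedged sketch of that layout with illustrative names; it assumes no other credential source (shared credentials file, instance profile) is available when the unmocked test runs.

    import boto3
    import pytest
    from botocore.exceptions import NoCredentialsError
    from moto import mock_s3


    class TestS3NoMock:
        # No moto decorator here: the call must hit botocore's real
        # credential resolution and fail.
        def test_missing_credentials(self, monkeypatch):
            monkeypatch.delenv("AWS_PROFILE", raising=False)
            monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False)
            monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False)
            client = boto3.client("s3", region_name="us-east-1")
            with pytest.raises(NoCredentialsError):
                client.list_buckets()


    class TestS3Mocked:
        @mock_s3
        def test_bucket_exists(self):
            client = boto3.client("s3", region_name="us-east-1")
            client.create_bucket(Bucket="test-bucket")
            names = [b["Name"] for b in client.list_buckets()["Buckets"]]
            assert "test-bucket" in names
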
diff --git a/tests/providers/amazon/aws/hooks/test_secrets_manager.py b/tests/providers/amazon/aws/hooks/test_secrets_manager.py
index bfcd847..b41b381 100644
--- a/tests/providers/amazon/aws/hooks/test_secrets_manager.py
+++ b/tests/providers/amazon/aws/hooks/test_secrets_manager.py
@@ -43,12 +43,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_value = '{"user": "test"}'
         hook = SecretsManagerHook(aws_conn_id='aws_default')
 
-        param = {
+        create_param = {
+            'Name': secret_name,
+        }
+
+        put_param = {
             'SecretId': secret_name,
             'SecretString': secret_value,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret(secret_name)
         assert secret == secret_value
@@ -60,12 +65,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_value = '{"user": "test"}'
         hook = SecretsManagerHook(aws_conn_id='aws_default')
 
-        param = {
+        create_param = {
+            'Name': secret_name,
+        }
+
+        put_param = {
             'SecretId': secret_name,
             'SecretString': secret_value,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret_as_dict(secret_name)
         assert secret == json.loads(secret_value)
@@ -76,13 +86,17 @@ class TestSecretsManagerHook(unittest.TestCase):
         secret_name = "arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
         secret_value_binary = base64.b64encode(b'{"username": "test"}')
         hook = SecretsManagerHook(aws_conn_id='aws_default')
+        create_param = {
+            'Name': secret_name,
+        }
 
-        param = {
+        put_param = {
             'SecretId': secret_name,
             'SecretBinary': secret_value_binary,
         }
 
-        hook.get_conn().put_secret_value(**param)
+        hook.get_conn().create_secret(**create_param)
+        hook.get_conn().put_secret_value(**put_param)
 
         secret = hook.get_secret(secret_name)
         assert secret == base64.b64decode(secret_value_binary)
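
The pattern above changes because moto 2.x mirrors the real Secrets Manager API more closely: a secret has to be created before a value can be put into it, whereas the old mock tolerated put_secret_value on a secret that had never been created. A minimal sketch of the create-then-put round trip against moto's mock; the secret name, value, and region are illustrative.

    import boto3
    from moto import mock_secretsmanager


    @mock_secretsmanager
    def roundtrip_secret():
        client = boto3.client("secretsmanager", region_name="us-east-1")
        # The secret must exist before put_secret_value() can add a version
        # to it; moto 2.x enforces this like the real service does.
        client.create_secret(Name="db_cluster")
        client.put_secret_value(SecretId="db_cluster", SecretString='{"user": "test"}')
        return client.get_secret_value(SecretId="db_cluster")["SecretString"]


    assert roundtrip_secret() == '{"user": "test"}'
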
diff --git a/tests/providers/amazon/aws/secrets/test_secrets_manager.py b/tests/providers/amazon/aws/secrets/test_secrets_manager.py
index d45f1aa..c84dc6a 100644
--- a/tests/providers/amazon/aws/secrets/test_secrets_manager.py
+++ b/tests/providers/amazon/aws/secrets/test_secrets_manager.py
@@ -32,12 +32,19 @@ class TestSecretsManagerBackend(TestCase):
 
     @mock_secretsmanager
     def test_get_conn_uri(self):
+
+        secret_id = 'airflow/connections/test_postgres'
+        create_param = {
+            'Name': secret_id,
+        }
+
         param = {
-            'SecretId': 'airflow/connections/test_postgres',
+            'SecretId': secret_id,
             'SecretString': 'postgresql://airflow:airflow@host:5432/airflow',
         }
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         returned_uri = secrets_manager_backend.get_conn_uri(conn_id="test_postgres")
@@ -50,12 +57,19 @@ class TestSecretsManagerBackend(TestCase):
         SecretsManagerBackend.get_connections should return None
         """
         conn_id = "test_mysql"
+
+        secret_id = 'airflow/connections/test_postgres'
+        create_param = {
+            'Name': secret_id,
+        }
+
         param = {
-            'SecretId': 'airflow/connections/test_postgres',
+            'SecretId': secret_id,
             'SecretString': 'postgresql://airflow:airflow@host:5432/airflow',
         }
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         assert secrets_manager_backend.get_conn_uri(conn_id=conn_id) is None
@@ -63,9 +77,16 @@ class TestSecretsManagerBackend(TestCase):
 
     @mock_secretsmanager
     def test_get_variable(self):
-        param = {'SecretId': 'airflow/variables/hello', 'SecretString': 'world'}
+
+        secret_id = 'airflow/variables/hello'
+        create_param = {
+            'Name': secret_id,
+        }
+
+        param = {'SecretId': secret_id, 'SecretString': 'world'}
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         returned_uri = secrets_manager_backend.get_variable('hello')
@@ -77,9 +98,14 @@ class TestSecretsManagerBackend(TestCase):
         Test that if Variable key is not present,
         SystemsManagerParameterStoreBackend.get_variables should return None
         """
-        param = {'SecretId': 'airflow/variables/hello', 'SecretString': 'world'}
+        secret_id = 'airflow/variables/hello'
+        create_param = {
+            'Name': secret_id,
+        }
+        param = {'SecretId': secret_id, 'SecretString': 'world'}
 
         secrets_manager_backend = SecretsManagerBackend()
+        secrets_manager_backend.client.create_secret(**create_param)
         secrets_manager_backend.client.put_secret_value(**param)
 
         assert secrets_manager_backend.get_variable("test_mysql") is None
diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
index 0413b5c..1b7e2cd 100644
--- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
@@ -18,6 +18,7 @@
 
 import unittest
 from collections import OrderedDict
+from contextlib import closing
 from os import path
 from unittest import mock
 
@@ -129,24 +130,25 @@ class TestTransfer(unittest.TestCase):
             'AIRFLOW_CTX_DAG_EMAIL': 'test@airflow.com',
         }
 
-        with MySqlHook().get_conn() as cur:
-            cur.execute(
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cur:
+                cur.execute(
+                    '''
+                CREATE TABLE IF NOT EXISTS baby_names (
+                  org_year integer(4),
+                  baby_name VARCHAR(25),
+                  rate FLOAT(7,6),
+                  sex VARCHAR(4)
+                )
                 '''
-            CREATE TABLE IF NOT EXISTS baby_names (
-              org_year integer(4),
-              baby_name VARCHAR(25),
-              rate FLOAT(7,6),
-              sex VARCHAR(4)
-            )
-            '''
-            )
-
-        for row in rows:
-            cur.execute("INSERT INTO baby_names VALUES(%s, %s, %s, %s);", row)
+                )
+                for row in rows:
+                    cur.execute("INSERT INTO baby_names VALUES(%s, %s, %s, %s);", row)
 
     def tearDown(self):
-        with MySqlHook().get_conn() as cur:
-            cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cur:
+                cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive(self, mock_popen):
@@ -314,20 +316,21 @@ class TestTransfer(unittest.TestCase):
         hook = MySqlHook()
 
         try:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    f"""
-                    CREATE TABLE {mysql_table} (
-                        c0 TINYINT,
-                        c1 SMALLINT,
-                        c2 MEDIUMINT,
-                        c3 INT,
-                        c4 BIGINT,
-                        c5 TIMESTAMP
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        f"""
+                        CREATE TABLE {mysql_table} (
+                            c0 TINYINT,
+                            c1 SMALLINT,
+                            c2 MEDIUMINT,
+                            c3 INT,
+                            c4 BIGINT,
+                            c5 TIMESTAMP
+                        )
+                    """
                     )
-                """
-                )
 
             op = MySqlToHiveOperator(
                 task_id='test_m2h',
@@ -348,8 +351,9 @@ class TestTransfer(unittest.TestCase):
             ordered_dict["c5"] = "TIMESTAMP"
             assert mock_load_file.call_args[1]["field_dict"] == ordered_dict
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive_verify_csv_special_char(self, mock_popen):
@@ -363,25 +367,26 @@ class TestTransfer(unittest.TestCase):
 
         try:
             db_record = ('c0', '["true"]')
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    f"""
-                    CREATE TABLE {mysql_table} (
-                        c0 VARCHAR(25),
-                        c1 VARCHAR(25)
-                    )
-                """
-                )
-                conn.execute(
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        f"""
+                        CREATE TABLE {mysql_table} (
+                            c0 VARCHAR(25),
+                            c1 VARCHAR(25)
+                        )
                     """
-                    INSERT INTO {} VALUES (
-                        '{}', '{}'
                     )
-                """.format(
-                        mysql_table, *db_record
+                    cursor.execute(
+                        """
+                        INSERT INTO {} VALUES (
+                            '{}', '{}'
+                        )
+                    """.format(
+                            mysql_table, *db_record
+                        )
                     )
-                )
 
             with mock.patch.dict('os.environ', self.env_vars):
                 import unicodecsv as csv
@@ -441,8 +446,9 @@ class TestTransfer(unittest.TestCase):
                 close_fds=True,
             )
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
 
     @mock.patch('subprocess.Popen')
     def test_mysql_to_hive_verify_loaded_values(self, mock_popen):
@@ -468,33 +474,34 @@ class TestTransfer(unittest.TestCase):
                 -9223372036854775808,
             )
 
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
-                conn.execute(
-                    f"""
-                    CREATE TABLE {mysql_table} (
-                        c0 TINYINT   UNSIGNED,
-                        c1 SMALLINT  UNSIGNED,
-                        c2 MEDIUMINT UNSIGNED,
-                        c3 INT       UNSIGNED,
-                        c4 BIGINT    UNSIGNED,
-                        c5 TINYINT,
-                        c6 SMALLINT,
-                        c7 MEDIUMINT,
-                        c8 INT,
-                        c9 BIGINT
-                    )
-                """
-                )
-                conn.execute(
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+                    cursor.execute(
+                        f"""
+                        CREATE TABLE {mysql_table} (
+                            c0 TINYINT   UNSIGNED,
+                            c1 SMALLINT  UNSIGNED,
+                            c2 MEDIUMINT UNSIGNED,
+                            c3 INT       UNSIGNED,
+                            c4 BIGINT    UNSIGNED,
+                            c5 TINYINT,
+                            c6 SMALLINT,
+                            c7 MEDIUMINT,
+                            c8 INT,
+                            c9 BIGINT
+                        )
                     """
-                    INSERT INTO {} VALUES (
-                        {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
                     )
-                """.format(
-                        mysql_table, *minmax
+                    cursor.execute(
+                        """
+                        INSERT INTO {} VALUES (
+                            {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
+                        )
+                    """.format(
+                            mysql_table, *minmax
+                        )
                     )
-                )
 
             with mock.patch.dict('os.environ', self.env_vars):
                 op = MySqlToHiveOperator(
@@ -550,5 +557,6 @@ class TestTransfer(unittest.TestCase):
                 )
 
         finally:
-            with hook.get_conn() as conn:
-                conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
+            with closing(hook.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}")
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 50dd997..6063c92 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -185,17 +185,33 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
-        assert [
-            {
-                'description': '',
-                'display_name': '',
-                'labels': {},
-                'name': 'test-123',
-                'type_': '',
-                'user_labels': {},
-                'verification_status': 0,
-            }
-        ] == result
+        # Depending on the version of google-apitools installed we might receive the response either with or
+        # without mutation_records.
+        assert result in [
+            [
+                {
+                    'description': '',
+                    'display_name': '',
+                    'labels': {},
+                    'name': 'test-123',
+                    'type_': '',
+                    'user_labels': {},
+                    'verification_status': 0,
+                }
+            ],
+            [
+                {
+                    'description': '',
+                    'display_name': '',
+                    'labels': {},
+                    'mutation_records': [],
+                    'name': 'test-123',
+                    'type_': '',
+                    'user_labels': {},
+                    'verification_status': 0,
+                }
+            ],
+        ]
 
 
 class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py
index 19f0bd8..a17be83 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -21,6 +21,7 @@ import json
 import os
 import unittest
 import uuid
+from contextlib import closing
 from unittest import mock
 
 import MySQLdb.cursors
@@ -348,9 +349,10 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
-            for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cursor:
+                for table in drop_tables:
+                    cursor.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [
@@ -375,19 +377,20 @@ class TestMySql(unittest.TestCase):
                 f.flush()
 
                 hook = MySqlHook('airflow_db')
-                with hook.get_conn() as conn:
-                    conn.execute(
+                with closing(hook.get_conn()) as conn:
+                    with closing(conn.cursor()) as cursor:
+                        cursor.execute(
+                            """
+                            CREATE TABLE IF NOT EXISTS test_airflow (
+                                dummy VARCHAR(50)
+                            )
                         """
-                        CREATE TABLE IF NOT EXISTS test_airflow (
-                            dummy VARCHAR(50)
                         )
-                    """
-                    )
-                    conn.execute("TRUNCATE TABLE test_airflow")
-                    hook.bulk_load("test_airflow", f.name)
-                    conn.execute("SELECT dummy FROM test_airflow")
-                    results = tuple(result[0] for result in conn.fetchall())
-                    assert sorted(results) == sorted(records)
+                        cursor.execute("TRUNCATE TABLE test_airflow")
+                        hook.bulk_load("test_airflow", f.name)
+                        cursor.execute("SELECT dummy FROM test_airflow")
+                        results = tuple(result[0] for result in cursor.fetchall())
+                        assert sorted(results) == sorted(records)
 
     @parameterized.expand(
         [
diff --git a/tests/providers/mysql/operators/test_mysql.py b/tests/providers/mysql/operators/test_mysql.py
index 73a5b73..c8d3128 100644
--- a/tests/providers/mysql/operators/test_mysql.py
+++ b/tests/providers/mysql/operators/test_mysql.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import unittest
+from contextlib import closing
 
 import pytest
 from parameterized import parameterized
@@ -41,9 +42,10 @@ class TestMySql(unittest.TestCase):
 
     def tearDown(self):
         drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
-        with MySqlHook().get_conn() as conn:
-            for table in drop_tables:
-                conn.execute(f"DROP TABLE IF EXISTS {table}")
+        with closing(MySqlHook().get_conn()) as conn:
+            with closing(conn.cursor()) as cursor:
+                for table in drop_tables:
+                    cursor.execute(f"DROP TABLE IF EXISTS {table}")
 
     @parameterized.expand(
         [