You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2018/05/23 20:58:09 UTC

incubator-airflow git commit: [AIRFLOW-2472] Implement MySqlHook.bulk_dump

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f1ac67bdc -> 3bdb34e77


[AIRFLOW-2472] Implement MySqlHook.bulk_dump

Implement MySqlHook.bulk_dump since the opposite
operation bulk_load is already implemented.
This PR also addresses some flake8 warnings.

Closes #3385 from sekikn/AIRFLOW-2472


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3bdb34e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3bdb34e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3bdb34e7

Branch: refs/heads/master
Commit: 3bdb34e7775fdb85e3dd6cd92a47f91c80b1f67b
Parents: f1ac67b
Author: Kengo Seki <se...@apache.org>
Authored: Wed May 23 13:58:04 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Wed May 23 13:58:04 2018 -0700

----------------------------------------------------------------------
 airflow/hooks/mysql_hook.py                     | 12 ++++
 airflow/utils/tests.py                          |  8 +++
 tests/hooks/test_hive_hook.py                   |  9 +--
 tests/operators/operators.py                    | 75 ++++++++++++++------
 tests/operators/test_redshift_to_s3_operator.py | 11 +--
 tests/operators/test_s3_to_redshift_operator.py |  8 +--
 6 files changed, 79 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/hooks/mysql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/mysql_hook.py b/airflow/hooks/mysql_hook.py
index d202e55..f52b60c 100644
--- a/airflow/hooks/mysql_hook.py
+++ b/airflow/hooks/mysql_hook.py
@@ -89,6 +89,18 @@ class MySqlHook(DbApiHook):
             """.format(**locals()))
         conn.commit()
 
+    def bulk_dump(self, table, tmp_file):
+        """
+        Dumps ``table`` into the tab-delimited file ``tmp_file`` (SELECT ... INTO OUTFILE)
+        """
+        connection = self.get_conn()
+        dump_sql = """
+            SELECT * INTO OUTFILE '{tmp_file}'
+            FROM {table}
+            """.format(tmp_file=tmp_file, table=table)
+        connection.cursor().execute(dump_sql)
+        connection.commit()
+
     @staticmethod
     def _serialize_cell(cell, conn):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 9982a07..6f29ffc 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -17,8 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import re
 import unittest
 
+
 def skipUnlessImported(module, obj):
     import importlib
     try:
@@ -29,3 +31,9 @@ def skipUnlessImported(module, obj):
         obj in dir(m),
         "Skipping test because {} could not be imported from {}".format(
             obj, module))
+
+
+def assertEqualIgnoreMultipleSpaces(case, first, second, msg=None):
+    def _trim(s):  # strip the ends, then collapse each whitespace run to one space
+        return re.sub(r"\s+", " ", s.strip())  # without return, None == None always
+    return case.assertEqual(_trim(first), _trim(second), msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index d090086..690b5ad 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -22,7 +22,6 @@ import datetime
 import itertools
 import pandas as pd
 import random
-import re
 
 import mock
 import unittest
@@ -34,6 +33,7 @@ from airflow.exceptions import AirflowException
 from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
 from airflow import DAG, configuration, operators
 from airflow.utils import timezone
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
 
 
 configuration.load_test_config()
@@ -188,12 +188,7 @@ class TestHiveCliHook(unittest.TestCase):
             STORED AS textfile
             ;
         """
-
-        def _trim(s):
-            return re.sub("\s+", " ", s.strip())
-
-        self.assertEqual(_trim(mock_run_cli.call_args_list[0][0][0]),
-                         _trim(query))
+        assertEqualIgnoreMultipleSpaces(self, mock_run_cli.call_args_list[0][0][0], query)
 
 
 class TestHiveMetastoreHook(HiveEnvironmentTest):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 9fc2c93..f795fd0 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -23,10 +23,12 @@ from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
 from airflow.utils import timezone
 
-configuration.load_test_config()
-
+import os
+import mock
 import unittest
 
+configuration.load_test_config()
+
 DEFAULT_DATE = timezone.datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -51,8 +53,8 @@ class MySqlTest(unittest.TestCase):
             dummy VARCHAR(50)
         );
         """
-        import airflow.operators.mysql_operator
-        t = operators.mysql_operator.MySqlOperator(
+        from airflow.operators.mysql_operator import MySqlOperator
+        t = MySqlOperator(
             task_id='basic_mysql',
             sql=sql,
             mysql_conn_id='airflow_db',
@@ -64,8 +66,8 @@ class MySqlTest(unittest.TestCase):
             "TRUNCATE TABLE test_airflow",
             "INSERT INTO test_airflow VALUES ('X')",
         ]
-        import airflow.operators.mysql_operator
-        t = operators.mysql_operator.MySqlOperator(
+        from airflow.operators.mysql_operator import MySqlOperator
+        t = MySqlOperator(
             task_id='mysql_operator_test_multi',
             mysql_conn_id='airflow_db',
             sql=sql, dag=self.dag)
@@ -93,10 +95,40 @@ class MySqlTest(unittest.TestCase):
                 results = tuple(result[0] for result in c.fetchall())
                 self.assertEqual(sorted(results), sorted(records))
 
+    def test_mysql_hook_test_bulk_dump(self):
+        from airflow.hooks.mysql_hook import MySqlHook
+        hook = MySqlHook('airflow_ci')
+        priv = hook.get_first("SELECT @@global.secure_file_priv")
+        if priv and priv[0]:
+            # Smoke test: dump into the secure_file_priv directory and expect no error
+            hook.bulk_dump("INFORMATION_SCHEMA.TABLES", os.path.join(priv[0], "TABLES"))
+        else:
+            self.skipTest("Skip test_mysql_hook_test_bulk_dump "
+                          "since file output is not permitted")
+
+    @mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn')
+    def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
+        # Replace the cursor's execute() so the SQL built by bulk_dump can be inspected
+        from airflow.hooks.mysql_hook import MySqlHook
+        from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
+        fake_execute = mock.MagicMock()
+        mock_get_conn.return_value.cursor.return_value.execute = fake_execute
+
+        source_table = "INFORMATION_SCHEMA.TABLES"
+        target_path = "/path/to/output/file"
+        MySqlHook('airflow_ci').bulk_dump(source_table, target_path)
+
+        fake_execute.assert_called_once()
+        expected_sql = """
+            SELECT * INTO OUTFILE '{tmp_file}'
+            FROM {table}
+        """.format(tmp_file=target_path, table=source_table)
+        assertEqualIgnoreMultipleSpaces(self, fake_execute.call_args[0][0], expected_sql)
+
     def test_mysql_to_mysql(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-        import airflow.operators.generic_transfer
-        t = operators.generic_transfer.GenericTransfer(
+        from airflow.operators.generic_transfer import GenericTransfer
+        t = GenericTransfer(
             task_id='test_m2m',
             preoperator=[
                 "DROP TABLE IF EXISTS test_mysql_to_mysql",
@@ -114,10 +146,10 @@ class MySqlTest(unittest.TestCase):
         """
         Verifies option to overwrite connection schema
         """
-        import airflow.operators.mysql_operator
+        from airflow.operators.mysql_operator import MySqlOperator
 
         sql = "SELECT 1;"
-        t = operators.mysql_operator.MySqlOperator(
+        t = MySqlOperator(
             task_id='test_mysql_operator_test_schema_overwrite',
             sql=sql,
             dag=self.dag,
@@ -146,9 +178,8 @@ class PostgresTest(unittest.TestCase):
             dummy VARCHAR(50)
         );
         """
-        import airflow.operators.postgres_operator
-        t = operators.postgres_operator.PostgresOperator(
-            task_id='basic_postgres', sql=sql, dag=self.dag)
+        from airflow.operators.postgres_operator import PostgresOperator
+        t = PostgresOperator(task_id='basic_postgres', sql=sql, dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         autocommitTask = operators.postgres_operator.PostgresOperator(
@@ -166,15 +197,15 @@ class PostgresTest(unittest.TestCase):
             "TRUNCATE TABLE test_airflow",
             "INSERT INTO test_airflow VALUES ('X')",
         ]
-        import airflow.operators.postgres_operator
-        t = operators.postgres_operator.PostgresOperator(
+        from airflow.operators.postgres_operator import PostgresOperator
+        t = PostgresOperator(
             task_id='postgres_operator_test_multi', sql=sql, dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_postgres_to_postgres(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-        import airflow.operators.generic_transfer
-        t = operators.generic_transfer.GenericTransfer(
+        from airflow.operators.generic_transfer import GenericTransfer
+        t = GenericTransfer(
             task_id='test_p2p',
             preoperator=[
                 "DROP TABLE IF EXISTS test_postgres_to_postgres",
@@ -192,10 +223,10 @@ class PostgresTest(unittest.TestCase):
         """
         Verifies the VACUUM operation runs well with the PostgresOperator
         """
-        import airflow.operators.postgres_operator
+        from airflow.operators.postgres_operator import PostgresOperator
 
         sql = "VACUUM ANALYZE;"
-        t = operators.postgres_operator.PostgresOperator(
+        t = PostgresOperator(
             task_id='postgres_operator_test_vacuum',
             sql=sql,
             dag=self.dag,
@@ -206,10 +237,10 @@ class PostgresTest(unittest.TestCase):
         """
         Verifies option to overwrite connection schema
         """
-        import airflow.operators.postgres_operator
+        from airflow.operators.postgres_operator import PostgresOperator
 
         sql = "SELECT 1;"
-        t = operators.postgres_operator.PostgresOperator(
+        t = PostgresOperator(
             task_id='postgres_operator_test_schema_overwrite',
             sql=sql,
             dag=self.dag,
@@ -242,7 +273,6 @@ class TransferTests(unittest.TestCase):
             end_date=timezone.utcnow())
 
     def test_mysql_to_hive(self):
-        # import airflow.operators
         from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
         sql = "SELECT * FROM baby_names LIMIT 1000;"
         t = MySqlToHiveTransfer(
@@ -273,7 +303,6 @@ class TransferTests(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_mysql_to_hive_tblproperties(self):
-        # import airflow.operators
         from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
         sql = "SELECT * FROM baby_names LIMIT 1000;"
         t = MySqlToHiveTransfer(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_redshift_to_s3_operator.py b/tests/operators/test_redshift_to_s3_operator.py
index e214f3d..379e787 100644
--- a/tests/operators/test_redshift_to_s3_operator.py
+++ b/tests/operators/test_redshift_to_s3_operator.py
@@ -19,11 +19,11 @@
 #
 
 import mock
-import re
 import unittest
 
 from boto3.session import Session
 from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
 
 
 class TestRedshiftToS3Transfer(unittest.TestCase):
@@ -92,13 +92,8 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
                            secret_key=secret_key,
                            unload_options=unload_options)
 
-        def _trim(s):
-            return re.sub("\s+", " ", s.strip())
-
-        self.assertEqual(_trim(cur.execute.call_args[0][0]),
-                         _trim(columns_query))
         cur.execute.assert_called_once()
+        assertEqualIgnoreMultipleSpaces(self, cur.execute.call_args[0][0], columns_query)
 
-        self.assertEqual(_trim(mock_run.call_args[0][0]),
-                         _trim(unload_query))
         mock_run.assert_called_once()
+        assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], unload_query)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_s3_to_redshift_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_s3_to_redshift_operator.py b/tests/operators/test_s3_to_redshift_operator.py
index 0fadda3..2afde6c 100644
--- a/tests/operators/test_s3_to_redshift_operator.py
+++ b/tests/operators/test_s3_to_redshift_operator.py
@@ -19,11 +19,11 @@
 #
 
 import mock
-import re
 import unittest
 
 from boto3.session import Session
 from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
 
 
 class TestS3ToRedshiftTransfer(unittest.TestCase):
@@ -67,9 +67,5 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
                    secret_key=secret_key,
                    copy_options=copy_options)
 
-        def _trim(s):
-            return re.sub("\s+", " ", s.strip())
-
-        self.assertEqual(_trim(mock_run.call_args[0][0]),
-                         _trim(copy_query))
         mock_run.assert_called_once()
+        assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], copy_query)