You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2018/05/23 20:58:09 UTC
incubator-airflow git commit: [AIRFLOW-2472] Implement MySqlHook.bulk_dump
Repository: incubator-airflow
Updated Branches:
refs/heads/master f1ac67bdc -> 3bdb34e77
[AIRFLOW-2472] Implement MySqlHook.bulk_dump
Implement MySqlHook.bulk_dump since the opposite
operation bulk_load is already implemented.
This PR also addresses some flake8 warnings.
Closes #3385 from sekikn/AIRFLOW-2472
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3bdb34e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3bdb34e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3bdb34e7
Branch: refs/heads/master
Commit: 3bdb34e7775fdb85e3dd6cd92a47f91c80b1f67b
Parents: f1ac67b
Author: Kengo Seki <se...@apache.org>
Authored: Wed May 23 13:58:04 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Wed May 23 13:58:04 2018 -0700
----------------------------------------------------------------------
airflow/hooks/mysql_hook.py | 12 ++++
airflow/utils/tests.py | 8 +++
tests/hooks/test_hive_hook.py | 9 +--
tests/operators/operators.py | 75 ++++++++++++++------
tests/operators/test_redshift_to_s3_operator.py | 11 +--
tests/operators/test_s3_to_redshift_operator.py | 8 +--
6 files changed, 79 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/hooks/mysql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/mysql_hook.py b/airflow/hooks/mysql_hook.py
index d202e55..f52b60c 100644
--- a/airflow/hooks/mysql_hook.py
+++ b/airflow/hooks/mysql_hook.py
@@ -89,6 +89,18 @@ class MySqlHook(DbApiHook):
""".format(**locals()))
conn.commit()
+    def bulk_dump(self, table, tmp_file):
+        """
+        Dumps a database table into a tab-delimited file
+
+        Both arguments are substituted into the statement verbatim, with
+        no quoting or escaping, so callers must only pass trusted values.
+
+        :param table: name of the table to select from
+        :param tmp_file: path of the file named in the INTO OUTFILE clause
+        """
+        connection = self.get_conn()
+        cursor = connection.cursor()
+        dump_sql = """
+            SELECT * INTO OUTFILE '{tmp_file}'
+            FROM {table}
+            """.format(tmp_file=tmp_file, table=table)
+        cursor.execute(dump_sql)
+        connection.commit()
+
@staticmethod
def _serialize_cell(cell, conn):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 9982a07..6f29ffc 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -17,8 +17,10 @@
# specific language governing permissions and limitations
# under the License.
+import re
import unittest
+
def skipUnlessImported(module, obj):
import importlib
try:
@@ -29,3 +31,9 @@ def skipUnlessImported(module, obj):
obj in dir(m),
"Skipping test because {} could not be imported from {}".format(
obj, module))
+
+
+def assertEqualIgnoreMultipleSpaces(case, first, second, msg=None):
+    """
+    Assert that two strings are equal once whitespace is normalized.
+
+    Leading and trailing whitespace is stripped and every internal run of
+    whitespace is collapsed to a single space before the comparison.
+
+    :param case: object providing ``assertEqual`` (e.g. a unittest.TestCase)
+    :param first: first string to compare
+    :param second: second string to compare
+    :param msg: optional failure message forwarded to ``assertEqual``
+    """
+    def _trim(s):
+        # The normalized string must be returned: without it both sides
+        # become None and the assertion passes for any pair of strings.
+        return re.sub(r"\s+", " ", s.strip())
+    return case.assertEqual(_trim(first), _trim(second), msg)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index d090086..690b5ad 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -22,7 +22,6 @@ import datetime
import itertools
import pandas as pd
import random
-import re
import mock
import unittest
@@ -34,6 +33,7 @@ from airflow.exceptions import AirflowException
from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
from airflow import DAG, configuration, operators
from airflow.utils import timezone
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
configuration.load_test_config()
@@ -188,12 +188,7 @@ class TestHiveCliHook(unittest.TestCase):
STORED AS textfile
;
"""
-
- def _trim(s):
- return re.sub("\s+", " ", s.strip())
-
- self.assertEqual(_trim(mock_run_cli.call_args_list[0][0][0]),
- _trim(query))
+ assertEqualIgnoreMultipleSpaces(self, mock_run_cli.call_args_list[0][0][0], query)
class TestHiveMetastoreHook(HiveEnvironmentTest):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 9fc2c93..f795fd0 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -23,10 +23,12 @@ from airflow import DAG, configuration, operators
from airflow.utils.tests import skipUnlessImported
from airflow.utils import timezone
-configuration.load_test_config()
-
+import os
+import mock
import unittest
+configuration.load_test_config()
+
DEFAULT_DATE = timezone.datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -51,8 +53,8 @@ class MySqlTest(unittest.TestCase):
dummy VARCHAR(50)
);
"""
- import airflow.operators.mysql_operator
- t = operators.mysql_operator.MySqlOperator(
+ from airflow.operators.mysql_operator import MySqlOperator
+ t = MySqlOperator(
task_id='basic_mysql',
sql=sql,
mysql_conn_id='airflow_db',
@@ -64,8 +66,8 @@ class MySqlTest(unittest.TestCase):
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
- import airflow.operators.mysql_operator
- t = operators.mysql_operator.MySqlOperator(
+ from airflow.operators.mysql_operator import MySqlOperator
+ t = MySqlOperator(
task_id='mysql_operator_test_multi',
mysql_conn_id='airflow_db',
sql=sql, dag=self.dag)
@@ -93,10 +95,40 @@ class MySqlTest(unittest.TestCase):
results = tuple(result[0] for result in c.fetchall())
self.assertEqual(sorted(results), sorted(records))
+    def test_mysql_hook_test_bulk_dump(self):
+        """
+        Run bulk_dump against the 'airflow_ci' connection and expect no error.
+
+        The test is skipped unless the server reports a non-empty
+        ``secure_file_priv`` value, which is used as the output directory.
+        """
+        from airflow.hooks.mysql_hook import MySqlHook
+        hook = MySqlHook('airflow_ci')
+        priv = hook.get_first("SELECT @@global.secure_file_priv")
+        if priv and priv[0]:
+            # Confirm that no error occurs
+            hook.bulk_dump("INFORMATION_SCHEMA.TABLES", os.path.join(priv[0], "TABLES"))
+        else:
+            # Name this test (not the bulk_load sibling) in the skip reason.
+            self.skipTest("Skip test_mysql_hook_test_bulk_dump "
+                          "since file output is not permitted")
+
+    @mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn')
+    def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
+        """
+        Check the statement bulk_dump hands to the cursor, with get_conn patched.
+        """
+        from airflow.hooks.mysql_hook import MySqlHook
+        from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
+
+        # Capture whatever is passed to cursor.execute on the patched connection
+        execute_spy = mock.MagicMock()
+        patched_cursor = mock_get_conn.return_value.cursor.return_value
+        patched_cursor.execute = execute_spy
+
+        source_table = "INFORMATION_SCHEMA.TABLES"
+        output_path = "/path/to/output/file"
+        MySqlHook('airflow_ci').bulk_dump(source_table, output_path)
+
+        execute_spy.assert_called_once()
+        expected_sql = """
+            SELECT * INTO OUTFILE '{tmp_file}'
+            FROM {table}
+        """.format(tmp_file=output_path, table=source_table)
+        actual_sql = execute_spy.call_args[0][0]
+        assertEqualIgnoreMultipleSpaces(self, actual_sql, expected_sql)
+
def test_mysql_to_mysql(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
- import airflow.operators.generic_transfer
- t = operators.generic_transfer.GenericTransfer(
+ from airflow.operators.generic_transfer import GenericTransfer
+ t = GenericTransfer(
task_id='test_m2m',
preoperator=[
"DROP TABLE IF EXISTS test_mysql_to_mysql",
@@ -114,10 +146,10 @@ class MySqlTest(unittest.TestCase):
"""
Verifies option to overwrite connection schema
"""
- import airflow.operators.mysql_operator
+ from airflow.operators.mysql_operator import MySqlOperator
sql = "SELECT 1;"
- t = operators.mysql_operator.MySqlOperator(
+ t = MySqlOperator(
task_id='test_mysql_operator_test_schema_overwrite',
sql=sql,
dag=self.dag,
@@ -146,9 +178,8 @@ class PostgresTest(unittest.TestCase):
dummy VARCHAR(50)
);
"""
- import airflow.operators.postgres_operator
- t = operators.postgres_operator.PostgresOperator(
- task_id='basic_postgres', sql=sql, dag=self.dag)
+ from airflow.operators.postgres_operator import PostgresOperator
+ t = PostgresOperator(task_id='basic_postgres', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
autocommitTask = operators.postgres_operator.PostgresOperator(
@@ -166,15 +197,15 @@ class PostgresTest(unittest.TestCase):
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
- import airflow.operators.postgres_operator
- t = operators.postgres_operator.PostgresOperator(
+ from airflow.operators.postgres_operator import PostgresOperator
+ t = PostgresOperator(
task_id='postgres_operator_test_multi', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_postgres_to_postgres(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
- import airflow.operators.generic_transfer
- t = operators.generic_transfer.GenericTransfer(
+ from airflow.operators.generic_transfer import GenericTransfer
+ t = GenericTransfer(
task_id='test_p2p',
preoperator=[
"DROP TABLE IF EXISTS test_postgres_to_postgres",
@@ -192,10 +223,10 @@ class PostgresTest(unittest.TestCase):
"""
Verifies the VACUUM operation runs well with the PostgresOperator
"""
- import airflow.operators.postgres_operator
+ from airflow.operators.postgres_operator import PostgresOperator
sql = "VACUUM ANALYZE;"
- t = operators.postgres_operator.PostgresOperator(
+ t = PostgresOperator(
task_id='postgres_operator_test_vacuum',
sql=sql,
dag=self.dag,
@@ -206,10 +237,10 @@ class PostgresTest(unittest.TestCase):
"""
Verifies option to overwrite connection schema
"""
- import airflow.operators.postgres_operator
+ from airflow.operators.postgres_operator import PostgresOperator
sql = "SELECT 1;"
- t = operators.postgres_operator.PostgresOperator(
+ t = PostgresOperator(
task_id='postgres_operator_test_schema_overwrite',
sql=sql,
dag=self.dag,
@@ -242,7 +273,6 @@ class TransferTests(unittest.TestCase):
end_date=timezone.utcnow())
def test_mysql_to_hive(self):
- # import airflow.operators
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
t = MySqlToHiveTransfer(
@@ -273,7 +303,6 @@ class TransferTests(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_mysql_to_hive_tblproperties(self):
- # import airflow.operators
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
t = MySqlToHiveTransfer(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_redshift_to_s3_operator.py b/tests/operators/test_redshift_to_s3_operator.py
index e214f3d..379e787 100644
--- a/tests/operators/test_redshift_to_s3_operator.py
+++ b/tests/operators/test_redshift_to_s3_operator.py
@@ -19,11 +19,11 @@
#
import mock
-import re
import unittest
from boto3.session import Session
from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
class TestRedshiftToS3Transfer(unittest.TestCase):
@@ -92,13 +92,8 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
secret_key=secret_key,
unload_options=unload_options)
- def _trim(s):
- return re.sub("\s+", " ", s.strip())
-
- self.assertEqual(_trim(cur.execute.call_args[0][0]),
- _trim(columns_query))
cur.execute.assert_called_once()
+ assertEqualIgnoreMultipleSpaces(self, cur.execute.call_args[0][0], columns_query)
- self.assertEqual(_trim(mock_run.call_args[0][0]),
- _trim(unload_query))
mock_run.assert_called_once()
+ assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], unload_query)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3bdb34e7/tests/operators/test_s3_to_redshift_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_s3_to_redshift_operator.py b/tests/operators/test_s3_to_redshift_operator.py
index 0fadda3..2afde6c 100644
--- a/tests/operators/test_s3_to_redshift_operator.py
+++ b/tests/operators/test_s3_to_redshift_operator.py
@@ -19,11 +19,11 @@
#
import mock
-import re
import unittest
from boto3.session import Session
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer
+from airflow.utils.tests import assertEqualIgnoreMultipleSpaces
class TestS3ToRedshiftTransfer(unittest.TestCase):
@@ -67,9 +67,5 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
secret_key=secret_key,
copy_options=copy_options)
- def _trim(s):
- return re.sub("\s+", " ", s.strip())
-
- self.assertEqual(_trim(mock_run.call_args[0][0]),
- _trim(copy_query))
mock_run.assert_called_once()
+ assertEqualIgnoreMultipleSpaces(self, mock_run.call_args[0][0], copy_query)