You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:02:13 UTC

[17/50] incubator-airflow git commit: [AIRFLOW-1313] Add vertica_to_mysql operator

[AIRFLOW-1313] Add vertica_to_mysql operator

Closes #2370 from juise/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3aa8e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3aa8e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3aa8e31

Branch: refs/heads/v1-10-test
Commit: c3aa8e31fae4edeff8abe6d38e573ef16583496c
Parents: c5d3576
Author: Alexander Petrovsky <as...@gmail.com>
Authored: Sun Apr 29 00:05:23 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 00:05:23 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/vertica_to_mysql.py   | 142 +++++++++++++++++++
 scripts/ci/requirements.txt                     |   1 +
 .../contrib/operators/test_vertica_to_mysql.py  |  87 ++++++++++++
 3 files changed, 230 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
new file mode 100644
index 0000000..9b14e6c
--- /dev/null
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.vertica_hook import VerticaHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+from contextlib import closing
+
+import unicodecsv as csv
+from tempfile import NamedTemporaryFile
+
+
+class VerticaToMySqlTransfer(BaseOperator):
+    """
+    Moves data from Vertica to MySQL.
+
+    The rows returned by ``sql`` are either fetched into memory and written
+    with ``MySqlHook.insert_rows`` or, when ``bulk_load`` is set, spooled to
+    a temporary tab-delimited file and loaded with LOAD DATA LOCAL INFILE.
+    The column names of the Vertica result are used as the MySQL target
+    columns.
+
+    :param sql: SQL query to execute against the Vertica database
+    :type sql: str
+    :param mysql_table: target MySQL table, use dot notation to target a
+        specific database
+    :type mysql_table: str
+    :param vertica_conn_id: source Vertica connection
+    :type vertica_conn_id: str
+    :param mysql_conn_id: destination MySQL connection
+    :type mysql_conn_id: str
+    :param mysql_preoperator: sql statement to run against MySQL prior to
+        import, typically used to truncate or delete in place of the data
+        coming in, allowing the task to be idempotent (running the task
+        twice won't double load data)
+    :type mysql_preoperator: str
+    :param mysql_postoperator: sql statement to run against MySQL after the
+        import, typically used to move data from staging to production
+        and issue cleanup commands.
+    :type mysql_postoperator: str
+    :param bulk_load: flag to use bulk_load option.  This loads MySQL directly
+        from a tab-delimited text file using the LOAD DATA LOCAL INFILE command.
+        This option requires an extra connection parameter for the
+        destination MySQL connection: {'local_infile': true}.
+    :type bulk_load: bool
+    """
+
+    template_fields = ('sql', 'mysql_table', 'mysql_preoperator',
+                       'mysql_postoperator')
+    template_ext = ('.sql',)
+    ui_color = '#a0e08c'
+
+    @apply_defaults
+    def __init__(
+            self,
+            sql,
+            mysql_table,
+            vertica_conn_id='vertica_default',
+            mysql_conn_id='mysql_default',
+            mysql_preoperator=None,
+            mysql_postoperator=None,
+            bulk_load=False,
+            *args, **kwargs):
+        super(VerticaToMySqlTransfer, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.mysql_table = mysql_table
+        self.mysql_conn_id = mysql_conn_id
+        self.mysql_preoperator = mysql_preoperator
+        self.mysql_postoperator = mysql_postoperator
+        self.vertica_conn_id = vertica_conn_id
+        self.bulk_load = bulk_load
+
+    def execute(self, context):
+        """
+        Copy the result of ``self.sql`` from Vertica into ``self.mysql_table``.
+
+        Steps, in order: select from Vertica, run the optional MySQL
+        preoperator, load the rows, run the optional MySQL postoperator.
+
+        :param context: task context passed by the caller; not used here
+        :raises Exception: errors from the hooks are propagated unchanged; a
+            failed load is logged before being re-raised
+        """
+        vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
+        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+
+        tmpfile = None
+        result = None
+        count = 0
+
+        try:
+            with closing(vertica.get_conn()) as conn:
+                with closing(conn.cursor()) as cursor:
+                    cursor.execute(self.sql)
+                    selected_columns = [d.name for d in cursor.description]
+
+                    if self.bulk_load:
+                        # unicodecsv writes encoded bytes, so the spool file
+                        # has to be opened in binary mode.
+                        tmpfile = NamedTemporaryFile("wb")
+
+                        logging.info(
+                            "Selecting rows from Vertica to local file %s...",
+                            tmpfile.name)
+                        logging.info(self.sql)
+
+                        csv_writer = csv.writer(
+                            tmpfile, delimiter='\t', encoding='utf-8')
+                        for row in cursor.iterate():
+                            csv_writer.writerow(row)
+                            count += 1
+
+                        # The file is handed to MySQL by name below, so the
+                        # buffered rows must reach the file first.
+                        tmpfile.flush()
+                    else:
+                        logging.info("Selecting rows from Vertica...")
+                        logging.info(self.sql)
+
+                        result = cursor.fetchall()
+                        count = len(result)
+
+                    logging.info("Selected rows from Vertica %s", count)
+
+            if self.mysql_preoperator:
+                logging.info("Running MySQL preoperator...")
+                mysql.run(self.mysql_preoperator)
+
+            try:
+                if self.bulk_load:
+                    logging.info("Bulk inserting rows into MySQL...")
+                    # '\r\n' matches the row terminator the csv writer above
+                    # uses by default.
+                    load_sql = (
+                        "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s "
+                        "LINES TERMINATED BY '\r\n' (%s)"
+                    ) % (tmpfile.name, self.mysql_table,
+                         ", ".join(selected_columns))
+                    with closing(mysql.get_conn()) as conn:
+                        with closing(conn.cursor()) as cursor:
+                            cursor.execute(load_sql)
+                            conn.commit()
+                else:
+                    logging.info("Inserting rows into MySQL...")
+                    mysql.insert_rows(table=self.mysql_table,
+                                      rows=result,
+                                      target_fields=selected_columns)
+                logging.info("Inserted rows into MySQL %s", count)
+            except Exception:
+                logging.error("Inserted rows into MySQL 0")
+                raise
+        finally:
+            # Closing the NamedTemporaryFile also removes it; doing this in
+            # ``finally`` keeps a failed select, preoperator or load from
+            # leaving the spool file behind.
+            if tmpfile is not None:
+                tmpfile.close()
+
+        if self.mysql_postoperator:
+            logging.info("Running MySQL postoperator...")
+            mysql.run(self.mysql_postoperator)
+
+        logging.info("Done")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bba5d29..a2a78df 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -92,5 +92,6 @@ statsd
 thrift
 thrift_sasl
 unicodecsv
+vertica_python
 zdesk
 kubernetes

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/tests/contrib/operators/test_vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_vertica_to_mysql.py b/tests/contrib/operators/test_vertica_to_mysql.py
new file mode 100644
index 0000000..5c05e3d
--- /dev/null
+++ b/tests/contrib/operators/test_vertica_to_mysql.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+
+import mock
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.vertica_to_mysql import VerticaToMySqlTransfer
+
+
+def mock_get_conn():
+    """
+    Build the MagicMock connection handed out by the patched ``get_conn``.
+
+    NOTE(review): ``cursor`` is assigned as a plain attribute, so calling
+    ``conn.cursor()`` appears to return ``cursor_mock.return_value`` (an
+    auto-created MagicMock) rather than ``cursor_mock`` -- the values
+    configured on ``cursor_mock`` below then look unreachable from code that
+    calls ``conn.cursor()``. Likewise ``execute``/``fetchall``/``iterate`` are
+    set to lists (not callables) and ``description`` holds plain strings with
+    no ``.name`` attribute. Confirm whether this is intended.
+    """
+    commit_mock = mock.MagicMock(
+    )
+    cursor_mock = mock.MagicMock(
+        execute     = [],
+        fetchall    = [['1', '2', '3']],
+        description =  ['a', 'b', 'c'],
+        iterate     = [['1', '2', '3']],
+    )
+    conn_mock = mock.MagicMock(
+        commit      = commit_mock,
+        cursor      = cursor_mock,
+    )
+    return conn_mock
+
+
+class TestVerticaToMySqlTransfer(unittest.TestCase):
+    """Tests for VerticaToMySqlTransfer with both database hooks patched."""
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def _make_task(self, bulk_load):
+        """Build the operator under test; only ``bulk_load`` varies."""
+        return VerticaToMySqlTransfer(task_id='test_task_id',
+                                      sql='select a, b, c',
+                                      mysql_table='test_table',
+                                      vertica_conn_id='test_vertica_conn_id',
+                                      mysql_conn_id='test_mysql_conn_id',
+                                      params={},
+                                      bulk_load=bulk_load,
+                                      dag=self.dag)
+
+    # mock.patch decorators inject their mocks bottom-up, hence the
+    # parameter order below.
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.insert_rows', return_value=True)
+    def test_select_insert_transfer(self, insert_rows_mock,
+                                    mysql_get_conn_mock,
+                                    vertica_get_conn_mock):
+        """
+        Test check selection from vertica into memory and
+        after that inserting into mysql
+        """
+        self._make_task(bulk_load=False).execute(None)
+
+        vertica_get_conn_mock.assert_called_once_with()
+        insert_rows_mock.assert_called_once_with(
+            table='test_table', rows=mock.ANY, target_fields=mock.ANY)
+
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+    def test_select_bulk_insert_transfer(self, mysql_get_conn_mock,
+                                         vertica_get_conn_mock):
+        """
+        Test check selection from vertica into temporary file and
+        after that bulk inserting into mysql
+        """
+        self._make_task(bulk_load=True).execute(None)
+
+        vertica_get_conn_mock.assert_called_once_with()
+        # The bulk path opens its own MySQL connection for LOAD DATA.
+        mysql_get_conn_mock.assert_called_once_with()
+
+
+# Run the test cases when this module is executed as a script.
+if __name__ == '__main__':
+    unittest.main()