Posted to commits@airflow.apache.org by cr...@apache.org on 2017/11/27 18:56:02 UTC

[2/3] incubator-airflow git commit: [AIRFLOW-868] Add postgres_to_gcs operator and unittests

[AIRFLOW-868] Add postgres_to_gcs operator and unittests

Adds a postgres_to_gcs operator to contrib so that a user can copy a
dump from Postgres to Google Cloud Storage. Tests write to local
NamedTemporaryFiles so that serializing encoded NDJSON is exercised
correctly on both Python 3 and Python 2.7.
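
For context, a minimal usage sketch of the new operator in a DAG (the DAG id,
connection ids, bucket, SQL, and object names below are illustrative, not part
of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.postgres_to_gcs_operator import (
        PostgresToGoogleCloudStorageOperator)

    dag = DAG('postgres_to_gcs_example',
              start_date=datetime(2017, 1, 1),
              schedule_interval=None)

    export_users = PostgresToGoogleCloudStorageOperator(
        task_id='export_users',
        sql='SELECT * FROM users',
        bucket='my-gcs-bucket',
        # {} is replaced with a file counter if the dump is split by size.
        filename='exports/users/part_{}.ndjson',
        # Optional: also upload a BigQuery schema file for the dumped table.
        schema_filename='exports/users/schema.json',
        postgres_conn_id='postgres_default',
        google_cloud_storage_conn_id='google_cloud_storage_default',
        dag=dag)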


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8fa2e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8fa2e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8fa2e90

Branch: refs/heads/master
Commit: d8fa2e9049328341dc58a635b34f04fa52de543e
Parents: 2f79610
Author: Adam Boscarino <aj...@gmail.com>
Authored: Sun Feb 12 18:27:15 2017 -0500
Committer: Devon Peticolas <de...@peticol.as>
Committed: Wed Nov 15 14:34:59 2017 -0500

----------------------------------------------------------------------
 .../operators/postgres_to_gcs_operator.py       | 242 +++++++++++++++++++
 .../operators/test_postgres_to_gcs_operator.py  | 153 ++++++++++++
 2 files changed, 395 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/airflow/contrib/operators/postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
new file mode 100644
index 0000000..441ccf5
--- /dev/null
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import json
+import logging
+import time
+import datetime
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.hooks.postgres_hook import PostgresHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from decimal import Decimal
+from tempfile import NamedTemporaryFile
+
+PY3 = sys.version_info[0] == 3
+
+
+class PostgresToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Copy data from Postgres to Google Cloud Storage in JSON format.
+    """
+    template_fields = ('sql', 'bucket', 'filename', 'schema_filename',
+                       'parameters')
+    template_ext = ('.sql', )
+    ui_color = '#a0e08c'
+
+    @apply_defaults
+    def __init__(self,
+                 sql,
+                 bucket,
+                 filename,
+                 schema_filename=None,
+                 approx_max_file_size_bytes=1900000000,
+                 postgres_conn_id='postgres_default',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 parameters=None,
+                 *args,
+                 **kwargs):
+        """
+        :param sql: The SQL to execute on the Postgres table.
+        :type sql: string
+        :param bucket: The bucket to upload to.
+        :type bucket: string
+        :param filename: The filename to use as the object name when uploading
+            to Google Cloud Storage. A {} should be specified in the filename
+            to allow the operator to inject file numbers in cases where the
+            file is split due to size.
+        :type filename: string
+        :param schema_filename: If set, the filename to use as the object name
+            when uploading a .json file containing the BigQuery schema fields
+            for the table that was dumped from Postgres.
+        :type schema_filename: string
+        :param approx_max_file_size_bytes: This operator supports the ability
+            to split large table dumps into multiple files (see notes in the
+            filename param docs above). Google Cloud Storage allows for files
+            to be a maximum of 4GB. This param allows developers to specify the
+            file size of the splits.
+        :type approx_max_file_size_bytes: long
+        :param postgres_conn_id: Reference to a specific Postgres hook.
+        :type postgres_conn_id: string
+        :param google_cloud_storage_conn_id: Reference to a specific Google
+            Cloud Storage hook.
+        :type google_cloud_storage_conn_id: string
+        :param delegate_to: The account to impersonate, if any. For this to
+            work, the service account making the request must have domain-wide
+            delegation enabled.
+        :param parameters: a parameters dict that is substituted at query runtime.
+        :type parameters: dict
+        """
+        super(PostgresToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.bucket = bucket
+        self.filename = filename
+        self.schema_filename = schema_filename
+        self.approx_max_file_size_bytes = approx_max_file_size_bytes
+        self.postgres_conn_id = postgres_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+        self.parameters = parameters
+
+    def execute(self, context):
+        cursor = self._query_postgres()
+        files_to_upload = self._write_local_data_files(cursor)
+
+        # If a schema is set, create a BQ schema JSON file.
+        if self.schema_filename:
+            files_to_upload.update(self._write_local_schema_file(cursor))
+
+        # Flush all files before uploading
+        for file_handle in files_to_upload.values():
+            file_handle.flush()
+
+        self._upload_to_gcs(files_to_upload)
+
+        # Close all temp file handles.
+        for file_handle in files_to_upload.values():
+            file_handle.close()
+
+    def _query_postgres(self):
+        """
+        Queries Postgres and returns a cursor to the results.
+        """
+        postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+        conn = postgres.get_conn()
+        cursor = conn.cursor()
+        cursor.execute(self.sql, self.parameters)
+        return cursor
+
+    def _write_local_data_files(self, cursor):
+        """
+        Takes a cursor and writes the results to one or more local files.
+
+        :return: A dictionary where keys are filenames to be used as object
+            names in GCS, and values are file handles to local files that
+            contain the data for the GCS objects.
+        """
+        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        file_no = 0
+        tmp_file_handle = NamedTemporaryFile(delete=True)
+        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
+
+        for row in cursor:
+            # Convert datetime objects to epoch seconds and decimals to floats.
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
+
+            s = json.dumps(row_dict, sort_keys=True)
+            if PY3:
+                s = s.encode('utf-8')
+            tmp_file_handle.write(s)
+
+            # Append newline to make dumps BigQuery compatible.
+            tmp_file_handle.write(b'\n')
+
+            # Stop if the file exceeds the file size limit.
+            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+                file_no += 1
+                tmp_file_handle = NamedTemporaryFile(delete=True)
+                tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+
+        return tmp_file_handles
+
+    def _write_local_schema_file(self, cursor):
+        """
+        Takes a cursor and writes the BigQuery schema for the results to a
+        local file.
+
+        :return: A dictionary where the key is the filename to be used as an
+            object name in GCS and the value is a file handle to a local file
+            containing the BigQuery schema fields in .json format.
+        """
+        schema = []
+        for field in cursor.description:
+            # See PEP 249 for details about the description tuple.
+            field_name = field[0]
+            field_type = self.type_map(field[1])
+            field_mode = 'REPEATED' if field[1] in (1009, 1005, 1007,
+                                                    1016) else 'NULLABLE'
+            schema.append({
+                'name': field_name,
+                'type': field_type,
+                'mode': field_mode,
+            })
+
+        logging.info('Using schema for %s: %s', self.schema_filename, schema)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+        s = json.dumps(schema, sort_keys=True)
+        if PY3:
+            s = s.encode('utf-8')
+        tmp_schema_file_handle.write(s)
+        return {self.schema_filename: tmp_schema_file_handle}
+
+    def _upload_to_gcs(self, files_to_upload):
+        """
+        Upload all of the file splits (and optionally the schema .json file) to
+        Google Cloud Storage.
+        """
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
+        for object_name, tmp_file_handle in files_to_upload.items():
+            hook.upload(self.bucket, object_name, tmp_file_handle.name,
+                        'application/json')
+
+    @classmethod
+    def convert_types(cls, value):
+        """
+        Takes a value from Postgres, and converts it to a value that's safe for
+        JSON/Google Cloud Storage/BigQuery. Dates and datetimes are converted to
+        epoch seconds, decimals to floats, and times to seconds past midnight.
+        """
+        if type(value) in (datetime.datetime, datetime.date):
+            return time.mktime(value.timetuple())
+        elif type(value) == datetime.time:
+            formatted_time = time.strptime(str(value), "%H:%M:%S")
+            return datetime.timedelta(
+                hours=formatted_time.tm_hour,
+                minutes=formatted_time.tm_min,
+                seconds=formatted_time.tm_sec).seconds
+        elif isinstance(value, Decimal):
+            return float(value)
+        else:
+            return value
+
+    @classmethod
+    def type_map(cls, postgres_type):
+        """
+        Helper function that maps from Postgres fields to BigQuery fields. Used
+        when a schema_filename is set.
+        """
+        d = {
+            1114: 'TIMESTAMP',
+            1184: 'TIMESTAMP',
+            1082: 'TIMESTAMP',
+            1083: 'TIMESTAMP',
+            1005: 'INTEGER',
+            1007: 'INTEGER',
+            1016: 'INTEGER',
+            20: 'INTEGER',
+            21: 'INTEGER',
+            23: 'INTEGER',
+            16: 'BOOLEAN',
+            700: 'FLOAT',
+            701: 'FLOAT',
+            1700: 'FLOAT'
+        }
+
+        return d.get(postgres_type, 'STRING')
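
A quick illustration of the two classmethods above (a reader's sketch, not part
of the commit): type_map falls back to STRING for any Postgres type OID it does
not know, and convert_types turns Decimals into plain floats so json.dumps can
serialize them.

    from decimal import Decimal

    from airflow.contrib.operators.postgres_to_gcs_operator import (
        PostgresToGoogleCloudStorageOperator as Op)

    assert Op.type_map(1184) == 'TIMESTAMP'  # timestamptz
    assert Op.type_map(20) == 'INTEGER'      # int8
    assert Op.type_map(1700) == 'FLOAT'      # numeric
    assert Op.type_map(25) == 'STRING'       # text is unmapped, so default
    assert Op.convert_types(Decimal('1.25')) == 1.25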

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/tests/contrib/operators/test_postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_postgres_to_gcs_operator.py b/tests/contrib/operators/test_postgres_to_gcs_operator.py
new file mode 100644
index 0000000..9567243
--- /dev/null
+++ b/tests/contrib/operators/test_postgres_to_gcs_operator.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+PY3 = sys.version_info[0] == 3
+
+TASK_ID = 'test-postgres-to-gcs'
+POSTGRES_CONN_ID = 'postgres_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+FILENAME = 'test_{}.ndjson'
+# We expect the psycopg2 cursor to return encoded strs in py2 and decoded strs in py3.
+if PY3:
+    ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
+    CURSOR_DESCRIPTION = (('some_str', 0), ('some_num', 1005))
+else:
+    ROWS = [(b'mock_row_content_1', 42), (b'mock_row_content_2', 43), (b'mock_row_content_3', 44)]
+    CURSOR_DESCRIPTION = ((b'some_str', 0), (b'some_num', 1005))
+NDJSON_LINES = [
+    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n'
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, {"mode": "REPEATED", "name": "some_num", "type": "INTEGER"}]'
+
+
+class PostgresToGoogleCloudStorageOperatorTest(unittest.TestCase):
+    def test_init(self):
+        """Test PostgresToGoogleCloudStorageOperator instance is properly initialized."""
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=FILENAME)
+        self.assertEqual(op.task_id, TASK_ID)
+        self.assertEqual(op.sql, SQL)
+        self.assertEqual(op.bucket, BUCKET)
+        self.assertEqual(op.filename, FILENAME)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_exec_success(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test the execute function in case where the run is successful."""
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            postgres_conn_id=POSTGRES_CONN_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME)
+
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual(FILENAME.format(0), obj)
+            self.assertEqual('application/json', content_type)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(b''.join(NDJSON_LINES), f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op.execute(None)
+
+        pg_hook_mock_class.assert_called_once_with(postgres_conn_id=POSTGRES_CONN_ID)
+        pg_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL, None)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_file_splitting(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test that ndjson is split by approx_max_file_size_bytes param."""
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+        expected_upload = {
+            FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
+            FILENAME.format(1): NDJSON_LINES[2],
+        }
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual('application/json', content_type)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(expected_upload[obj], f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            approx_max_file_size_bytes=len(expected_upload[FILENAME.format(0)]))
+        op.execute(None)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_schema_file(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test writing schema files."""
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            if obj == SCHEMA_FILENAME:
+                with open(tmp_filename, 'rb') as f:
+                    self.assertEqual(SCHEMA_JSON, f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            schema_filename=SCHEMA_FILENAME)
+        op.execute(None)
+
+        # Once for the data file and once for the schema file.
+        self.assertEqual(2, gcs_hook_mock.upload.call_count)