Posted to commits@airflow.apache.org by cr...@apache.org on 2017/11/27 18:56:02 UTC
[2/3] incubator-airflow git commit: [AIRFLOW-868] Add postgres_to_gcs operator and unittests
[AIRFLOW-868] Add postgres_to_gcs operator and unittests
Adds a postgres_to_gcs operator to contrib so that a user can copy a
dump from Postgres to Google Cloud Storage. Tests write to local
NamedTemporaryFiles so we correctly test serializing encoded ndjson in
both Python 3 and Python 2.7.
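For context, a minimal usage sketch of the new operator in a DAG (the
DAG id, table, bucket, and object names below are hypothetical and not
part of this commit):

    from datetime import datetime
    from airflow import DAG
    from airflow.contrib.operators.postgres_to_gcs_operator import \
        PostgresToGoogleCloudStorageOperator

    dag = DAG('example_postgres_to_gcs',
              start_date=datetime(2017, 1, 1),
              schedule_interval=None)

    # Dump a table to GCS as newline-delimited JSON and also write the
    # inferred BigQuery schema. The {} in filename is replaced with a
    # file number if the dump is split by approx_max_file_size_bytes.
    upload_users = PostgresToGoogleCloudStorageOperator(
        task_id='users_to_gcs',
        sql='SELECT * FROM users',                 # hypothetical table
        bucket='example-bucket',                   # hypothetical bucket
        filename='exports/users/part_{}.ndjson',
        schema_filename='exports/users/schema.json',
        postgres_conn_id='postgres_default',
        google_cloud_storage_conn_id='google_cloud_storage_default',
        dag=dag)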
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8fa2e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8fa2e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8fa2e90
Branch: refs/heads/master
Commit: d8fa2e9049328341dc58a635b34f04fa52de543e
Parents: 2f79610
Author: Adam Boscarino <aj...@gmail.com>
Authored: Sun Feb 12 18:27:15 2017 -0500
Committer: Devon Peticolas <de...@peticol.as>
Committed: Wed Nov 15 14:34:59 2017 -0500
----------------------------------------------------------------------
.../operators/postgres_to_gcs_operator.py | 242 +++++++++++++++++++
.../operators/test_postgres_to_gcs_operator.py | 153 ++++++++++++
2 files changed, 395 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/airflow/contrib/operators/postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
new file mode 100644
index 0000000..441ccf5
--- /dev/null
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import json
+import logging
+import time
+import datetime
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.hooks.postgres_hook import PostgresHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from decimal import Decimal
+from tempfile import NamedTemporaryFile
+
+PY3 = sys.version_info[0] == 3
+
+
+class PostgresToGoogleCloudStorageOperator(BaseOperator):
+ """
+ Copy data from Postgres to Google Cloud Storage in JSON format.
+ """
+ template_fields = ('sql', 'bucket', 'filename', 'schema_filename',
+ 'parameters')
+ template_ext = ('.sql', )
+ ui_color = '#a0e08c'
+
+ @apply_defaults
+ def __init__(self,
+ sql,
+ bucket,
+ filename,
+ schema_filename=None,
+ approx_max_file_size_bytes=1900000000,
+ postgres_conn_id='postgres_default',
+ google_cloud_storage_conn_id='google_cloud_storage_default',
+ delegate_to=None,
+ parameters=None,
+ *args,
+ **kwargs):
+ """
+ :param sql: The SQL to execute on the Postgres table.
+ :type sql: string
+ :param bucket: The bucket to upload to.
+ :type bucket: string
+ :param filename: The filename to use as the object name when uploading
+ to Google Cloud Storage. A {} should be specified in the filename
+ to allow the operator to inject file numbers in cases where the
+ file is split due to size.
+ :type filename: string
+ :param schema_filename: If set, the filename to use as the object name
+ when uploading a .json file containing the BigQuery schema fields
+ for the table that was dumped from Postgres.
+ :type schema_filename: string
+ :param approx_max_file_size_bytes: This operator supports the ability
+ to split large table dumps into multiple files (see notes in the
+ filename param docs above). Google Cloud Storage allows for files
+ to be a maximum of 4GB. This param allows developers to specify the
+ file size of the splits.
+ :type approx_max_file_size_bytes: long
+ :param postgres_conn_id: Reference to a specific Postgres hook.
+ :type postgres_conn_id: string
+ :param google_cloud_storage_conn_id: Reference to a specific Google
+ cloud storage hook.
+ :type google_cloud_storage_conn_id: string
+ :param delegate_to: The account to impersonate, if any. For this to
+ work, the service account making the request must have domain-wide
+ delegation enabled.
+ :param parameters: a parameters dict that is substituted at query runtime.
+ :type parameters: dict
+ """
+ super(PostgresToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+ self.sql = sql
+ self.bucket = bucket
+ self.filename = filename
+ self.schema_filename = schema_filename
+ self.approx_max_file_size_bytes = approx_max_file_size_bytes
+ self.postgres_conn_id = postgres_conn_id
+ self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+ self.delegate_to = delegate_to
+ self.parameters = parameters
+
+ def execute(self, context):
+ cursor = self._query_postgres()
+ files_to_upload = self._write_local_data_files(cursor)
+
+ # If a schema is set, create a BQ schema JSON file.
+ if self.schema_filename:
+ files_to_upload.update(self._write_local_schema_file(cursor))
+
+ # Flush all files before uploading
+ for file_handle in files_to_upload.values():
+ file_handle.flush()
+
+ self._upload_to_gcs(files_to_upload)
+
+ # Close all temp file handles.
+ for file_handle in files_to_upload.values():
+ file_handle.close()
+
+ def _query_postgres(self):
+ """
+ Queries Postgres and returns a cursor to the results.
+ """
+ postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+ conn = postgres.get_conn()
+ cursor = conn.cursor()
+ cursor.execute(self.sql, self.parameters)
+ return cursor
+
+ def _write_local_data_files(self, cursor):
+ """
+ Takes a cursor, and writes results to a local file.
+
+ :return: A dictionary where keys are filenames to be used as object
+ names in GCS, and values are file handles to local files that
+ contain the data for the GCS objects.
+ """
+ schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+ file_no = 0
+ tmp_file_handle = NamedTemporaryFile(delete=True)
+ tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
+
+ for row in cursor:
+ # Convert datetime objects to utc seconds, and decimals to floats
+ row = map(self.convert_types, row)
+ row_dict = dict(zip(schema, row))
+
+ s = json.dumps(row_dict, sort_keys=True)
+ if PY3:
+ s = s.encode('utf-8')
+ tmp_file_handle.write(s)
+
+ # Append newline to make dumps BigQuery compatible.
+ tmp_file_handle.write(b'\n')
+
+ # Stop if the file exceeds the file size limit.
+ if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+ file_no += 1
+ tmp_file_handle = NamedTemporaryFile(delete=True)
+ tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+
+ return tmp_file_handles
+
+ def _write_local_schema_file(self, cursor):
+ """
+ Takes a cursor, and writes the BigQuery schema for the results to a
+ local file.
+
+ :return: A dictionary where the key is the filename to be used as an
+ object name in GCS, and the value is a file handle to a local file
+ that contains the BigQuery schema fields in .json format.
+ """
+ schema = []
+ for field in cursor.description:
+ # See PEP 249 for details about the description tuple.
+ field_name = field[0]
+ field_type = self.type_map(field[1])
+ field_mode = 'REPEATED' if field[1] in (1009, 1005, 1007,
+ 1016) else 'NULLABLE'
+ schema.append({
+ 'name': field_name,
+ 'type': field_type,
+ 'mode': field_mode,
+ })
+
+ logging.info('Using schema for %s: %s', self.schema_filename, schema)
+ tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+ s = json.dumps(schema, sort_keys=True)
+ if PY3:
+ s = s.encode('utf-8')
+ tmp_schema_file_handle.write(s)
+ return {self.schema_filename: tmp_schema_file_handle}
+
+ def _upload_to_gcs(self, files_to_upload):
+ """
+ Upload all of the file splits (and optionally the schema .json file) to
+ Google Cloud Storage.
+ """
+ hook = GoogleCloudStorageHook(
+ google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+ delegate_to=self.delegate_to)
+ for obj, tmp_file_handle in files_to_upload.items():
+ hook.upload(self.bucket, obj, tmp_file_handle.name,
+ 'application/json')
+
+ @classmethod
+ def convert_types(cls, value):
+ """
+ Takes a value from Postgres, and converts it to a value that's safe for
+ JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds.
+ Decimals are converted to floats. Times are converted to seconds.
+ """
+ if type(value) in (datetime.datetime, datetime.date):
+ return time.mktime(value.timetuple())
+ elif type(value) == datetime.time:
+ formatted_time = time.strptime(str(value), "%H:%M:%S")
+ return datetime.timedelta(
+ hours=formatted_time.tm_hour,
+ minutes=formatted_time.tm_min,
+ seconds=formatted_time.tm_sec).seconds
+ elif isinstance(value, Decimal):
+ return float(value)
+ else:
+ return value
+
+ @classmethod
+ def type_map(cls, postgres_type):
+ """
+ Helper function that maps from Postgres fields to BigQuery fields. Used
+ when a schema_filename is set.
+ """
+ d = {
+ 1114: 'TIMESTAMP',
+ 1184: 'TIMESTAMP',
+ 1082: 'TIMESTAMP',
+ 1083: 'TIMESTAMP',
+ 1005: 'INTEGER',
+ 1007: 'INTEGER',
+ 1016: 'INTEGER',
+ 20: 'INTEGER',
+ 21: 'INTEGER',
+ 23: 'INTEGER',
+ 16: 'BOOLEAN',
+ 700: 'FLOAT',
+ 701: 'FLOAT',
+ 1700: 'FLOAT'
+ }
+
+ return d[postgres_type] if postgres_type in d else 'STRING'
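A quick illustration of the conversion helpers above (the OIDs and
values are chosen only for illustration; not part of the commit):

    import datetime
    from decimal import Decimal
    from airflow.contrib.operators.postgres_to_gcs_operator import \
        PostgresToGoogleCloudStorageOperator as Op

    # Both helpers are classmethods, so no operator instance is needed.
    Op.convert_types(Decimal('3.14'))            # -> 3.14 (float)
    Op.convert_types(datetime.date(2017, 1, 1))  # -> seconds since the epoch (float)
    Op.type_map(1114)                            # -> 'TIMESTAMP' (timestamp OID)
    Op.type_map(25)                              # -> 'STRING' (text OID, not in the map)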
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/tests/contrib/operators/test_postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_postgres_to_gcs_operator.py b/tests/contrib/operators/test_postgres_to_gcs_operator.py
new file mode 100644
index 0000000..9567243
--- /dev/null
+++ b/tests/contrib/operators/test_postgres_to_gcs_operator.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+PY3 = sys.version_info[0] == 3
+
+TASK_ID = 'test-postgres-to-gcs'
+POSTGRES_CONN_ID = 'postgres_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+FILENAME = 'test_{}.ndjson'
+# We expect the psycopg cursor to return encoded strs in Python 2 and decoded strs in Python 3.
+if PY3:
+ ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
+ CURSOR_DESCRIPTION = (('some_str', 0), ('some_num', 1005))
+else:
+ ROWS = [(b'mock_row_content_1', 42), (b'mock_row_content_2', 43), (b'mock_row_content_3', 44)]
+ CURSOR_DESCRIPTION = ((b'some_str', 0), (b'some_num', 1005))
+NDJSON_LINES = [
+ b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+ b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+ b'{"some_num": 44, "some_str": "mock_row_content_3"}\n'
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, {"mode": "REPEATED", "name": "some_num", "type": "INTEGER"}]'
+
+
+class PostgresToGoogleCloudStorageOperatorTest(unittest.TestCase):
+ def test_init(self):
+ """Test PostgresToGoogleCloudStorageOperator instance is properly initialized."""
+ op = PostgresToGoogleCloudStorageOperator(
+ task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=FILENAME)
+ self.assertEqual(op.task_id, TASK_ID)
+ self.assertEqual(op.sql, SQL)
+ self.assertEqual(op.bucket, BUCKET)
+ self.assertEqual(op.filename, FILENAME)
+
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+ def test_exec_success(self, gcs_hook_mock_class, pg_hook_mock_class):
+ """Test the execute function in case where the run is successful."""
+ op = PostgresToGoogleCloudStorageOperator(
+ task_id=TASK_ID,
+ postgres_conn_id=POSTGRES_CONN_ID,
+ sql=SQL,
+ bucket=BUCKET,
+ filename=FILENAME)
+
+ pg_hook_mock = pg_hook_mock_class.return_value
+ pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+ pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+ gcs_hook_mock = gcs_hook_mock_class.return_value
+
+ def _assert_upload(bucket, obj, tmp_filename, content_type):
+ self.assertEqual(BUCKET, bucket)
+ self.assertEqual(FILENAME.format(0), obj)
+ self.assertEqual('application/json', content_type)
+ with open(tmp_filename, 'rb') as f:
+ self.assertEqual(b''.join(NDJSON_LINES), f.read())
+
+ gcs_hook_mock.upload.side_effect = _assert_upload
+
+ op.execute(None)
+
+ pg_hook_mock_class.assert_called_once_with(postgres_conn_id=POSTGRES_CONN_ID)
+ pg_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL, None)
+
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+ def test_file_splitting(self, gcs_hook_mock_class, pg_hook_mock_class):
+ """Test that ndjson is split by approx_max_file_size_bytes param."""
+ pg_hook_mock = pg_hook_mock_class.return_value
+ pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+ pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+ gcs_hook_mock = gcs_hook_mock_class.return_value
+ expected_upload = {
+ FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
+ FILENAME.format(1): NDJSON_LINES[2],
+ }
+
+ def _assert_upload(bucket, obj, tmp_filename, content_type):
+ self.assertEqual(BUCKET, bucket)
+ self.assertEqual('application/json', content_type)
+ with open(tmp_filename, 'rb') as f:
+ self.assertEqual(expected_upload[obj], f.read())
+
+ gcs_hook_mock.upload.side_effect = _assert_upload
+
+ op = PostgresToGoogleCloudStorageOperator(
+ task_id=TASK_ID,
+ sql=SQL,
+ bucket=BUCKET,
+ filename=FILENAME,
+ approx_max_file_size_bytes=len(expected_upload[FILENAME.format(0)]))
+ op.execute(None)
+
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+ @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+ def test_schema_file(self, gcs_hook_mock_class, pg_hook_mock_class):
+ """Test writing schema files."""
+ pg_hook_mock = pg_hook_mock_class.return_value
+ pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+ pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+ gcs_hook_mock = gcs_hook_mock_class.return_value
+
+ def _assert_upload(bucket, obj, tmp_filename, content_type):
+ if obj == SCHEMA_FILENAME:
+ with open(tmp_filename, 'rb') as f:
+ self.assertEqual(SCHEMA_JSON, f.read())
+
+ gcs_hook_mock.upload.side_effect = _assert_upload
+
+ op = PostgresToGoogleCloudStorageOperator(
+ task_id=TASK_ID,
+ sql=SQL,
+ bucket=BUCKET,
+ filename=FILENAME,
+ schema_filename=SCHEMA_FILENAME)
+ op.execute(None)
+
+ # once for the file and once for the schema
+ self.assertEqual(2, gcs_hook_mock.upload.call_count)
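For reference, one way to run the new tests locally (assuming the mock
package is installed on Python 2; the module path matches the file
added above):

    python -m unittest tests.contrib.operators.test_postgres_to_gcs_operator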