You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:18:48 UTC
[airflow] branch master updated: Add OracleToGCS Transfer (#13246)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e7aeacf Add OracleToGCS Transfer (#13246)
e7aeacf is described below
commit e7aeacf335d373007a32ac65680ba6b5b19f5c9f
Author: Tuan Nguyen <an...@me.com>
AuthorDate: Thu Dec 24 20:18:34 2020 +0700
Add OracleToGCS Transfer (#13246)
---
CONTRIBUTING.rst | 2 +-
airflow/providers/dependencies.json | 1 +
.../cloud/example_dags/example_oracle_to_gcs.py | 39 ++++++
.../google/cloud/transfers/oracle_to_gcs.py | 128 ++++++++++++++++++
airflow/providers/google/provider.yaml | 4 +
.../operators/transfer/oracle_to_gcs.rst | 58 +++++++++
.../google/cloud/transfers/test_oracle_to_gcs.py | 143 +++++++++++++++++++++
7 files changed, 374 insertions(+), 1 deletion(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 5010639..b5ea216 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -692,7 +692,7 @@ apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
apache.livy http
dingding http
discord http
-google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,salesforce,sftp,ssh
+google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh
hashicorp google
microsoft.azure google,oracle
microsoft.mssql odbc
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 748b1a5..0708440 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -36,6 +36,7 @@
"microsoft.azure",
"microsoft.mssql",
"mysql",
+ "oracle",
"postgres",
"presto",
"salesforce",
diff --git a/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
new file mode 100644
index 0000000..bfccc7a
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
+from airflow.utils import dates
+
GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-airflow-oracle-gcs")
# ``{}`` is replaced with the file number when the export is split into several
# files (see ``approx_max_file_size_bytes``); without it every chunk would be
# uploaded to the same object name and overwrite the previous one.
FILENAME = 'test_file_{}.csv'

SQL_QUERY = "SELECT * from test_table"

with models.DAG(
    'example_oracle_to_gcs',
    default_args=dict(start_date=dates.days_ago(1)),
    schedule_interval=None,
    tags=['example'],
) as dag:
    # [START howto_operator_oracle_to_gcs]
    upload = OracleToGCSOperator(
        task_id='oracle_to_gcs', sql=SQL_QUERY, bucket=GCS_BUCKET, filename=FILENAME, export_format='csv'
    )
    # [END howto_operator_oracle_to_gcs]
diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
new file mode 100644
index 0000000..935730e
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import calendar
+from datetime import date, datetime, timedelta
+from decimal import Decimal
+from typing import Dict
+
+import cx_Oracle
+
+from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
+from airflow.providers.oracle.hooks.oracle import OracleHook
+from airflow.utils.decorators import apply_defaults
+
+
class OracleToGCSOperator(BaseSQLToGCSOperator):
    """Copy data from Oracle to Google Cloud Storage in JSON or CSV format.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OracleToGCSOperator`

    :param oracle_conn_id: Reference to a specific Oracle hook.
    :type oracle_conn_id: str
    :param ensure_utc: If True, set the Oracle session time zone to UTC
        (``ALTER SESSION SET TIME_ZONE = '+00:00'``) before running ``sql``, so
        values that Oracle renders in the session time zone are exported as UTC.
        If set to ``False``, the session keeps its default time zone.
    :type ensure_utc: bool
    """

    ui_color = '#a0e08c'

    # Maps the cx_Oracle column type (``cursor.description[i][1]``) to a
    # BigQuery schema type; unlisted types are exported as STRING.
    type_map = {
        # BINARY_DOUBLE / BINARY_FLOAT hold IEEE-754 floating point values
        # (which may be NaN/Inf or far outside a fixed-precision decimal's
        # range), so they map to BigQuery FLOAT.
        cx_Oracle.DB_TYPE_BINARY_DOUBLE: 'FLOAT',
        cx_Oracle.DB_TYPE_BINARY_FLOAT: 'FLOAT',
        cx_Oracle.DB_TYPE_BINARY_INTEGER: 'INTEGER',
        cx_Oracle.DB_TYPE_BOOLEAN: 'BOOLEAN',
        # Oracle DATE also carries a time of day, hence TIMESTAMP.
        cx_Oracle.DB_TYPE_DATE: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_NUMBER: 'NUMERIC',
        cx_Oracle.DB_TYPE_TIMESTAMP: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP',
    }

    @apply_defaults
    def __init__(self, *, oracle_conn_id='oracle_default', ensure_utc=False, **kwargs):
        super().__init__(**kwargs)
        self.ensure_utc = ensure_utc
        self.oracle_conn_id = oracle_conn_id

    def query(self):
        """Query Oracle and return a cursor over the results of ``self.sql``.

        When ``ensure_utc`` is set, the session time zone is switched to UTC
        on the same cursor before the query runs.
        """
        oracle = OracleHook(oracle_conn_id=self.oracle_conn_id)
        conn = oracle.get_conn()
        cursor = conn.cursor()
        if self.ensure_utc:
            # Oracle changes the session time zone via ALTER SESSION;
            # ``SET time_zone = ...`` is MySQL syntax and is rejected by Oracle.
            tz_query = "ALTER SESSION SET TIME_ZONE = '+00:00'"
            self.log.info('Executing: %s', tz_query)
            cursor.execute(tz_query)
        self.log.info('Executing: %s', self.sql)
        cursor.execute(self.sql)
        return cursor

    def field_to_bigquery(self, field) -> Dict[str, str]:
        """Convert one ``cursor.description`` entry into a BigQuery schema field.

        :param field: DB-API column description
            ``(name, type, display_size, internal_size, precision, scale, null_ok)``.
        :return: dict with ``name``, ``type`` and ``mode`` keys.
        """
        field_type = self.type_map.get(field[1], "STRING")

        # null_ok is True for nullable columns and False for NOT NULL columns.
        # Only a column explicitly reported as NOT NULL becomes REQUIRED; an
        # unknown value (None) is treated as nullable, which is always safe to
        # load. TIMESTAMP columns are kept NULLABLE regardless of the constraint.
        null_ok = field[6]
        if null_ok is not None and not null_ok and field_type != "TIMESTAMP":
            field_mode = "REQUIRED"
        else:
            field_mode = "NULLABLE"
        return {
            'name': field[0],
            'type': field_type,
            'mode': field_mode,
        }

    def convert_type(self, value, schema_type):
        """
        Takes a value from Oracle db, and converts it to a value that's safe for
        JSON/Google Cloud Storage/BigQuery.

        * Datetimes are converted to seconds since the epoch; their wall-clock
          fields are interpreted as UTC (any tzinfo is ignored) and sub-second
          precision is dropped.
        * Timedeltas are converted to a number of seconds (float).
        * Decimals are converted to floats.
        * Dates are converted to ISO formatted string if given schema_type is
          DATE, or seconds since the epoch otherwise.
        * Binary type fields are converted to integer if given schema_type is
          INTEGER, or encoded with base64 otherwise. Imported BYTES data must
          be base64-encoded according to BigQuery documentation:
          https://cloud.google.com/bigquery/data-types

        :param value: Oracle db column value
        :type value: Any
        :param schema_type: BigQuery data type
        :type schema_type: str
        :return: the converted value; ``None`` and other types are returned unchanged.
        """
        if value is None:
            return value
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(value, datetime):
            value = calendar.timegm(value.timetuple())
        elif isinstance(value, timedelta):
            value = value.total_seconds()
        elif isinstance(value, Decimal):
            value = float(value)
        elif isinstance(value, date):
            if schema_type == "DATE":
                value = value.isoformat()
            else:
                value = calendar.timegm(value.timetuple())
        elif isinstance(value, bytes):
            if schema_type == "INTEGER":
                value = int.from_bytes(value, "big")
            else:
                value = base64.standard_b64encode(value).decode('ascii')
        return value
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 190edba..4338a64 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -633,6 +633,10 @@ transfers:
target-integration-name: Google Cloud Storage (GCS)
python-module: airflow.providers.google.cloud.transfers.mysql_to_gcs
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mysql_to_gcs.rst
+ - source-integration-name: Oracle
+ target-integration-name: Google Cloud Storage (GCS)
+ python-module: airflow.providers.google.cloud.transfers.oracle_to_gcs
+ how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
- source-integration-name: Google Cloud Storage (GCS)
target-integration-name: Google Spreadsheet
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
diff --git a/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
new file mode 100644
index 0000000..861cc25
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
@@ -0,0 +1,58 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Oracle To Google Cloud Storage Operator
+=======================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
+used to store large data from various applications. This page shows how to copy
+data from Oracle to GCS.
+
+.. contents::
+ :depth: 1
+ :local:
+
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:OracleToGCSOperator:
+
+OracleToGCSOperator
+~~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleToGCSOperator` allows you to upload
+data from an Oracle database to GCS.
+
+When you use this operator, you can optionally compress the uploaded data with gzip by passing ``gzip=True``.
+
+Below is an example of using this operator to upload data to GCS.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_oracle_to_gcs]
+ :end-before: [END howto_operator_oracle_to_gcs]
+
+
+Reference
+---------
+
+For further information, look at:
+
+* `cx_Oracle Documentation <https://cx-oracle.readthedocs.io/en/latest/>`__
+* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
diff --git a/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py
new file mode 100644
index 0000000..03f81d7
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import cx_Oracle
+
+from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
+
+TASK_ID = 'test-oracle-to-gcs'
+ORACLE_CONN_ID = 'oracle_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+JSON_FILENAME = 'test_{}.ndjson'
+GZIP = False
+
+ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
+CURSOR_DESCRIPTION = (
+ ('some_str', cx_Oracle.DB_TYPE_VARCHAR, None, None, None, None, None),
+ ('some_num', cx_Oracle.DB_TYPE_NUMBER, None, None, None, None, None),
+)
+NDJSON_LINES = [
+ b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+ b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+ b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = [
+ b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
+ b'{"mode": "NULLABLE", "name": "some_num", "type": "NUMERIC"}]',
+]
+
+
class TestOracleToGoogleCloudStorageOperator(unittest.TestCase):
    def test_init(self):
        """Test OracleToGoogleCloudStorageOperator instance is properly initialized."""
        op = OracleToGCSOperator(task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME)
        self.assertEqual(op.task_id, TASK_ID)
        self.assertEqual(op.sql, SQL)
        self.assertEqual(op.bucket, BUCKET)
        self.assertEqual(op.filename, JSON_FILENAME)
        # Defaults of the Oracle-specific parameters.
        self.assertEqual(op.oracle_conn_id, 'oracle_default')
        self.assertFalse(op.ensure_utc)

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_exec_success_json(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test successful run of execute function for JSON"""
        op = OracleToGCSOperator(
            task_id=TASK_ID, oracle_conn_id=ORACLE_CONN_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME
        )

        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
            self.assertEqual(BUCKET, bucket)
            self.assertEqual(JSON_FILENAME.format(0), obj)
            self.assertEqual('application/json', mime_type)
            self.assertEqual(GZIP, gzip)
            with open(tmp_filename, 'rb') as file:
                self.assertEqual(b''.join(NDJSON_LINES), file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op.execute(None)

        oracle_hook_mock_class.assert_called_once_with(oracle_conn_id=ORACLE_CONN_ID)
        oracle_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)
        # Guard against a vacuous pass: the assertions above only run on upload.
        gcs_hook_mock.upload.assert_called_once()

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_exec_ensure_utc(self, gcs_hook_mock_class, oracle_hook_mock_class):  # pylint: disable=unused-argument
        """Test that ensure_utc changes the session time zone before running the query."""
        op = OracleToGCSOperator(
            task_id=TASK_ID,
            oracle_conn_id=ORACLE_CONN_ID,
            sql=SQL,
            bucket=BUCKET,
            filename=JSON_FILENAME,
            ensure_utc=True,
        )

        cursor_mock = oracle_hook_mock_class.return_value.get_conn().cursor()
        cursor_mock.__iter__.return_value = iter(ROWS)
        cursor_mock.description = CURSOR_DESCRIPTION

        op.execute(None)

        # First the time zone statement, then the user's query.
        self.assertEqual(2, cursor_mock.execute.call_count)
        self.assertIn('TIME_ZONE', cursor_mock.execute.call_args_list[0][0][0].upper())
        self.assertEqual(mock.call(SQL), cursor_mock.execute.call_args_list[-1])

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_file_splitting(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test that ndjson is split by approx_max_file_size_bytes param."""
        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value
        expected_upload = {
            JSON_FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
            JSON_FILENAME.format(1): NDJSON_LINES[2],
        }

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
            self.assertEqual(BUCKET, bucket)
            self.assertEqual('application/json', mime_type)
            self.assertEqual(GZIP, gzip)
            with open(tmp_filename, 'rb') as file:
                self.assertEqual(expected_upload[obj], file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op = OracleToGCSOperator(
            task_id=TASK_ID,
            sql=SQL,
            bucket=BUCKET,
            filename=JSON_FILENAME,
            approx_max_file_size_bytes=len(expected_upload[JSON_FILENAME.format(0)]),
        )
        op.execute(None)

        # Every expected chunk must actually be uploaded.
        self.assertEqual(len(expected_upload), gcs_hook_mock.upload.call_count)

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_schema_file(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test writing schema files."""
        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value

        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):  # pylint: disable=unused-argument
            if obj == SCHEMA_FILENAME:
                with open(tmp_filename, 'rb') as file:
                    self.assertEqual(b''.join(SCHEMA_JSON), file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op = OracleToGCSOperator(
            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, schema_filename=SCHEMA_FILENAME
        )
        op.execute(None)

        # once for the file and once for the schema
        self.assertEqual(2, gcs_hook_mock.upload.call_count)