Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:18:48 UTC

[airflow] branch master updated: Add OracleToGCS Transfer (#13246)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e7aeacf  Add OracleToGCS Transfer (#13246)
e7aeacf is described below

commit e7aeacf335d373007a32ac65680ba6b5b19f5c9f
Author: Tuan Nguyen <an...@me.com>
AuthorDate: Thu Dec 24 20:18:34 2020 +0700

    Add OracleToGCS Transfer (#13246)
---
 CONTRIBUTING.rst                                   |   2 +-
 airflow/providers/dependencies.json                |   1 +
 .../cloud/example_dags/example_oracle_to_gcs.py    |  39 ++++++
 .../google/cloud/transfers/oracle_to_gcs.py        | 128 ++++++++++++++++++
 airflow/providers/google/provider.yaml             |   4 +
 .../operators/transfer/oracle_to_gcs.rst           |  58 +++++++++
 .../google/cloud/transfers/test_oracle_to_gcs.py   | 143 +++++++++++++++++++++
 7 files changed, 374 insertions(+), 1 deletion(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 5010639..b5ea216 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -692,7 +692,7 @@ apache.hive                amazon,microsoft.mssql,mysql,presto,samba,vertica
 apache.livy                http
 dingding                   http
 discord                    http
-google                     amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,salesforce,sftp,ssh
+google                     amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh
 hashicorp                  google
 microsoft.azure            google,oracle
 microsoft.mssql            odbc
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 748b1a5..0708440 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -36,6 +36,7 @@
     "microsoft.azure",
     "microsoft.mssql",
     "mysql",
+    "oracle",
     "postgres",
     "presto",
     "salesforce",
diff --git a/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
new file mode 100644
index 0000000..bfccc7a
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
+from airflow.utils import dates
+
+GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-airflow-oracle-gcs")
+FILENAME = 'test_file'
+
+SQL_QUERY = "SELECT * from test_table"
+
+with models.DAG(
+    'example_oracle_to_gcs',
+    default_args=dict(start_date=dates.days_ago(1)),
+    schedule_interval=None,
+    tags=['example'],
+) as dag:
+    # [START howto_operator_oracle_to_gcs]
+    upload = OracleToGCSOperator(
+        task_id='oracle_to_gcs', sql=SQL_QUERY, bucket=GCS_BUCKET, filename=FILENAME, export_format='csv'
+    )
+    # [END howto_operator_oracle_to_gcs]
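
A note for readers: the minimal example above only sets the required arguments. An illustrative, more complete configuration (placed inside the same with models.DAG(...) block) could also exercise the options used by the new tests and inherited from BaseSQLToGCSOperator; the connection id, file names and values below are placeholders rather than part of this commit:

    upload_json = OracleToGCSOperator(
        task_id='oracle_to_gcs_json',
        oracle_conn_id='oracle_default',     # reference to an existing Oracle connection
        sql=SQL_QUERY,
        bucket=GCS_BUCKET,
        filename='test_file_{}.ndjson',      # '{}' is replaced with a chunk counter for large results
        schema_filename='schema_test.json',  # also upload a BigQuery-style schema file
        export_format='json',                # newline-delimited JSON instead of the default CSV
        gzip=True,                           # compress each uploaded chunk
        ensure_utc=True,                     # export TIMESTAMP columns as UTC
    )
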
diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
new file mode 100644
index 0000000..935730e
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import calendar
+from datetime import date, datetime, timedelta
+from decimal import Decimal
+from typing import Dict
+
+import cx_Oracle
+
+from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
+from airflow.providers.oracle.hooks.oracle import OracleHook
+from airflow.utils.decorators import apply_defaults
+
+
+class OracleToGCSOperator(BaseSQLToGCSOperator):
+    """Copy data from Oracle to Google Cloud Storage in JSON or CSV format.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:OracleToGCSOperator`
+
+    :param oracle_conn_id: Reference to a specific Oracle hook.
+    :type oracle_conn_id: str
+    :param ensure_utc: Ensure TIMESTAMP columns are exported as UTC. If set to
+        `False`, TIMESTAMP columns will be exported using the Oracle server's
+        default timezone.
+    :type ensure_utc: bool
+    """
+
+    ui_color = '#a0e08c'
+
+    type_map = {
+        cx_Oracle.DB_TYPE_BINARY_DOUBLE: 'DECIMAL',
+        cx_Oracle.DB_TYPE_BINARY_FLOAT: 'DECIMAL',
+        cx_Oracle.DB_TYPE_BINARY_INTEGER: 'INTEGER',
+        cx_Oracle.DB_TYPE_BOOLEAN: 'BOOLEAN',
+        cx_Oracle.DB_TYPE_DATE: 'TIMESTAMP',
+        cx_Oracle.DB_TYPE_NUMBER: 'NUMERIC',
+        cx_Oracle.DB_TYPE_TIMESTAMP: 'TIMESTAMP',
+        cx_Oracle.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP',
+        cx_Oracle.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP',
+    }
+
+    @apply_defaults
+    def __init__(self, *, oracle_conn_id='oracle_default', ensure_utc=False, **kwargs):
+        super().__init__(**kwargs)
+        self.ensure_utc = ensure_utc
+        self.oracle_conn_id = oracle_conn_id
+
+    def query(self):
+        """Queries Oracle and returns a cursor to the results."""
+        oracle = OracleHook(oracle_conn_id=self.oracle_conn_id)
+        conn = oracle.get_conn()
+        cursor = conn.cursor()
+        if self.ensure_utc:
+            # Ensure TIMESTAMP results are in UTC
+            tz_query = "ALTER SESSION SET TIME_ZONE = '+00:00'"
+            self.log.info('Executing: %s', tz_query)
+            cursor.execute(tz_query)
+        self.log.info('Executing: %s', self.sql)
+        cursor.execute(self.sql)
+        return cursor
+
+    def field_to_bigquery(self, field) -> Dict[str, str]:
+        field_type = self.type_map.get(field[1], "STRING")
+
+        field_mode = "NULLABLE" if not field[6] or field_type == "TIMESTAMP" else "REQUIRED"
+        return {
+            'name': field[0],
+            'type': field_type,
+            'mode': field_mode,
+        }
+
+    def convert_type(self, value, schema_type):
+        """
+        Takes a value from Oracle db, and converts it to a value that's safe for
+        JSON/Google Cloud Storage/BigQuery.
+
+        * Datetimes are converted to UTC seconds.
+        * Decimals are converted to floats.
+        * Dates are converted to ISO formatted string if given schema_type is
+          DATE, or UTC seconds otherwise.
+        * Binary type fields are converted to integer if given schema_type is
+          INTEGER, or encoded with base64 otherwise. Imported BYTES data must
+          be base64-encoded according to BigQuery documentation:
+          https://cloud.google.com/bigquery/data-types
+
+        :param value: Oracle db column value
+        :type value: Any
+        :param schema_type: BigQuery data type
+        :type schema_type: str
+        """
+        if value is None:
+            return value
+        if isinstance(value, datetime):
+            value = calendar.timegm(value.timetuple())
+        elif isinstance(value, timedelta):
+            value = value.total_seconds()
+        elif isinstance(value, Decimal):
+            value = float(value)
+        elif isinstance(value, date):
+            if schema_type == "DATE":
+                value = value.isoformat()
+            else:
+                value = calendar.timegm(value.timetuple())
+        elif isinstance(value, bytes):
+            if schema_type == "INTEGER":
+                value = int.from_bytes(value, "big")
+            else:
+                value = base64.standard_b64encode(value).decode('ascii')
+        return value
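
To make the convert_type() rules above concrete, here is a small illustrative sketch of the values it returns for typical Oracle column values. The inputs and operator arguments are made-up placeholders, needed only to instantiate the class:

    from datetime import date, datetime
    from decimal import Decimal

    from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator

    op = OracleToGCSOperator(
        task_id='demo', sql='SELECT 1 FROM dual', bucket='demo-bucket', filename='demo_{}.json'
    )

    op.convert_type(datetime(2020, 12, 24, 13, 18, 48), 'TIMESTAMP')  # -> 1608815928 (UTC epoch seconds)
    op.convert_type(Decimal('42.5'), 'NUMERIC')                       # -> 42.5 (plain float)
    op.convert_type(date(2020, 12, 24), 'DATE')                       # -> '2020-12-24' (ISO date string)
    op.convert_type(b'\x00\x2a', 'INTEGER')                           # -> 42 (big-endian integer)
    op.convert_type(b'raw bytes', 'STRING')                           # -> 'cmF3IGJ5dGVz' (base64 text)
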
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 190edba..4338a64 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -633,6 +633,10 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     python-module: airflow.providers.google.cloud.transfers.mysql_to_gcs
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mysql_to_gcs.rst
+  - source-integration-name: Oracle
+    target-integration-name: Google Cloud Storage (GCS)
+    python-module: airflow.providers.google.cloud.transfers.oracle_to_gcs
+    how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
   - source-integration-name: Google Cloud Storage (GCS)
     target-integration-name: Google Spreadsheet
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
diff --git a/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
new file mode 100644
index 0000000..861cc25
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
@@ -0,0 +1,58 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Oracle To Google Cloud Storage Operator
+=======================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
+used to store large amounts of data from various applications. This page shows how to copy
+data from Oracle to GCS.
+
+.. contents::
+  :depth: 1
+  :local:
+
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:OracleToGCSOperator:
+
+OracleToGCSOperator
+~~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleToGCSOperator` allows you to upload
+data from an Oracle database to GCS.
+
+When you use this operator, you can optionally compress the uploaded data with gzip.
+
+Below is an example of using this operator to upload data to GCS.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_oracle_to_gcs]
+    :end-before: [END howto_operator_oracle_to_gcs]
+
+
+Reference
+---------
+
+For further information, see the
+`cx_Oracle Documentation <https://cx-oracle.readthedocs.io/en/latest/>`__ and the
+`Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__.
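
The guide ends at the upload step. Not part of this change, but a common follow-up is to load the exported objects into BigQuery with the existing GCSToBigQueryOperator from the same provider. The sketch below assumes the example DAG above (CSV export, FILENAME = 'test_file') and a placeholder destination table; the exact keyword arguments should be checked against the provider version in use:

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    load_to_bq = GCSToBigQueryOperator(
        task_id='gcs_to_bq',
        bucket=GCS_BUCKET,                   # bucket written by the oracle_to_gcs task
        source_objects=[FILENAME],           # 'test_file' from the example DAG
        source_format='CSV',                 # matches export_format='csv'
        skip_leading_rows=1,                 # the CSV export writes a header row first
        autodetect=True,                     # or point schema_object at an uploaded schema_filename
        destination_project_dataset_table='my_dataset.oracle_table',  # placeholder table
        write_disposition='WRITE_TRUNCATE',
    )
    upload >> load_to_bq
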
diff --git a/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py
new file mode 100644
index 0000000..03f81d7
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import cx_Oracle
+
+from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
+
+TASK_ID = 'test-oracle-to-gcs'
+ORACLE_CONN_ID = 'oracle_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+JSON_FILENAME = 'test_{}.ndjson'
+GZIP = False
+
+ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
+CURSOR_DESCRIPTION = (
+    ('some_str', cx_Oracle.DB_TYPE_VARCHAR, None, None, None, None, None),
+    ('some_num', cx_Oracle.DB_TYPE_NUMBER, None, None, None, None, None),
+)
+NDJSON_LINES = [
+    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = [
+    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "NUMERIC"}]',
+]
+
+
+class TestOracleToGoogleCloudStorageOperator(unittest.TestCase):
+    def test_init(self):
+        """Test OracleToGoogleCloudStorageOperator instance is properly initialized."""
+        op = OracleToGCSOperator(task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME)
+        self.assertEqual(op.task_id, TASK_ID)
+        self.assertEqual(op.sql, SQL)
+        self.assertEqual(op.bucket, BUCKET)
+        self.assertEqual(op.filename, JSON_FILENAME)
+
+    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
+    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
+    def test_exec_success_json(self, gcs_hook_mock_class, oracle_hook_mock_class):
+        """Test successful run of execute function for JSON"""
+        op = OracleToGCSOperator(
+            task_id=TASK_ID, oracle_conn_id=ORACLE_CONN_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME
+        )
+
+        oracle_hook_mock = oracle_hook_mock_class.return_value
+        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual(JSON_FILENAME.format(0), obj)
+            self.assertEqual('application/json', mime_type)
+            self.assertEqual(GZIP, gzip)
+            with open(tmp_filename, 'rb') as file:
+                self.assertEqual(b''.join(NDJSON_LINES), file.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op.execute(None)
+
+        oracle_hook_mock_class.assert_called_once_with(oracle_conn_id=ORACLE_CONN_ID)
+        oracle_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)
+
+    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
+    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
+    def test_file_splitting(self, gcs_hook_mock_class, oracle_hook_mock_class):
+        """Test that ndjson is split by approx_max_file_size_bytes param."""
+        oracle_hook_mock = oracle_hook_mock_class.return_value
+        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+        expected_upload = {
+            JSON_FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
+            JSON_FILENAME.format(1): NDJSON_LINES[2],
+        }
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual('application/json', mime_type)
+            self.assertEqual(GZIP, gzip)
+            with open(tmp_filename, 'rb') as file:
+                self.assertEqual(expected_upload[obj], file.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = OracleToGCSOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+            approx_max_file_size_bytes=len(expected_upload[JSON_FILENAME.format(0)]),
+        )
+        op.execute(None)
+
+    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
+    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
+    def test_schema_file(self, gcs_hook_mock_class, oracle_hook_mock_class):
+        """Test writing schema files."""
+        oracle_hook_mock = oracle_hook_mock_class.return_value
+        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):  # pylint: disable=unused-argument
+            if obj == SCHEMA_FILENAME:
+                with open(tmp_filename, 'rb') as file:
+                    self.assertEqual(b''.join(SCHEMA_JSON), file.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = OracleToGCSOperator(
+            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, schema_filename=SCHEMA_FILENAME
+        )
+        op.execute(None)
+
+        # once for the file and once for the schema
+        self.assertEqual(2, gcs_hook_mock.upload.call_count)
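
The new tests cover JSON export, file splitting and the schema file, but not the ensure_utc flag introduced in the operator. A hedged sketch of an additional test method (not part of this commit) that could be added to the same class, reusing the module-level constants and mock layout above, to check that a session time-zone statement runs before the export query:

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_exec_ensure_utc(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Hypothetical test: ensure_utc=True issues a time-zone statement before the query."""
        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        op = OracleToGCSOperator(
            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, ensure_utc=True
        )
        op.execute(None)

        cursor_mock = oracle_hook_mock.get_conn().cursor()
        # Two executes: the session time-zone statement first, then the export query itself.
        self.assertEqual(2, cursor_mock.execute.call_count)
        self.assertEqual(mock.call(SQL), cursor_mock.execute.call_args_list[-1])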