You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jo...@apache.org on 2018/03/10 01:42:30 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-2169] Add schema to
MySqlToGoogleCloudStorageOperator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 803767959 -> 398746d8f
[AIRFLOW-2169] Add schema to MySqlToGoogleCloudStorageOperator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f96f0f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f96f0f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f96f0f7
Branch: refs/heads/master
Commit: 6f96f0f79b825f325026ab1f29defb203def3964
Parents: e28f6e2
Author: Hongyi Wang <ho...@wepay.com>
Authored: Fri Mar 9 16:21:16 2018 -0800
Committer: Hongyi Wang <ho...@wepay.com>
Committed: Fri Mar 9 16:21:16 2018 -0800
----------------------------------------------------------------------
airflow/contrib/operators/mysql_to_gcs.py | 56 +++++++++++++++++---------
1 file changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f96f0f7/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 41e23f5..9ba84c7 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -24,6 +24,7 @@ from datetime import date, datetime
from decimal import Decimal
from MySQLdb.constants import FIELD_TYPE
from tempfile import NamedTemporaryFile
+from six import string_types
PY3 = sys.version_info[0] == 3
@@ -32,7 +33,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
"""
Copy data from MySQL to Google cloud storage in JSON format.
"""
- template_fields = ('sql', 'bucket', 'filename', 'schema_filename')
+ template_fields = ('sql', 'bucket', 'filename', 'schema_filename', 'schema')
template_ext = ('.sql',)
ui_color = '#a0e08c'
@@ -45,6 +46,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
approx_max_file_size_bytes=1900000000,
mysql_conn_id='mysql_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
+ schema=None,
delegate_to=None,
*args,
**kwargs):
@@ -73,6 +75,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
:param google_cloud_storage_conn_id: Reference to a specific Google
cloud storage hook.
:type google_cloud_storage_conn_id: string
+ :param schema: The schema to use, if any. Should be a list of dict or
+ a str. Examples could be see: https://cloud.google.com/bigquery
+ /docs/schemas#specifying_a_json_schema_file
+ :type schema: str or list
:param delegate_to: The account to impersonate, if any. For this to
work, the service account making the request must have domain-wide
delegation enabled.
@@ -85,6 +91,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
self.approx_max_file_size_bytes = approx_max_file_size_bytes
self.mysql_conn_id = mysql_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+ self.schema = schema
self.delegate_to = delegate_to
def execute(self, context):
@@ -160,26 +167,36 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
contains the BigQuery schema fields in .json format.
"""
schema = []
- for field in cursor.description:
- # See PEP 249 for details about the description tuple.
- field_name = field[0]
- field_type = self.type_map(field[1])
- # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
- # for required fields because some MySQL timestamps can't be
- # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
- field_mode = 'NULLABLE' if field[6] or field_type == 'TIMESTAMP' else 'REQUIRED'
- schema.append({
- 'name': field_name,
- 'type': field_type,
- 'mode': field_mode,
- })
+ tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+ if self.schema is not None and isinstance(self.schema, string_types):
+ schema = self.schema
+ tmp_schema_file_handle.write(schema)
+ else:
+ if self.schema is not None and isinstance(self.schema, list):
+ schema = self.schema
+ else:
+ for field in cursor.description:
+ # See PEP 249 for details about the description tuple.
+ field_name = field[0]
+ field_type = self.type_map(field[1])
+ # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
+ # for required fields because some MySQL timestamps can't be
+ # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
+ if field[6] or field_type == 'TIMESTAMP':
+ field_mode = 'NULLABLE'
+ else:
+ field_mode = 'REQUIRED'
+ schema.append({
+ 'name': field_name,
+ 'type': field_type,
+ 'mode': field_mode,
+ })
+ s = json.dumps(schema, tmp_schema_file_handle)
+ if PY3:
+ s = s.encode('utf-8')
+ tmp_schema_file_handle.write(s)
self.log.info('Using schema for %s: %s', self.schema_filename, schema)
- tmp_schema_file_handle = NamedTemporaryFile(delete=True)
- s = json.dumps(schema, tmp_schema_file_handle)
- if PY3:
- s = s.encode('utf-8')
- tmp_schema_file_handle.write(s)
return {self.schema_filename: tmp_schema_file_handle}
def _upload_to_gcs(self, files_to_upload):
@@ -214,6 +231,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
when a schema_filename is set.
"""
d = {
+ FIELD_TYPE.INT24: 'INTEGER',
FIELD_TYPE.TINY: 'INTEGER',
FIELD_TYPE.BIT: 'INTEGER',
FIELD_TYPE.DATETIME: 'TIMESTAMP',
[2/2] incubator-airflow git commit: Merge pull request #3091 from
whynick1/master
Posted by jo...@apache.org.
Merge pull request #3091 from whynick1/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/398746d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/398746d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/398746d8
Branch: refs/heads/master
Commit: 398746d8f61880b2621c97448cc5e3a86a9657b8
Parents: 8037679 6f96f0f
Author: Joy Gao <Jo...@apache.org>
Authored: Fri Mar 9 17:42:25 2018 -0800
Committer: Joy Gao <Jo...@apache.org>
Committed: Fri Mar 9 17:42:25 2018 -0800
----------------------------------------------------------------------
airflow/contrib/operators/mysql_to_gcs.py | 56 +++++++++++++++++---------
1 file changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------