You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/27 20:39:39 UTC
[02/11] incubator-airflow git commit: [AIRFLOW-1802] Convert database
fields to timezone aware
[AIRFLOW-1802] Convert database fields to timezone aware
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b658c78f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b658c78f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b658c78f
Branch: refs/heads/master
Commit: b658c78f6705415f444bd206cc02cd51219b3f8d
Parents: 59aba30
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Nov 10 22:36:31 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:53:03 2017 +0100
----------------------------------------------------------------------
.../0e2a74e0fc9f_add_time_zone_awareness.py | 213 +++++++++++++++++++
airflow/settings.py | 15 ++
2 files changed, 228 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
new file mode 100644
index 0000000..bb65c1c
--- /dev/null
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -0,0 +1,213 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add time zone awareness
+
+Revision ID: 0e2a74e0fc9f
+Revises: d2ae31099d61
+Create Date: 2017-11-10 22:22:31.326152
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0e2a74e0fc9f'
+down_revision = 'd2ae31099d61'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
+
+def upgrade():
+ conn = op.get_bind()
+ if conn.dialect.name == 'mysql':
+ conn.execute("SET time_zone = '+00:00'")
+ op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+ op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+ op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+ else:
+ # sqlite datetime is fine as is not converting
+ if conn.dialect.name == 'sqlite':
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == 'postgresql':
+ conn.execute("set timezone=UTC")
+
+ op.alter_column(table_name='chart', column_name='last_modified', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='dag', column_name='last_expired', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='job', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='job', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='known_event', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='known_event', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+ op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+ op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+
+def downgrade():
+ conn = op.get_bind()
+ if conn.dialect.name == 'mysql':
+ conn.execute("SET time_zone = '+00:00'")
+ op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='import_error', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='xcom', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+ else:
+ if conn.dialect.name == 'sqlite':
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == 'postgresql':
+ conn.execute("set timezone=UTC")
+
+ op.alter_column(table_name='chart', column_name='last_modified', type_=sa.DateTime())
+
+ op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.DateTime())
+ op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.DateTime())
+ op.alter_column(table_name='dag', column_name='last_expired', type_=sa.DateTime())
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.DateTime())
+
+ op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.DateTime())
+ op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.DateTime())
+ op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.DateTime())
+
+ op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.DateTime())
+
+ op.alter_column(table_name='job', column_name='start_date', type_=sa.DateTime())
+ op.alter_column(table_name='job', column_name='end_date', type_=sa.DateTime())
+ op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.DateTime())
+
+ op.alter_column(table_name='known_event', column_name='start_date', type_=sa.DateTime())
+ op.alter_column(table_name='known_event', column_name='end_date', type_=sa.DateTime())
+
+ op.alter_column(table_name='log', column_name='dttm', type_=sa.DateTime())
+ op.alter_column(table_name='log', column_name='execution_date', type_=sa.DateTime())
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.DateTime())
+
+ op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.DateTime())
+ op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.DateTime())
+ op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.DateTime())
+
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.DateTime())
+ op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.DateTime())
+ op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.DateTime())
+
+ op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.DateTime())
+ op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.DateTime())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 0dfbb15..ceb9b50 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -138,6 +138,20 @@ def configure_orm(disable_connection_pool=False):
sessionmaker(autocommit=False, autoflush=False, bind=engine))
+def configure_adapters():
+ from pendulum import Pendulum
+ try:
+ from sqlite3 import register_adapter
+ register_adapter(Pendulum, lambda val: val.isoformat(' '))
+ except ImportError:
+ pass
+ try:
+ import MySQLdb.converters
+ MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
+ except ImportError:
+ pass
+
+
try:
from airflow_local_settings import *
@@ -147,6 +161,7 @@ except:
configure_logging()
configure_vars()
+configure_adapters()
configure_orm()
# Const stuff