You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/27 20:39:39 UTC

[02/11] incubator-airflow git commit: [AIRFLOW-1802] Convert database fields to timezone aware

[AIRFLOW-1802] Convert database fields to timezone aware


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b658c78f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b658c78f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b658c78f

Branch: refs/heads/master
Commit: b658c78f6705415f444bd206cc02cd51219b3f8d
Parents: 59aba30
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Nov 10 22:36:31 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:53:03 2017 +0100

----------------------------------------------------------------------
 .../0e2a74e0fc9f_add_time_zone_awareness.py     | 213 +++++++++++++++++++
 airflow/settings.py                             |  15 ++
 2 files changed, 228 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
new file mode 100644
index 0000000..bb65c1c
--- /dev/null
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -0,0 +1,213 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add time zone awareness
+
+Revision ID: 0e2a74e0fc9f
+Revises: d2ae31099d61
+Create Date: 2017-11-10 22:22:31.326152
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0e2a74e0fc9f'
+down_revision = 'd2ae31099d61'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
+
+def upgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+    else:
+        # sqlite datetime is fine as is not converting
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+
+def downgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+    else:
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.DateTime())
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.DateTime())
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.DateTime())
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.DateTime())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 0dfbb15..ceb9b50 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -138,6 +138,20 @@ def configure_orm(disable_connection_pool=False):
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 
+def configure_adapters():  # register Pendulum conversions with the sqlite3 and MySQLdb drivers, when importable
+    from pendulum import Pendulum
+    try:
+        from sqlite3 import register_adapter
+        register_adapter(Pendulum, lambda val: val.isoformat(' '))  # adapt Pendulum values to ISO text with a space separator
+    except ImportError:
+        pass  # sqlite3 could not be imported; nothing to register
+    try:
+        import MySQLdb.converters
+        MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal  # map Pendulum to MySQLdb's DateTime2literal converter
+    except ImportError:
+        pass  # MySQLdb could not be imported; nothing to register
+
+
 try:
     from airflow_local_settings import *
 
@@ -147,6 +161,7 @@ except:
 
 configure_logging()
 configure_vars()
+configure_adapters()
 configure_orm()
 
 # Const stuff