You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/27 20:39:43 UTC

[06/11] incubator-airflow git commit: [AIRFLOW-1807] Force use of time zone aware db fields

[AIRFLOW-1807] Force use of time zone aware db fields

This change switches datetime columns to sqlalchemy-utc's UtcDateTime
type, so that all datetimes being stored are checked to be indeed
timezone aware.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f168634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f168634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f168634

Branch: refs/heads/master
Commit: 2f168634aac4aa138f00634bbb9f4e3993346ffa
Parents: c857436
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Nov 11 13:32:02 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py      |  9 +++++----
 airflow/models.py    | 47 ++++++++++++++++++++++++-----------------------
 airflow/www/utils.py |  8 ++++++++
 airflow/www/views.py |  7 +++++++
 setup.py             |  1 +
 5 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 4e1864e..868e785 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -33,9 +33,10 @@ import datetime
 from collections import defaultdict
 from past.builtins import basestring
 from sqlalchemy import (
-    Column, Integer, String, DateTime, func, Index, or_, and_, not_)
+    Column, Integer, String, func, Index, or_, and_, not_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
+from sqlalchemy_utc import UtcDateTime
 from tabulate import tabulate
 from time import sleep
 
@@ -77,9 +78,9 @@ class BaseJob(Base, LoggingMixin):
     dag_id = Column(String(ID_LEN),)
     state = Column(String(20))
     job_type = Column(String(30))
-    start_date = Column(DateTime())
-    end_date = Column(DateTime())
-    latest_heartbeat = Column(DateTime())
+    start_date = Column(UtcDateTime())
+    end_date = Column(UtcDateTime())
+    latest_heartbeat = Column(UtcDateTime())
     executor_class = Column(String(500))
     hostname = Column(String(500))
     unixname = Column(String(1000))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index fe62ac5..f8a5f0f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -55,6 +55,7 @@ from sqlalchemy import func, or_, and_
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
+from sqlalchemy_utc import UtcDateTime
 
 from croniter import croniter
 import six
@@ -732,7 +733,7 @@ class DagPickle(Base):
     """
     id = Column(Integer, primary_key=True)
     pickle = Column(PickleType(pickler=dill))
-    created_dttm = Column(DateTime, default=func.now())
+    created_dttm = Column(UtcDateTime, default=func.now())
     pickle_hash = Column(Text)
 
     __tablename__ = "dag_pickle"
@@ -763,9 +764,9 @@ class TaskInstance(Base, LoggingMixin):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
     state = Column(String(20))
     try_number = Column(Integer, default=0)
@@ -777,7 +778,7 @@ class TaskInstance(Base, LoggingMixin):
     queue = Column(String(50))
     priority_weight = Column(Integer)
     operator = Column(String(1000))
-    queued_dttm = Column(DateTime)
+    queued_dttm = Column(UtcDateTime)
     pid = Column(Integer)
 
     __table_args__ = (
@@ -1862,9 +1863,9 @@ class TaskFail(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
 
     def __init__(self, task, execution_date, start_date, end_date):
@@ -1884,11 +1885,11 @@ class Log(Base):
     __tablename__ = "log"
 
     id = Column(Integer, primary_key=True)
-    dttm = Column(DateTime)
+    dttm = Column(UtcDateTime)
     dag_id = Column(String(ID_LEN))
     task_id = Column(String(ID_LEN))
     event = Column(String(30))
-    execution_date = Column(DateTime)
+    execution_date = Column(UtcDateTime)
     owner = Column(String(500))
     extra = Column(Text)
 
@@ -2741,12 +2742,12 @@ class DagModel(Base):
     # Whether that DAG was seen on the last DagBag load
     is_active = Column(Boolean, default=False)
     # Last time the scheduler started
-    last_scheduler_run = Column(DateTime)
+    last_scheduler_run = Column(UtcDateTime)
     # Last time this DAG was pickled
-    last_pickled = Column(DateTime)
+    last_pickled = Column(UtcDateTime)
     # Time when the DAG last received a refresh signal
     # (e.g. the DAG's "refresh" button was clicked in the web UI)
-    last_expired = Column(DateTime)
+    last_expired = Column(UtcDateTime)
     # Whether (one  of) the scheduler is scheduling this DAG at the moment
     scheduler_lock = Column(Boolean)
     # Foreign key to the latest pickle_id
@@ -3904,7 +3905,7 @@ class Chart(Base):
         "User", cascade=False, cascade_backrefs=False, backref='charts')
     x_is_date = Column(Boolean, default=True)
     iteration_no = Column(Integer, default=0)
-    last_modified = Column(DateTime, default=func.now())
+    last_modified = Column(UtcDateTime, default=func.now())
 
     def __repr__(self):
         return self.label
@@ -3925,8 +3926,8 @@ class KnownEvent(Base):
 
     id = Column(Integer, primary_key=True)
     label = Column(String(200))
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     user_id = Column(Integer(), ForeignKey('users.id'),)
     known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),)
     reported_by = relationship(
@@ -4054,7 +4055,7 @@ class XCom(Base, LoggingMixin):
     value = Column(LargeBinary)
     timestamp = Column(
         DateTime, default=func.now(), nullable=False)
-    execution_date = Column(DateTime, nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
 
     # source information
     task_id = Column(String(ID_LEN), nullable=False)
@@ -4372,9 +4373,9 @@ class DagRun(Base, LoggingMixin):
 
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
-    execution_date = Column(DateTime, default=func.now())
-    start_date = Column(DateTime, default=func.now())
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, default=func.now())
+    start_date = Column(UtcDateTime, default=func.now())
+    end_date = Column(UtcDateTime)
     _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
     external_trigger = Column(Boolean, default=True)
@@ -4790,9 +4791,9 @@ class SlaMiss(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
     email_sent = Column(Boolean, default=False)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     description = Column(Text)
     notification_sent = Column(Boolean, default=False)
 
@@ -4804,6 +4805,6 @@ class SlaMiss(Base):
 class ImportError(Base):
     __tablename__ = "import_error"
     id = Column(Integer, primary_key=True)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     filename = Column(String(1024))
     stacktrace = Column(Text)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ae1fb5f..aba85fa 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -26,6 +26,8 @@ import json
 import time
 
 from flask import after_this_request, request, Response
+from flask_admin.contrib.sqla.filters import FilterConverter
+from flask_admin.model import filters
 from flask_login import current_user
 import wtforms
 from wtforms.compat import text_type
@@ -386,3 +388,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
             form_name=field.id,
         )
         return wtforms.widgets.core.HTMLString(html)
+
+
+class UtcFilterConverter(FilterConverter):
+    @filters.convert('utcdatetime')
+    def conv_utcdatetime(self, column, name, **kwargs):
+        return self.conv_datetime(column, name, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a6378bf..550a7f8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2055,6 +2055,7 @@ class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
     column_searchable_list = ('dag_id', 'task_id',)
     column_filters = (
         'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'email_sent': {'disabled': True},
         'timestamp': {'disabled': True},
@@ -2349,6 +2350,7 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
 
     column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
     column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class JobModelView(ModelViewOnly):
@@ -2365,6 +2367,7 @@ class JobModelView(ModelViewOnly):
         hostname=nobr_f,
         state=state_f,
         latest_heartbeat=datetime_f)
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class DagRunModelView(ModelViewOnly):
@@ -2387,6 +2390,7 @@ class DagRunModelView(ModelViewOnly):
     column_list = (
         'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
     column_filters = column_list
+    filter_converter = wwwutils.UtcFilterConverter()
     column_searchable_list = ('dag_id', 'state', 'run_id')
     column_formatters = dict(
         execution_date=datetime_f,
@@ -2453,6 +2457,7 @@ class LogModelView(ModelViewOnly):
     column_display_actions = False
     column_default_sort = ('dttm', True)
     column_filters = ('dag_id', 'task_id', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     column_formatters = dict(
         dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
 
@@ -2463,6 +2468,7 @@ class TaskInstanceModelView(ModelViewOnly):
     column_filters = (
         'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
         'queue', 'pool', 'operator', 'start_date', 'end_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     named_filter_urls = True
     column_formatters = dict(
         log_url=log_url_formatter,
@@ -2752,6 +2758,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
     column_filters = (
         'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
         'last_scheduler_run', 'last_expired')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'last_scheduler_run': {'disabled': True},
         'fileloc': {'disabled': True},

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index de2bd54..cfe0d92 100644
--- a/setup.py
+++ b/setup.py
@@ -236,6 +236,7 @@ def do_setup():
             'requests>=2.5.1, <3',
             'setproctitle>=1.1.8, <2',
             'sqlalchemy>=0.9.8',
+            'sqlalchemy-utc>=0.9.0',
             'tabulate>=0.7.5, <0.8.0',
             'thrift>=0.9.2',
             'tzlocal>=1.4',