Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/27 20:39:38 UTC

[01/11] incubator-airflow git commit: [AIRFLOW-1804] Add time zone configuration options

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d8115e982 -> d99053106


[AIRFLOW-1804] Add time zone configuration options

Time zone defaults to UTC, matching the current behavior, in order
to maintain backwards compatibility.
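
Concretely, once this lands the default can be pinned in airflow.cfg;
any IANA name works, Europe/Amsterdam is only an example:

    [core]
    default_timezone = Europe/Amsterdam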


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a47255fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a47255fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a47255fb

Branch: refs/heads/master
Commit: a47255fb2dad6035d03fe7acc50d2e0e65639c3e
Parents: b658c78
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Nov 11 11:27:48 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:53:03 2017 +0100

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg | 10 +++-
 airflow/settings.py                          | 14 +++++
 airflow/utils/timezone.py                    | 68 +++++++++++++++++++++++
 setup.py                                     |  2 +
 tests/utils/test_timezone.py                 | 48 ++++++++++++++++
 5 files changed, 139 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a47255fb/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fd78253..a339673 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -54,6 +54,10 @@ logging_config_class =
 log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
 
+# Default time zone to use when supplied datetimes are naive.
+# Can be utc (default), system, or any IANA time zone string (e.g. Europe/Amsterdam)
+default_timezone = utc
+
 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
 executor = SequentialExecutor
@@ -364,12 +368,12 @@ authenticate = False
 
 [ldap]
 # set this to ldaps://<your.ldap.server>:<port>
-uri = 
+uri =
 user_filter = objectClass=*
 user_name_attr = uid
 group_member_attr = memberOf
-superuser_filter = 
-data_profiler_filter = 
+superuser_filter =
+data_profiler_filter =
 bind_user = cn=Manager,dc=example,dc=com
 bind_password = insecure
 basedn = dc=example,dc=com

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a47255fb/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index ceb9b50..39342df 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -19,6 +19,8 @@ from __future__ import unicode_literals
 
 import logging
 import os
+import pendulum
+
 from sqlalchemy import create_engine
 from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.pool import NullPool
@@ -29,6 +31,18 @@ from airflow.logging_config import configure_logging
 log = logging.getLogger(__name__)
 
 
+TIMEZONE = pendulum.timezone('UTC')
+try:
+    tz = conf.get("core", "default_timezone")
+    if tz == "system":
+        TIMEZONE = pendulum.local_timezone()
+    else:
+        TIMEZONE = pendulum.timezone(tz)
+except Exception:
+    pass
+log.info("Configured default timezone %s", TIMEZONE)
+
+
 class DummyStatsLogger(object):
     @classmethod
     def incr(cls, stat, count=1, rate=1):
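
For reference, a quick sketch of what the three accepted values resolve to,
using the same pendulum calls as the hunk above:

    import pendulum

    pendulum.timezone('UTC')                # default_timezone = utc (the default)
    pendulum.local_timezone()               # default_timezone = system
    pendulum.timezone('Europe/Amsterdam')   # default_timezone = <any IANA name>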

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a47255fb/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
new file mode 100644
index 0000000..b8fe89e
--- /dev/null
+++ b/airflow/utils/timezone.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import pendulum
+
+from airflow.settings import TIMEZONE
+
+
+# UTC time zone as a tzinfo instance.
+utc = pendulum.timezone('UTC')
+
+
+def is_localized(value):
+    """
+    Determine if a given datetime.datetime is aware.
+    The concept is defined in Python's docs:
+    http://docs.python.org/library/datetime.html#datetime.tzinfo
+    Assuming value.tzinfo is either None or a proper datetime.tzinfo,
+    value.utcoffset() implements the appropriate logic.
+    """
+    return value.utcoffset() is not None
+
+
+def is_naive(value):
+    """
+    Determine if a given datetime.datetime is naive.
+    The concept is defined in Python's docs:
+    http://docs.python.org/library/datetime.html#datetime.tzinfo
+    Assuming value.tzinfo is either None or a proper datetime.tzinfo,
+    value.utcoffset() implements the appropriate logic.
+    """
+    return value.utcoffset() is None
+
+
+def utcnow():
+    """
+    Get the current date and time in UTC.
+    :return: aware datetime in UTC
+    """
+
+    return pendulum.utcnow()
+
+
+def convert_to_utc(value):
+    """
+    Convert the datetime to UTC, first localizing it with the default
+    time zone if no timezone information is attached
+    :param value: datetime
+    :return: datetime with tzinfo
+    """
+    if not value:
+        return value
+
+    if not is_localized(value):
+        value = pendulum.instance(value, TIMEZONE)
+
+    return value.astimezone(utc)
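
To illustrate the new helpers, a minimal sketch, assuming default_timezone
is left at utc so that settings.TIMEZONE is UTC:

    import datetime
    from airflow.utils import timezone

    naive = datetime.datetime(2017, 11, 27, 12, 0)
    timezone.is_naive(naive)                   # True
    timezone.convert_to_utc(naive)             # localized with TIMEZONE, then cast to UTC
    timezone.is_localized(timezone.utcnow())   # True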

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a47255fb/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index e9d68b3..9408192 100644
--- a/setup.py
+++ b/setup.py
@@ -226,6 +226,7 @@ def do_setup():
             'lxml>=3.6.0, <4.0',
             'markdown>=2.5.2, <3.0',
             'pandas>=0.17.1, <1.0.0',
+            'pendulum==1.3.1',
             'psutil>=4.2.0, <5.0.0',
             'pygments>=2.0.1, <3.0',
             'python-daemon>=2.1.1, <2.2',
@@ -236,6 +237,7 @@ def do_setup():
             'sqlalchemy>=0.9.8',
             'tabulate>=0.7.5, <0.8.0',
             'thrift>=0.9.2',
+            'tzlocal>=1.4',
             'zope.deprecation>=4.0, <5.0',
         ],
         setup_requires=[

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a47255fb/tests/utils/test_timezone.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
new file mode 100644
index 0000000..778c772
--- /dev/null
+++ b/tests/utils/test_timezone.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import pendulum
+import unittest
+
+from airflow.utils import timezone
+
+CET = pendulum.timezone("Europe/Paris")
+EAT = pendulum.timezone('Africa/Nairobi')    # East Africa Time, UTC+3
+ICT = pendulum.timezone('Asia/Bangkok')      # Indochina Time, UTC+7
+UTC = timezone.utc
+
+
+class TimezoneTest(unittest.TestCase):
+    def test_is_aware(self):
+        self.assertTrue(timezone.is_localized(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)))
+        self.assertFalse(timezone.is_localized(datetime.datetime(2011, 9, 1, 13, 20, 30)))
+
+    def test_is_naive(self):
+        self.assertFalse(timezone.is_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)))
+        self.assertTrue(timezone.is_naive(datetime.datetime(2011, 9, 1, 13, 20, 30)))
+
+    def test_utcnow(self):
+        now = timezone.utcnow()
+        self.assertTrue(timezone.is_localized(now))
+        self.assertEqual(now.replace(tzinfo=None), now.astimezone(UTC).replace(tzinfo=None))
+
+    def test_convert_to_utc(self):
+        naive = datetime.datetime(2011, 9, 1, 13, 20, 30)
+        utc = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=UTC)
+        self.assertEqual(utc, timezone.convert_to_utc(naive))
+
+        eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)
+        utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC)
+        self.assertEqual(utc, timezone.convert_to_utc(eat))


[11/11] incubator-airflow git commit: Merge pull request #2781 from bolkedebruin/AIRFLOW-1802

Posted by bo...@apache.org.
Merge pull request #2781 from bolkedebruin/AIRFLOW-1802


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9905310
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9905310
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9905310

Branch: refs/heads/master
Commit: d99053106e58ec377333c64f68ee84ed1dcdf61c
Parents: d8115e9 f1ab56c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Nov 27 21:39:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 21:39:28 2017 +0100

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py   |   9 +-
 airflow/api/common/experimental/trigger_dag.py  |   6 +-
 airflow/config_templates/default_airflow.cfg    |  10 +-
 airflow/jobs.py                                 |  80 +++----
 .../0e2a74e0fc9f_add_time_zone_awareness.py     | 213 +++++++++++++++++++
 airflow/models.py                               | 160 +++++++++-----
 airflow/operators/dagrun_operator.py            |   5 +-
 airflow/operators/latest_only_operator.py       |   4 +-
 airflow/operators/sensors.py                    |  10 +-
 airflow/settings.py                             |  29 +++
 airflow/ti_deps/deps/not_in_retry_period_dep.py |   4 +-
 airflow/ti_deps/deps/runnable_exec_date_dep.py  |   4 +-
 airflow/utils/dag_processing.py                 |   8 +-
 airflow/utils/dates.py                          |  36 ++--
 airflow/utils/timezone.py                       | 152 +++++++++++++
 airflow/www/api/experimental/endpoints.py       |  16 +-
 airflow/www/forms.py                            |   4 +-
 airflow/www/utils.py                            |  14 +-
 airflow/www/views.py                            | 105 ++++++---
 docs/index.rst                                  |   1 +
 docs/timezone.rst                               | 143 +++++++++++++
 scripts/ci/requirements.txt                     |   1 +
 setup.py                                        |   4 +
 tests/api/client/test_local_client.py           |  31 +--
 tests/contrib/operators/test_druid_operator.py  |   6 +-
 tests/contrib/operators/test_fs_operator.py     |   5 +-
 .../operators/test_jira_operator_test.py        |   5 +-
 tests/contrib/operators/test_sftp_operator.py   |  21 +-
 .../operators/test_spark_submit_operator.py     |   9 +-
 tests/contrib/operators/test_ssh_operator.py    |  12 +-
 tests/contrib/sensors/test_jira_sensor_test.py  |   6 +-
 tests/contrib/sensors/test_redis_sensor.py      |   4 +-
 tests/core.py                                   |  32 +--
 tests/dags/test_cli_triggered_dags.py           |   4 +-
 tests/executors/dask_executor.py                |  13 +-
 tests/impersonation.py                          |   2 +-
 tests/jobs.py                                   |  33 +--
 tests/models.py                                 |  91 ++++----
 tests/operators/latest_only_operator.py         |  51 ++---
 tests/operators/operators.py                    |   7 +-
 tests/operators/python_operator.py              |  11 +-
 tests/operators/sensors.py                      |   8 +-
 tests/operators/subdag_operator.py              |   5 +-
 tests/operators/test_virtualenv_operator.py     |  11 +-
 .../deps/test_not_in_retry_period_dep.py        |   3 +-
 .../ti_deps/deps/test_runnable_exec_date_dep.py |   3 +-
 tests/utils/log/test_file_processor_handler.py  |  10 +-
 tests/utils/log/test_s3_task_handler.py         |   2 +-
 tests/utils/test_dates.py                       |  14 +-
 tests/utils/test_log_handlers.py                |   2 +-
 tests/utils/test_timezone.py                    |  67 ++++++
 tests/www/api/experimental/test_endpoints.py    |   9 +-
 tests/www/test_views.py                         |   4 +-
 53 files changed, 1104 insertions(+), 395 deletions(-)
----------------------------------------------------------------------



[05/11] incubator-airflow git commit: [AIRFLOW-1806] Use naive datetime for cron scheduling

Posted by bo...@apache.org.
[AIRFLOW-1806] Use naive datetime for cron scheduling

Converting to naive local time is required to make cron schedules
fire at exact wall-clock times.
E.g. a DAG scheduled to run at 8:00pm every day should not
suddenly run at 7:00pm because of a DST transition.
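
A worked sketch of the conversion implemented below (croniter on naive local
time, then make_aware and convert_to_utc), using the 2017 Amsterdam fall-back
transition; the schedule and time zone are illustrative only:

    from datetime import datetime

    import pendulum
    from croniter import croniter

    from airflow.utils import timezone

    ams = pendulum.timezone('Europe/Amsterdam')

    # 8:00pm local on the evening before the clocks fall back (CEST, UTC+2)
    local = datetime(2017, 10, 28, 20, 0)                    # naive local time
    nxt = croniter('0 20 * * *', local).get_next(datetime)   # 2017-10-29 20:00, still naive
    aware = timezone.make_aware(nxt, ams)                    # now CET, UTC+1
    timezone.convert_to_utc(aware)                           # 19:00 UTC, yet still 8:00pm locally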


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dcac3e97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dcac3e97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dcac3e97

Branch: refs/heads/master
Commit: dcac3e97a4e1b4429e4baf9d8ab2a7eb4139ad74
Parents: 2f16863
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Nov 11 13:38:59 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/models.py            | 20 +++++++++--
 airflow/utils/timezone.py    | 70 +++++++++++++++++++++++++++++++++++++++
 tests/utils/test_timezone.py | 19 +++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f8a5f0f..33f3636 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -46,6 +46,8 @@ import textwrap
 import traceback
 import warnings
 import hashlib
+
+from datetime import datetime
 from urllib.parse import urlparse
 
 from sqlalchemy import (
@@ -2996,16 +2998,30 @@ class DAG(BaseDag, LoggingMixin):
             num=num, delta=self._schedule_interval)
 
     def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in local time
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
         if isinstance(self._schedule_interval, six.string_types):
+            dttm = timezone.make_naive(dttm, self.timezone)
             cron = croniter(self._schedule_interval, dttm)
-            return cron.get_next(datetime)
+            following = timezone.make_aware(cron.get_next(datetime), self.timezone)
+            return timezone.convert_to_utc(following)
         elif isinstance(self._schedule_interval, timedelta):
             return dttm + self._schedule_interval
 
     def previous_schedule(self, dttm):
+        """
+        Calculates the previous schedule for this dag in local time
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
         if isinstance(self._schedule_interval, six.string_types):
+            dttm = timezone.make_naive(dttm, self.timezone)
             cron = croniter(self._schedule_interval, dttm)
-            return cron.get_prev(datetime)
+            prev = timezone.make_aware(cron.get_prev(datetime), self.timezone)
+            return timezone.convert_to_utc(prev)
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index b8fe89e..5ae7802 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime as dt
 import pendulum
 
 from airflow.settings import TIMEZONE
@@ -66,3 +67,72 @@ def convert_to_utc(value):
         value = pendulum.instance(value, TIMEZONE)
 
     return value.astimezone(utc)
+
+
+def make_aware(value, timezone=None):
+    """
+    Make a naive datetime.datetime in a given time zone aware.
+
+    :param value: datetime
+    :param timezone: timezone
+    :return: localized datetime in settings.TIMEZONE or timezone
+
+    """
+    if timezone is None:
+        timezone = TIMEZONE
+
+    # Check that we won't overwrite the timezone of an aware datetime.
+    if is_localized(value):
+        raise ValueError(
+            "make_aware expects a naive datetime, got %s" % value)
+
+    if hasattr(timezone, 'localize'):
+        # This method is available for pytz time zones.
+        return timezone.localize(value)
+    elif hasattr(timezone, 'convert'):
+        # For pendulum
+        return timezone.convert(value)
+    else:
+        # This may be wrong around DST changes!
+        return value.replace(tzinfo=timezone)
+
+
+def make_naive(value, timezone=None):
+    """
+    Make an aware datetime.datetime naive in a given time zone.
+
+    :param value: datetime
+    :param timezone: timezone
+    :return: naive datetime
+    """
+    if timezone is None:
+        timezone = TIMEZONE
+
+    # Emulate the behavior of astimezone() on Python < 3.6.
+    if is_naive(value):
+        raise ValueError("make_naive() cannot be applied to a naive datetime")
+
+    o = value.astimezone(timezone)
+
+    # cross library compatibility
+    naive = dt.datetime(o.year,
+                        o.month,
+                        o.day,
+                        o.hour,
+                        o.minute,
+                        o.second,
+                        o.microsecond)
+
+    return naive
+
+
+def datetime(*args, **kwargs):
+    """
+    Wrapper around datetime.datetime that adds settings.TIMEZONE if tzinfo not specified
+
+    :return: datetime.datetime
+    """
+    if 'tzinfo' not in kwargs:
+        kwargs['tzinfo'] = TIMEZONE
+
+    return dt.datetime(*args, **kwargs)
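
The datetime() wrapper, presumably, is there so DAG files can create aware
datetimes without importing pendulum themselves:

    from airflow.utils import timezone

    timezone.datetime(2017, 11, 27)                        # tzinfo defaults to settings.TIMEZONE
    timezone.datetime(2017, 11, 27, tzinfo=timezone.utc)   # an explicit tzinfo wins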

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/tests/utils/test_timezone.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index 778c772..3d4cc7c 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -46,3 +46,22 @@ class TimezoneTest(unittest.TestCase):
         eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)
         utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC)
         self.assertEqual(utc, timezone.convert_to_utc(eat))
+
+    def test_make_naive(self):
+        self.assertEqual(
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30))
+        self.assertEqual(
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 17, 20, 30, tzinfo=ICT), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30))
+
+        with self.assertRaises(ValueError):
+            timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT)
+
+    def test_make_aware(self):
+        self.assertEqual(
+            timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT),
+            datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT))
+        with self.assertRaises(ValueError):
+            timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT)
+


[03/11] incubator-airflow git commit: [AIRFLOW-1808] Convert all utcnow() to time zone aware

Posted by bo...@apache.org.
[AIRFLOW-1808] Convert all utcnow() to time zone aware

datetime.utcnow() returns naive datetimes: it does not set any time zone information.
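
The difference in a nutshell (timezone.utcnow() is the helper introduced
earlier in this series):

    import datetime

    from airflow.utils import timezone

    datetime.datetime.utcnow().tzinfo   # None: comparing it to an aware value raises TypeError
    timezone.utcnow().tzinfo            # UTC: safe to compare and to convert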


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c857436b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c857436b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c857436b

Branch: refs/heads/master
Commit: c857436b7565777695c8336dbac2ef60a74d71d1
Parents: a47255f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Nov 27 15:54:20 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:20 2017 +0100

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py   |  9 +-
 airflow/api/common/experimental/trigger_dag.py  |  6 +-
 airflow/jobs.py                                 | 71 ++++++++--------
 airflow/models.py                               | 88 ++++++++++++--------
 airflow/operators/dagrun_operator.py            |  5 +-
 airflow/operators/latest_only_operator.py       |  4 +-
 airflow/operators/sensors.py                    | 10 +--
 airflow/ti_deps/deps/not_in_retry_period_dep.py |  4 +-
 airflow/ti_deps/deps/runnable_exec_date_dep.py  |  4 +-
 airflow/utils/dag_processing.py                 |  8 +-
 airflow/utils/dates.py                          |  5 +-
 airflow/www/forms.py                            |  4 +-
 airflow/www/utils.py                            |  7 +-
 airflow/www/views.py                            | 31 +++----
 setup.py                                        |  1 +
 15 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index e0ea313..e9366e0 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
-
 from airflow.jobs import BackfillJob
 from airflow.models import DagRun, TaskInstance
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 from sqlalchemy import or_
 
+
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
     Infers from the dates which dag runs need to be created and does so.
@@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
         dr = dag.create_dagrun(
             run_id=run_id_template.format(date.isoformat()),
             execution_date=date,
-            start_date=datetime.datetime.utcnow(),
+            start_date=timezone.utcnow(),
             external_trigger=False,
             state=state,
         )
@@ -67,7 +67,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
     :param commit: Commit tasks to be altered to the database
     :return: list of tasks that have been created and updated
     """
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
 
     # microseconds are supported by the database, but is not handled
     # correctly by airflow on e.g. the filesystem and in other places
@@ -185,6 +185,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
 
     return tis_altered
 
+
 def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
     """
     Set the state of a dag run and all task instances associated with the dag

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index bfb6ad4..9d9934d 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import json
 
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun, DagBag
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 
@@ -29,9 +29,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)
 
     if not execution_date:
-        execution_date = datetime.datetime.utcnow()
+        execution_date = timezone.utcnow()
 
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
     execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 664fab5..4e1864e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -28,8 +28,9 @@ import socket
 import sys
 import threading
 import time
+import datetime
+
 from collections import defaultdict
-from datetime import datetime
 from past.builtins import basestring
 from sqlalchemy import (
     Column, Integer, String, DateTime, func, Index, or_, and_, not_)
@@ -46,7 +47,7 @@ from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
-from airflow.utils import asciiart
+from airflow.utils import asciiart, timezone
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           DagFileProcessorManager,
                                           SimpleDag,
@@ -100,22 +101,22 @@ class BaseJob(Base, LoggingMixin):
         self.hostname = socket.getfqdn()
         self.executor = executor
         self.executor_class = executor.__class__.__name__
-        self.start_date = datetime.utcnow()
-        self.latest_heartbeat = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.latest_heartbeat = timezone.utcnow()
         self.heartrate = heartrate
         self.unixname = getpass.getuser()
         super(BaseJob, self).__init__(*args, **kwargs)
 
     def is_alive(self):
         return (
-            (datetime.utcnow() - self.latest_heartbeat).seconds <
+            (timezone.utcnow() - self.latest_heartbeat).seconds <
             (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
         )
 
     @provide_session
     def kill(self, session=None):
         job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-        job.end_date = datetime.utcnow()
+        job.end_date = timezone.utcnow()
         try:
             self.on_kill()
         except:
@@ -165,14 +166,14 @@ class BaseJob(Base, LoggingMixin):
         if job.latest_heartbeat:
             sleep_for = max(
                 0,
-                self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds())
+                self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds())
 
         sleep(sleep_for)
 
         # Update last heartbeat time
         with create_session() as session:
             job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-            job.latest_heartbeat = datetime.utcnow()
+            job.latest_heartbeat = timezone.utcnow()
             session.merge(job)
             session.commit()
 
@@ -194,7 +195,7 @@ class BaseJob(Base, LoggingMixin):
             self._execute()
 
             # Marking the success in the DB
-            self.end_date = datetime.utcnow()
+            self.end_date = timezone.utcnow()
             self.state = State.SUCCESS
             session.merge(self)
             session.commit()
@@ -399,7 +400,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             self._pickle_dags,
             self._dag_id_white_list,
             "DagFileProcessor{}".format(self._instance_id))
-        self._start_time = datetime.utcnow()
+        self._start_time = timezone.utcnow()
 
     def terminate(self, sigkill=False):
         """
@@ -615,16 +616,16 @@ class SchedulerJob(BaseJob):
             TI.execution_date == sq.c.max_ti,
         ).all()
 
-        ts = datetime.utcnow()
+        ts = timezone.utcnow()
         SlaMiss = models.SlaMiss
         for ti in max_tis:
             task = dag.get_task(ti.task_id)
             dttm = ti.execution_date
             if task.sla:
                 dttm = dag.following_schedule(dttm)
-                while dttm < datetime.utcnow():
+                while dttm < timezone.utcnow():
                     following_schedule = dag.following_schedule(dttm)
-                    if following_schedule + task.sla < datetime.utcnow():
+                    if following_schedule + task.sla < timezone.utcnow():
                         session.merge(models.SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
@@ -772,9 +773,9 @@ class SchedulerJob(BaseJob):
             for dr in active_runs:
                 if (
                         dr.start_date and dag.dagrun_timeout and
-                        dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
+                        dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
                     dr.state = State.FAILED
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
                     timedout_runs += 1
             session.commit()
             if len(active_runs) - timedout_runs >= dag.max_active_runs:
@@ -799,9 +800,9 @@ class SchedulerJob(BaseJob):
             # don't do scheduler catchup for dag's that don't have dag.catchup = True
             if not dag.catchup:
                 # The logic is that we move start_date up until
-                # one period before, so that datetime.utcnow() is AFTER
+                # one period before, so that timezone.utcnow() is AFTER
                 # the period end, and the job can be created...
-                now = datetime.utcnow()
+                now = timezone.utcnow()
                 next_start = dag.following_schedule(now)
                 last_start = dag.previous_schedule(now)
                 if next_start <= now:
@@ -847,7 +848,7 @@ class SchedulerJob(BaseJob):
                 )
 
             # don't ever schedule in the future
-            if next_run_date > datetime.utcnow():
+            if next_run_date > timezone.utcnow():
                 return
 
             # this structure is necessary to avoid a TypeError from concatenating
@@ -870,11 +871,11 @@ class SchedulerJob(BaseJob):
             if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
                 return
 
-            if next_run_date and period_end and period_end <= datetime.utcnow():
+            if next_run_date and period_end and period_end <= timezone.utcnow():
                 next_run = dag.create_dagrun(
                     run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
                     execution_date=next_run_date,
-                    start_date=datetime.utcnow(),
+                    start_date=timezone.utcnow(),
                     state=State.RUNNING,
                     external_trigger=False
                 )
@@ -894,7 +895,7 @@ class SchedulerJob(BaseJob):
         for run in dag_runs:
             self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
-            if run.execution_date > datetime.utcnow():
+            if run.execution_date > timezone.utcnow():
                 self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
@@ -1231,7 +1232,7 @@ class SchedulerJob(BaseJob):
         # set TIs to queued state
         for task_instance in tis_to_set_to_queued:
             task_instance.state = State.QUEUED
-            task_instance.queued_dttm = (datetime.utcnow()
+            task_instance.queued_dttm = (timezone.utcnow()
                                          if not task_instance.queued_dttm
                                          else task_instance.queued_dttm)
             session.merge(task_instance)
@@ -1468,7 +1469,7 @@ class SchedulerJob(BaseJob):
             last_runtime = processor_manager.get_last_runtime(file_path)
             processor_pid = processor_manager.get_pid(file_path)
             processor_start_time = processor_manager.get_start_time(file_path)
-            runtime = ((datetime.utcnow() - processor_start_time).total_seconds()
+            runtime = ((timezone.utcnow() - processor_start_time).total_seconds()
                        if processor_start_time else None)
             last_run = processor_manager.get_last_finish_time(file_path)
 
@@ -1585,34 +1586,34 @@ class SchedulerJob(BaseJob):
         self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks()
 
-        execute_start_time = datetime.utcnow()
+        execute_start_time = timezone.utcnow()
 
         # Last time stats were printed
-        last_stat_print_time = datetime(2000, 1, 1)
+        last_stat_print_time = datetime.datetime(2000, 1, 1, tzinfo=timezone.utc)
         # Last time that self.heartbeat() was called.
-        last_self_heartbeat_time = datetime.utcnow()
+        last_self_heartbeat_time = timezone.utcnow()
         # Last time that the DAG dir was traversed to look for files
-        last_dag_dir_refresh_time = datetime.utcnow()
+        last_dag_dir_refresh_time = timezone.utcnow()
 
         # Use this value initially
         known_file_paths = processor_manager.file_paths
 
         # For the execute duration, parse and schedule DAGs
-        while (datetime.utcnow() - execute_start_time).total_seconds() < \
+        while (timezone.utcnow() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
             # Traverse the DAG directory for Python files containing DAGs
             # periodically
-            elapsed_time_since_refresh = (datetime.utcnow() -
+            elapsed_time_since_refresh = (timezone.utcnow() -
                                           last_dag_dir_refresh_time).total_seconds()
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
                 self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
-                last_dag_dir_refresh_time = datetime.utcnow()
+                last_dag_dir_refresh_time = timezone.utcnow()
                 self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
@@ -1662,20 +1663,20 @@ class SchedulerJob(BaseJob):
             self._process_executor_events(simple_dag_bag)
 
             # Heartbeat the scheduler periodically
-            time_since_last_heartbeat = (datetime.utcnow() -
+            time_since_last_heartbeat = (timezone.utcnow() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
                 self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
-                last_self_heartbeat_time = datetime.utcnow()
+                last_self_heartbeat_time = timezone.utcnow()
 
             # Occasionally print out stats about how fast the files are getting processed
-            if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
+            if ((timezone.utcnow() - last_stat_print_time).total_seconds() >
                     self.print_stats_interval):
                 if len(known_file_paths) > 0:
                     self._log_file_processing_stats(known_file_paths,
                                                     processor_manager)
-                last_stat_print_time = datetime.utcnow()
+                last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
             self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
@@ -2049,7 +2050,7 @@ class BackfillJob(BaseJob):
         run = run or self.dag.create_dagrun(
             run_id=run_id,
             execution_date=run_date,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING,
             external_trigger=False,
             session=session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 37a49cf..fe62ac5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -24,7 +24,8 @@ from builtins import str
 from builtins import object, bytes
 import copy
 from collections import namedtuple
-from datetime import datetime, timedelta
+from datetime import timedelta
+
 import dill
 import functools
 import getpass
@@ -69,6 +70,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
 
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
+from airflow.utils import timezone
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
@@ -154,7 +156,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
         ).all()
         for dr in drs:
             dr.state = State.RUNNING
-            dr.start_date = datetime.utcnow()
+            dr.start_date = timezone.utcnow()
 
 
 class DagBag(BaseDagBag, LoggingMixin):
@@ -341,7 +343,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
-        limit_dttm = datetime.utcnow() - timedelta(seconds=secs)
+        limit_dttm = timezone.utcnow() - timedelta(seconds=secs)
         self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
@@ -373,7 +375,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         """
         self.dags[dag.dag_id] = dag
         dag.resolve_template_files()
-        dag.last_loaded = datetime.utcnow()
+        dag.last_loaded = timezone.utcnow()
 
         for task in dag.tasks:
             settings.policy(task)
@@ -398,7 +400,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         ignoring files that match any of the regex patterns specified
         in the file.
         """
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         dag_folder = dag_folder or self.dag_folder
 
         # Used to store stats around DagBag processing
@@ -426,11 +428,11 @@ class DagBag(BaseDagBag, LoggingMixin):
                             continue
                         if not any(
                                 [re.findall(p, filepath) for p in patterns]):
-                            ts = datetime.utcnow()
+                            ts = timezone.utcnow()
                             found_dags = self.process_file(
                                 filepath, only_if_updated=only_if_updated)
 
-                            td = datetime.utcnow() - ts
+                            td = timezone.utcnow() - ts
                             td = td.total_seconds() + (
                                 float(td.microseconds) / 1000000)
                             stats.append(FileLoadStat(
@@ -443,7 +445,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     except Exception as e:
                         self.log.exception(e)
         Stats.gauge(
-            'collect_dags', (datetime.utcnow() - start_dttm).total_seconds(), 1)
+            'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1)
         Stats.gauge(
             'dagbag_size', len(self.dags), 1)
         Stats.gauge(
@@ -1065,8 +1067,8 @@ class TaskInstance(Base, LoggingMixin):
     @provide_session
     def set_state(self, state, session=None):
         self.state = state
-        self.start_date = datetime.utcnow()
-        self.end_date = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.end_date = timezone.utcnow()
         session.merge(self)
         session.commit()
 
@@ -1231,7 +1233,7 @@ class TaskInstance(Base, LoggingMixin):
         to be retried.
         """
         return (self.state == State.UP_FOR_RETRY and
-                self.next_retry_datetime() < datetime.utcnow())
+                self.next_retry_datetime() < timezone.utcnow())
 
     @provide_session
     def pool_full(self, session):
@@ -1339,7 +1341,7 @@ class TaskInstance(Base, LoggingMixin):
         msg = "Starting attempt {attempt} of {total}".format(
             attempt=self.try_number + 1,
             total=self.max_tries + 1)
-        self.start_date = datetime.utcnow()
+        self.start_date = timezone.utcnow()
 
         dep_context = DepContext(
             deps=RUN_DEPS - QUEUE_DEPS,
@@ -1363,7 +1365,7 @@ class TaskInstance(Base, LoggingMixin):
                 total=self.max_tries + 1)
             self.log.warning(hr + msg + hr)
 
-            self.queued_dttm = datetime.utcnow()
+            self.queued_dttm = timezone.utcnow()
             self.log.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
@@ -1508,7 +1510,7 @@ class TaskInstance(Base, LoggingMixin):
             raise
 
         # Recording SUCCESS
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         if not test_mode:
             session.add(Log(self.state, self))
@@ -1569,7 +1571,7 @@ class TaskInstance(Base, LoggingMixin):
     def handle_failure(self, error, test_mode=False, context=None, session=None):
         self.log.exception(error)
         task = self.task
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
         Stats.incr('ti_failures')
@@ -1891,7 +1893,7 @@ class Log(Base):
     extra = Column(Text)
 
     def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
-        self.dttm = datetime.utcnow()
+        self.dttm = timezone.utcnow()
         self.event = event
         self.extra = extra
 
@@ -1929,7 +1931,7 @@ class SkipMixin(LoggingMixin):
             return
 
         task_ids = [d.task_id for d in tasks]
-        now = datetime.utcnow()
+        now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
@@ -2544,7 +2546,7 @@ class BaseOperator(LoggingMixin):
         range.
         """
         TI = TaskInstance
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         return session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.task_id == self.task_id,
@@ -2591,7 +2593,7 @@ class BaseOperator(LoggingMixin):
         Run a set of task instances for a date range.
         """
         start_date = start_date or self.start_date
-        end_date = end_date or self.end_date or datetime.utcnow()
+        end_date = end_date or self.end_date or timezone.utcnow()
 
         for dt in self.dag.date_range(start_date, end_date=end_date):
             TaskInstance(self, dt).run(
@@ -2883,8 +2885,28 @@ class DAG(BaseDag, LoggingMixin):
         # set file location to caller source path
         self.fileloc = sys._getframe().f_back.f_code.co_filename
         self.task_dict = dict()
-        self.start_date = start_date
-        self.end_date = end_date
+
+        # set timezone
+        if start_date and start_date.tzinfo:
+            self.timezone = start_date.tzinfo
+        elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo:
+            self.timezone = self.default_args['start_date'].tzinfo
+        else:
+            self.timezone = settings.TIMEZONE
+
+        self.start_date = timezone.convert_to_utc(start_date)
+        self.end_date = timezone.convert_to_utc(end_date)
+
+        # also convert tasks
+        if 'start_date' in self.default_args:
+            self.default_args['start_date'] = (
+                timezone.convert_to_utc(self.default_args['start_date'])
+            )
+        if 'end_date' in self.default_args:
+            self.default_args['end_date'] = (
+                timezone.convert_to_utc(self.default_args['end_date'])
+            )
+
         self.schedule_interval = schedule_interval
         if schedule_interval in cron_presets:
             self._schedule_interval = cron_presets.get(schedule_interval)
@@ -2896,7 +2918,7 @@ class DAG(BaseDag, LoggingMixin):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
         self.parent_dag = None  # Gets set when DAGs are loaded
-        self.last_loaded = datetime.utcnow()
+        self.last_loaded = timezone.utcnow()
         self.safe_dag_id = dag_id.replace('.', '__dot__')
         self.max_active_runs = max_active_runs
         self.dagrun_timeout = dagrun_timeout
@@ -2965,7 +2987,7 @@ class DAG(BaseDag, LoggingMixin):
 
     # /Context Manager ----------------------------------------------
 
-    def date_range(self, start_date, num=None, end_date=datetime.utcnow()):
+    def date_range(self, start_date, num=None, end_date=timezone.utcnow()):
         if num:
             end_date = None
         return utils_date_range(
@@ -2993,7 +3015,7 @@ class DAG(BaseDag, LoggingMixin):
 
         :param start_date: the start date of the interval
         :type start_date: datetime
-        :param end_date: the end date of the interval, defaults to datetime.utcnow()
+        :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
         :return: a list of dates within the interval following the dag's schedule
         :rtype: list
@@ -3005,7 +3027,7 @@ class DAG(BaseDag, LoggingMixin):
 
         # dates for dag runs
         using_start_date = using_start_date or min([t.start_date for t in self.tasks])
-        using_end_date = using_end_date or datetime.utcnow()
+        using_end_date = using_end_date or timezone.utcnow()
 
         # next run date for a subdag isn't relevant (schedule_interval for subdags
         # is ignored) so we use the dag run's start date in the case of a subdag
@@ -3274,9 +3296,9 @@ class DAG(BaseDag, LoggingMixin):
             self, session, start_date=None, end_date=None, state=None):
         TI = TaskInstance
         if not start_date:
-            start_date = (datetime.utcnow() - timedelta(30)).date()
+            start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = datetime.combine(start_date, datetime.min.time())
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         tis = session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.execution_date >= start_date,
@@ -3536,10 +3558,10 @@ class DAG(BaseDag, LoggingMixin):
         d = {}
         d['is_picklable'] = True
         try:
-            dttm = datetime.utcnow()
+            dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
             d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = "{}".format(datetime.utcnow() - dttm)
+            d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
         except Exception as e:
             self.log.debug(e)
             d['is_picklable'] = False
@@ -3557,7 +3579,7 @@ class DAG(BaseDag, LoggingMixin):
         if not dp or dp.pickle != self:
             dp = DagPickle(dag=self)
             session.add(dp)
-            self.last_pickled = datetime.utcnow()
+            self.last_pickled = timezone.utcnow()
             session.commit()
             self.pickle_id = dp.id
 
@@ -3773,7 +3795,7 @@ class DAG(BaseDag, LoggingMixin):
         if owner is None:
             owner = self.owner
         if sync_time is None:
-            sync_time = datetime.utcnow()
+            sync_time = timezone.utcnow()
 
         orm_dag = session.query(
             DagModel).filter(DagModel.dag_id == self.dag_id).first()
@@ -4566,7 +4588,7 @@ class DagRun(Base, LoggingMixin):
 
         # pre-calculate
         # db is faster
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         unfinished_tasks = self.get_task_instances(
             state=State.unfinished(),
             session=session
@@ -4590,7 +4612,7 @@ class DagRun(Base, LoggingMixin):
                     no_dependencies_met = False
                     break
 
-        duration = (datetime.utcnow() - start_dttm).total_seconds() * 1000
+        duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)
 
         # future: remove the check on adhoc tasks (=active_tasks)
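
Per the constructor hunk above, a DAG now infers its time zone from an aware
start_date, falling back to settings.TIMEZONE, and stores its dates in UTC.
A hedged sketch, with the dag_id chosen purely for illustration:

    from datetime import datetime

    import pendulum

    from airflow.models import DAG
    from airflow.utils import timezone

    ams = pendulum.timezone('Europe/Amsterdam')
    dag = DAG('tz_demo', start_date=timezone.make_aware(datetime(2017, 11, 1), ams))
    dag.timezone     # Europe/Amsterdam, taken from start_date.tzinfo
    dag.start_date   # converted to UTC via convert_to_utc()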

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 923b8a4..2b5a814 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
-
 from airflow.models import BaseOperator, DagBag
+from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
@@ -59,7 +58,7 @@ class TriggerDagRunOperator(BaseOperator):
         self.trigger_dag_id = trigger_dag_id
 
     def execute(self, context):
-        dro = DagRunOrder(run_id='trig__' + datetime.utcnow().isoformat())
+        dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat())
         dro = self.python_callable(context, dro)
         if dro:
             with create_session() as session:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 7abd92d..7b4e0ca 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 
 from airflow.models import BaseOperator, SkipMixin
+from airflow.utils import timezone
 
 
 class LatestOnlyOperator(BaseOperator, SkipMixin):
@@ -35,7 +35,7 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
             self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
             return
 
-        now = datetime.datetime.utcnow()
+        now = timezone.utcnow()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index bd073b8..c8a8df6 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -21,7 +21,7 @@ standard_library.install_aliases()
 from builtins import str
 from past.builtins import basestring
 
-from datetime import datetime
+from airflow.utils import timezone
 from urllib.parse import urlparse
 from time import sleep
 import re
@@ -75,9 +75,9 @@ class BaseSensorOperator(BaseOperator):
         raise AirflowException('Override me.')
 
     def execute(self, context):
-        started_at = datetime.utcnow()
+        started_at = timezone.utcnow()
         while not self.poke(context):
-            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
 
     def poke(self, context):
         self.log.info('Checking if the time (%s) has come', self.target_time)
-        return datetime.utcnow().time() > self.target_time
+        return timezone.utcnow().time() > self.target_time
 
 
 class TimeDeltaSensor(BaseSensorOperator):
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
         self.log.info('Checking if the time (%s) has come', target_dttm)
-        return datetime.utcnow() > target_dttm
+        return timezone.utcnow() > target_dttm
 
 
 class HttpSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 7f9bff6..6628ff3 100644
--- a/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 
@@ -38,7 +38,7 @@ class NotInRetryPeriodDep(BaseTIDep):
 
         # Calculate the date first so that it is always smaller than the timestamp used by
         # ready_for_retry
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
         next_task_retry_date = ti.next_retry_datetime()
         if ti.is_premature:
             yield self._failing_status(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py
index 13e5345..69321d9 100644
--- a/airflow/ti_deps/deps/runnable_exec_date_dep.py
+++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 
 
@@ -23,7 +23,7 @@ class RunnableExecDateDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
 
         if ti.execution_date > cur_date:
             yield self._failing_status(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 68cee76..965e88b 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -23,10 +23,10 @@ import time
 import zipfile
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
-from datetime import datetime
 
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
@@ -376,7 +376,7 @@ class DagFileProcessorManager(LoggingMixin):
         being processed
         """
         if file_path in self._processors:
-            return (datetime.utcnow() - self._processors[file_path].start_time)\
+            return (timezone.utcnow() - self._processors[file_path].start_time)\
                 .total_seconds()
         return None
 
@@ -466,7 +466,7 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path, processor in self._processors.items():
             if processor.done:
                 self.log.info("Processor for %s finished", file_path)
-                now = datetime.utcnow()
+                now = timezone.utcnow()
                 finished_processors[file_path] = processor
                 self._last_runtime[file_path] = (now -
                                                  processor.start_time).total_seconds()
@@ -494,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin):
             # If the file path is already being processed, or if a file was
             # processed recently, wait until the next batch
             file_paths_in_progress = self._processors.keys()
-            now = datetime.utcnow()
+            now = timezone.utcnow()
             file_paths_recently_processed = []
             for file_path in self._file_paths:
                 last_finish_time = self.get_last_finish_time(file_path)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 81e1c2c..7d0d9d9 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+from airflow.utils import timezone
 from datetime import datetime, timedelta
 from dateutil.relativedelta import relativedelta  # for doctest
 import six
@@ -66,7 +67,7 @@ def date_range(
     if end_date and num:
         raise Exception("Wait. Either specify end_date OR num")
     if not end_date and not num:
-        end_date = datetime.utcnow()
+        end_date = timezone.utcnow()
 
     delta_iscron = False
     if isinstance(delta, six.string_types):
@@ -219,7 +220,7 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
     Get a datetime object representing `n` days ago. By default the time is
     set to midnight.
     """
-    today = datetime.utcnow().replace(
+    today = timezone.utcnow().replace(
         hour=hour,
         minute=minute,
         second=second,
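
For reference, a hedged sketch of what days_ago(n) computes after this change, using only the standard library (the helper name here is illustrative):

    from datetime import datetime, timedelta, timezone

    def days_ago_sketch(n, hour=0, minute=0, second=0, microsecond=0):
        # Aware UTC "now", truncated to the requested wall-clock fields, minus n days.
        today = datetime.now(timezone.utc).replace(
            hour=hour, minute=minute, second=second, microsecond=microsecond)
        return today - timedelta(days=n)

    print(days_ago_sketch(7))  # midnight UTC, seven days ago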

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 2c6118c..f5af35a 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -17,7 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from datetime import datetime
+from airflow.utils import timezone
 from flask_admin.form import DateTimePickerWidget
 from wtforms import DateTimeField, SelectField
 from flask_wtf import Form
@@ -33,7 +33,7 @@ class DateTimeWithNumRunsForm(Form):
     # Date time and number of runs form for tree view, task duration
     # and landing times
     base_date = DateTimeField(
-        "Anchor date", widget=DateTimePickerWidget(), default=datetime.utcnow())
+        "Anchor date", widget=DateTimePickerWidget(), default=timezone.utcnow())
     num_runs = SelectField("Number of runs", default=25, choices=(
         (5, "5"),
         (25, "25"),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 52b22fc..ae1fb5f 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -21,7 +21,7 @@ from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
-import dateutil.parser as dateparser
+import iso8601
 import json
 import time
 
@@ -46,6 +46,7 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
     'access_token',
 )
 
+
 def should_hide_value_for_key(key_name):
     return any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
            and configuration.getboolean('admin', 'hide_sensitive_variable_fields')
@@ -252,8 +253,8 @@ def action_logging(f):
             dag_id=request.args.get('dag_id'))
 
         if 'execution_date' in request.args:
-            log.execution_date = dateparser.parse(
-                request.args.get('execution_date'))
+            log.execution_date = iso8601.parse_date(
+                request.args.get('execution_date'), settings.TIMEZONE)
 
         with create_session() as session:
             session.add(log)
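
For context, iso8601.parse_date takes a default time zone as its second argument, which applies only when the string itself carries no UTC offset. A small illustration, with the stdlib UTC standing in for settings.TIMEZONE:

    import iso8601
    from datetime import timezone

    print(iso8601.parse_date("2017-11-27T15:54:27+01:00", timezone.utc))  # offset in the string wins
    print(iso8601.parse_date("2017-11-27T15:54:27", timezone.utc))        # default time zone applied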

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1191bde..a6378bf 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -21,7 +21,7 @@ import os
 import pkg_resources
 import socket
 from functools import wraps
-from datetime import datetime, timedelta
+from datetime import timedelta
 import dateutil.parser
 import copy
 import math
@@ -72,6 +72,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
 
+from airflow.utils import timezone
 from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
@@ -170,7 +171,7 @@ def duration_f(v, c, m, p):
 def datetime_f(v, c, m, p):
     attr = getattr(m, p)
     dttm = attr.isoformat() if attr else ''
-    if datetime.utcnow().isoformat()[:4] == dttm[:4]:
+    if timezone.utcnow().isoformat()[:4] == dttm[:4]:
         dttm = dttm[5:]
     return Markup("<nobr>{}</nobr>".format(dttm))
 
@@ -922,7 +923,7 @@ class Airflow(BaseView):
             flash("Cannot find dag {}".format(dag_id))
             return redirect(origin)
 
-        execution_date = datetime.utcnow()
+        execution_date = timezone.utcnow()
         run_id = "manual__{0}".format(execution_date.isoformat())
 
         dr = DagRun.find(dag_id=dag_id, run_id=run_id)
@@ -1161,7 +1162,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1217,7 +1218,7 @@ class Airflow(BaseView):
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
                             tid["start_date"] is not None):
-                    d = datetime.utcnow() - dateutil.parser.parse(tid["start_date"])
+                    d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1314,7 +1315,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         DR = models.DagRun
         drs = (
@@ -1390,7 +1391,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1497,7 +1498,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1560,7 +1561,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1651,7 +1652,7 @@ class Airflow(BaseView):
             DagModel).filter(DagModel.dag_id == dag_id).first()
 
         if orm_dag:
-            orm_dag.last_expired = datetime.utcnow()
+            orm_dag.last_expired = timezone.utcnow()
             session.merge(orm_dag)
         session.commit()
 
@@ -1687,7 +1688,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         form = DateTimeForm(data={'execution_date': dttm})
 
@@ -1698,7 +1699,7 @@ class Airflow(BaseView):
 
         tasks = []
         for ti in tis:
-            end_date = ti.end_date if ti.end_date else datetime.utcnow()
+            end_date = ti.end_date if ti.end_date else timezone.utcnow()
             tasks.append({
                 'startDate': wwwutils.epoch(ti.start_date),
                 'endDate': wwwutils.epoch(end_date),
@@ -2172,7 +2173,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
             model.iteration_no += 1
         if not model.user_id and current_user and hasattr(current_user, 'id'):
             model.user_id = current_user.id
-        model.last_modified = datetime.utcnow()
+        model.last_modified = timezone.utcnow()
 
 
 chart_mapping = (
@@ -2433,9 +2434,9 @@ class DagRunModelView(ModelViewOnly):
                 count += 1
                 dr.state = target_state
                 if target_state == State.RUNNING:
-                    dr.start_date = datetime.utcnow()
+                    dr.start_date = timezone.utcnow()
                 else:
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
             session.commit()
             models.DagStat.update(dirty_ids, session=session)
             flash(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 9408192..de2bd54 100644
--- a/setup.py
+++ b/setup.py
@@ -222,6 +222,7 @@ def do_setup():
             'future>=0.16.0, <0.17',
             'gitpython>=2.0.2',
             'gunicorn>=19.4.0, <20.0',
+            'iso8601>=0.1.12',
             'jinja2>=2.7.3, <2.9.0',
             'lxml>=3.6.0, <4.0',
             'markdown>=2.5.2, <3.0',



[08/11] incubator-airflow git commit: [AIRFLOW-1806] Use naive datetime when using cron

Posted by bo...@apache.org.
[AIRFLOW-1806] Use naive datetime when using cron


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8aadc311
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8aadc311
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8aadc311

Branch: refs/heads/master
Commit: 8aadc3112539f760bbd8b0454137e7a40091458c
Parents: 9624f5f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Nov 15 21:44:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/utils/dates.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8aadc311/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index dab2b0d..cb9c840 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -72,20 +72,28 @@ def date_range(
     delta_iscron = False
     if isinstance(delta, six.string_types):
         delta_iscron = True
+        tz = start_date.tzinfo
+        start_date = timezone.make_naive(start_date, tz)
         cron = croniter(delta, start_date)
     elif isinstance(delta, timedelta):
         delta = abs(delta)
     l = []
     if end_date:
         while start_date <= end_date:
+            if delta_iscron:
+                start_date = timezone.make_aware(start_date, tz)
             l.append(start_date)
+
             if delta_iscron:
                 start_date = cron.get_next(datetime)
             else:
                 start_date += delta
     else:
         for _ in range(abs(num)):
+            if delta_iscron:
+                start_date = timezone.make_aware(start_date, tz)
             l.append(start_date)
+
             if delta_iscron:
                 if num > 0:
                     start_date = cron.get_next(datetime)
@@ -122,12 +130,14 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
 
     if isinstance(delta, six.string_types):
         # It's cron based, so it's easy
+        tz = start_date.tzinfo
+        start_date = timezone.make_naive(start_date, tz)
         cron = croniter(delta, start_date)
         prev = cron.get_prev(datetime)
         if prev == start_date:
-            return start_date
+            return timezone.make_aware(start_date, tz)
         else:
-            return prev
+            return timezone.make_aware(prev, tz)
 
     # Ignore the microseconds of dt
     dt -= timedelta(microseconds=dt.microsecond)
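
The make_naive/make_aware dance above exists because croniter only steps naive datetimes: strip the zone before building the iterator, then re-attach it to each generated value. A bare-bones illustration:

    from datetime import datetime
    from croniter import croniter

    naive_start = datetime(2017, 11, 15, 21, 44)  # zone already stripped off

    cron = croniter("0 8 * * *", naive_start)     # croniter expects naive input
    naive_next = cron.get_next(datetime)          # naive result; re-attach the zone afterwards
    print(naive_next)                             # 2017-11-16 08:00:00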


[06/11] incubator-airflow git commit: [AIRFLOW-1807] Force use of time zone aware db fields

Posted by bo...@apache.org.
[AIRFLOW-1807] Force use of time zone aware db fields

This change checks that all datetimes being stored are
indeed time zone aware.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f168634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f168634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f168634

Branch: refs/heads/master
Commit: 2f168634aac4aa138f00634bbb9f4e3993346ffa
Parents: c857436
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Nov 11 13:32:02 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py      |  9 +++++----
 airflow/models.py    | 47 ++++++++++++++++++++++++-----------------------
 airflow/www/utils.py |  8 ++++++++
 airflow/www/views.py |  7 +++++++
 setup.py             |  1 +
 5 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 4e1864e..868e785 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -33,9 +33,10 @@ import datetime
 from collections import defaultdict
 from past.builtins import basestring
 from sqlalchemy import (
-    Column, Integer, String, DateTime, func, Index, or_, and_, not_)
+    Column, Integer, String, func, Index, or_, and_, not_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
+from sqlalchemy_utc import UtcDateTime
 from tabulate import tabulate
 from time import sleep
 
@@ -77,9 +78,9 @@ class BaseJob(Base, LoggingMixin):
     dag_id = Column(String(ID_LEN),)
     state = Column(String(20))
     job_type = Column(String(30))
-    start_date = Column(DateTime())
-    end_date = Column(DateTime())
-    latest_heartbeat = Column(DateTime())
+    start_date = Column(UtcDateTime())
+    end_date = Column(UtcDateTime())
+    latest_heartbeat = Column(UtcDateTime())
     executor_class = Column(String(500))
     hostname = Column(String(500))
     unixname = Column(String(1000))
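
What UtcDateTime buys, per the sqlalchemy-utc package: bound values must be time zone aware and are normalized to UTC on the way into the database. A hedged, self-contained sketch (the table here is illustrative, not an Airflow model):

    from datetime import datetime, timezone

    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy_utc import UtcDateTime

    Base = declarative_base()

    class HeartbeatDemo(Base):
        __tablename__ = 'heartbeat_demo'
        id = Column(Integer, primary_key=True)
        latest_heartbeat = Column(UtcDateTime())

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    session.add(HeartbeatDemo(latest_heartbeat=datetime.now(timezone.utc)))  # aware: accepted
    session.commit()

    try:
        session.add(HeartbeatDemo(latest_heartbeat=datetime.utcnow()))  # naive
        session.commit()
    except Exception as err:  # SQLAlchemy wraps the bind-time rejection
        print("naive datetime rejected:", err)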

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index fe62ac5..f8a5f0f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -55,6 +55,7 @@ from sqlalchemy import func, or_, and_
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
+from sqlalchemy_utc import UtcDateTime
 
 from croniter import croniter
 import six
@@ -732,7 +733,7 @@ class DagPickle(Base):
     """
     id = Column(Integer, primary_key=True)
     pickle = Column(PickleType(pickler=dill))
-    created_dttm = Column(DateTime, default=func.now())
+    created_dttm = Column(UtcDateTime, default=func.now())
     pickle_hash = Column(Text)
 
     __tablename__ = "dag_pickle"
@@ -763,9 +764,9 @@ class TaskInstance(Base, LoggingMixin):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
     state = Column(String(20))
     try_number = Column(Integer, default=0)
@@ -777,7 +778,7 @@ class TaskInstance(Base, LoggingMixin):
     queue = Column(String(50))
     priority_weight = Column(Integer)
     operator = Column(String(1000))
-    queued_dttm = Column(DateTime)
+    queued_dttm = Column(UtcDateTime)
     pid = Column(Integer)
 
     __table_args__ = (
@@ -1862,9 +1863,9 @@ class TaskFail(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
 
     def __init__(self, task, execution_date, start_date, end_date):
@@ -1884,11 +1885,11 @@ class Log(Base):
     __tablename__ = "log"
 
     id = Column(Integer, primary_key=True)
-    dttm = Column(DateTime)
+    dttm = Column(UtcDateTime)
     dag_id = Column(String(ID_LEN))
     task_id = Column(String(ID_LEN))
     event = Column(String(30))
-    execution_date = Column(DateTime)
+    execution_date = Column(UtcDateTime)
     owner = Column(String(500))
     extra = Column(Text)
 
@@ -2741,12 +2742,12 @@ class DagModel(Base):
     # Whether that DAG was seen on the last DagBag load
     is_active = Column(Boolean, default=False)
     # Last time the scheduler started
-    last_scheduler_run = Column(DateTime)
+    last_scheduler_run = Column(UtcDateTime)
     # Last time this DAG was pickled
-    last_pickled = Column(DateTime)
+    last_pickled = Column(UtcDateTime)
     # Time when the DAG last received a refresh signal
     # (e.g. the DAG's "refresh" button was clicked in the web UI)
-    last_expired = Column(DateTime)
+    last_expired = Column(UtcDateTime)
     # Whether (one  of) the scheduler is scheduling this DAG at the moment
     scheduler_lock = Column(Boolean)
     # Foreign key to the latest pickle_id
@@ -3904,7 +3905,7 @@ class Chart(Base):
         "User", cascade=False, cascade_backrefs=False, backref='charts')
     x_is_date = Column(Boolean, default=True)
     iteration_no = Column(Integer, default=0)
-    last_modified = Column(DateTime, default=func.now())
+    last_modified = Column(UtcDateTime, default=func.now())
 
     def __repr__(self):
         return self.label
@@ -3925,8 +3926,8 @@ class KnownEvent(Base):
 
     id = Column(Integer, primary_key=True)
     label = Column(String(200))
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     user_id = Column(Integer(), ForeignKey('users.id'),)
     known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),)
     reported_by = relationship(
@@ -4054,7 +4055,7 @@ class XCom(Base, LoggingMixin):
     value = Column(LargeBinary)
     timestamp = Column(
         DateTime, default=func.now(), nullable=False)
-    execution_date = Column(DateTime, nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
 
     # source information
     task_id = Column(String(ID_LEN), nullable=False)
@@ -4372,9 +4373,9 @@ class DagRun(Base, LoggingMixin):
 
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
-    execution_date = Column(DateTime, default=func.now())
-    start_date = Column(DateTime, default=func.now())
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, default=func.now())
+    start_date = Column(UtcDateTime, default=func.now())
+    end_date = Column(UtcDateTime)
     _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
     external_trigger = Column(Boolean, default=True)
@@ -4790,9 +4791,9 @@ class SlaMiss(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
     email_sent = Column(Boolean, default=False)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     description = Column(Text)
     notification_sent = Column(Boolean, default=False)
 
@@ -4804,6 +4805,6 @@ class SlaMiss(Base):
 class ImportError(Base):
     __tablename__ = "import_error"
     id = Column(Integer, primary_key=True)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     filename = Column(String(1024))
     stacktrace = Column(Text)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ae1fb5f..aba85fa 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -26,6 +26,8 @@ import json
 import time
 
 from flask import after_this_request, request, Response
+from flask_admin.contrib.sqla.filters import FilterConverter
+from flask_admin.model import filters
 from flask_login import current_user
 import wtforms
 from wtforms.compat import text_type
@@ -386,3 +388,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
             form_name=field.id,
         )
         return wtforms.widgets.core.HTMLString(html)
+
+
+class UtcFilterConverter(FilterConverter):
+    @filters.convert('utcdatetime')
+    def conv_utcdatetime(self, column, name, **kwargs):
+        return self.conv_datetime(column, name, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a6378bf..550a7f8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2055,6 +2055,7 @@ class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
     column_searchable_list = ('dag_id', 'task_id',)
     column_filters = (
         'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'email_sent': {'disabled': True},
         'timestamp': {'disabled': True},
@@ -2349,6 +2350,7 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
 
     column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
     column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class JobModelView(ModelViewOnly):
@@ -2365,6 +2367,7 @@ class JobModelView(ModelViewOnly):
         hostname=nobr_f,
         state=state_f,
         latest_heartbeat=datetime_f)
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class DagRunModelView(ModelViewOnly):
@@ -2387,6 +2390,7 @@ class DagRunModelView(ModelViewOnly):
     column_list = (
         'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
     column_filters = column_list
+    filter_converter = wwwutils.UtcFilterConverter()
     column_searchable_list = ('dag_id', 'state', 'run_id')
     column_formatters = dict(
         execution_date=datetime_f,
@@ -2453,6 +2457,7 @@ class LogModelView(ModelViewOnly):
     column_display_actions = False
     column_default_sort = ('dttm', True)
     column_filters = ('dag_id', 'task_id', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     column_formatters = dict(
         dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
 
@@ -2463,6 +2468,7 @@ class TaskInstanceModelView(ModelViewOnly):
     column_filters = (
         'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
         'queue', 'pool', 'operator', 'start_date', 'end_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     named_filter_urls = True
     column_formatters = dict(
         log_url=log_url_formatter,
@@ -2752,6 +2758,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
     column_filters = (
         'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
         'last_scheduler_run', 'last_expired')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'last_scheduler_run': {'disabled': True},
         'fileloc': {'disabled': True},

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index de2bd54..cfe0d92 100644
--- a/setup.py
+++ b/setup.py
@@ -236,6 +236,7 @@ def do_setup():
             'requests>=2.5.1, <3',
             'setproctitle>=1.1.8, <2',
             'sqlalchemy>=0.9.8',
+            'sqlalchemy-utc>=0.9.0',
             'tabulate>=0.7.5, <0.8.0',
             'thrift>=0.9.2',
             'tzlocal>=1.4',


[10/11] incubator-airflow git commit: [AIRFLOW-1803] Time zone documentation

Posted by bo...@apache.org.
[AIRFLOW-1803] Time zone documentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f1ab56cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f1ab56cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f1ab56cc

Branch: refs/heads/master
Commit: f1ab56cc6ad3b9419af94aaa333661c105185883
Parents: 518a41a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Nov 18 14:04:15 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 docs/index.rst    |   1 +
 docs/timezone.rst | 143 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1ab56cc/docs/index.rst
----------------------------------------------------------------------
diff --git a/docs/index.rst b/docs/index.rst
index 2a1f1c1..42349ea 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -83,6 +83,7 @@ Content
     scheduler
     plugins
     security
+    timezone
     api
     integration
     faq

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1ab56cc/docs/timezone.rst
----------------------------------------------------------------------
diff --git a/docs/timezone.rst b/docs/timezone.rst
new file mode 100644
index 0000000..ca30686
--- /dev/null
+++ b/docs/timezone.rst
@@ -0,0 +1,143 @@
+Time zones
+==========
+
+Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database.
+It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert datetimes to
+the end user's time zone in the user interface, and templates used in Operators are not translated either. Time zone
+information is exposed, and it is left up to the writer of the DAG to decide what to do with it.
+
+This is handy if your users live in more than one time zone and you want to display datetime information according to 
+each user’s wall clock.
+
+Even if you are running Airflow in only one time zone, it is still good practice to store data in UTC in your database
+(this was also the recommended, or even required, setup before Airflow became time zone aware). The main reason is
+Daylight Saving Time (DST). Many countries have a system of DST, where clocks are moved forward in spring and backward
+in autumn. If you're working in local time, you're likely to encounter errors twice a year, when the transitions
+happen. (The pendulum and pytz documentation discuss these issues in greater detail.) This probably doesn't matter
+for a simple DAG, but it's a problem if you are in, for example, financial services where you have end of day
+deadlines to meet.
+
+The time zone is set in `airflow.cfg`. By default it is set to utc, but you can change it to use the system's settings
+or an arbitrary IANA time zone, e.g. `Europe/Amsterdam`. Airflow depends on `pendulum` for this, which is more accurate
+than `pytz`. Pendulum is installed when you install Airflow.
+
+Please note that the Web UI currently only runs in UTC.
+
+Concepts
+--------
+Naïve and aware datetime objects
+''''''''''''''''''''''''''''''''
+
+Python’s datetime.datetime objects have a tzinfo attribute that can be used to store time zone information, 
+represented as an instance of a subclass of datetime.tzinfo. When this attribute is set and describes an offset, 
+a datetime object is aware. Otherwise, it’s naive.
+
+You can use timezone.is_aware() and timezone.is_naive() to determine whether datetimes are aware or naive.
+
+Because Airflow uses time zone aware datetime objects, any datetime objects your code creates need to be aware too.
+
+.. code:: python
+
+    from airflow.utils import timezone
+
+    now = timezone.utcnow()
+    a_date = timezone.datetime(2017, 1, 1)
+
+
+Interpretation of naive datetime objects
+''''''''''''''''''''''''''''''''''''''''
+
+Although Airflow operates fully time zone aware, it still accepts naive datetime objects for `start_dates`
+and `end_dates` in your DAG definitions, mostly in order to preserve backwards compatibility. When a naive
+`start_date` or `end_date` is encountered, the default time zone is applied, under the assumption that the naive
+datetime is already in the default time zone. In other words, if you have a default time zone setting of
+`Europe/Amsterdam` and create a naive datetime `start_date` of `datetime(2017, 1, 1)`, it is assumed to be a
+`start_date` of Jan 1, 2017 Amsterdam time.
+
+.. code:: python
+
+    from datetime import datetime
+
+    default_args = dict(
+        start_date=datetime(2016, 1, 1),
+        owner='Airflow'
+    )
+
+    dag = DAG('my_dag', default_args=default_args)
+    op = DummyOperator(task_id='dummy', dag=dag)
+    print(op.owner)  # Airflow
+
+Unfortunately, during DST transitions, some datetimes don’t exist or are ambiguous. 
+In such situations, pendulum raises an exception. That’s why you should always create aware 
+datetime objects when time zone support is enabled.
+
+In practice, this is rarely an issue. Airflow gives you aware datetime objects in the models and DAGs, and most often, 
+new datetime objects are created from existing ones through timedelta arithmetic. The only datetime that’s often 
+created in application code is the current time, and timezone.utcnow() automatically does the right thing.
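+
+For example, timedelta arithmetic on an aware value stays aware:
+
+.. code:: python
+
+    from datetime import timedelta
+
+    from airflow.utils import timezone
+
+    later = timezone.utcnow() + timedelta(hours=2)  # still aware, still UTC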
+
+
+Default time zone 
+'''''''''''''''''
+
+The default time zone is the time zone defined by the `default_timezone` setting under `[core]`. If
+you just installed Airflow it will be set to `utc`, which is recommended. You can also set it to
+`system` or an IANA time zone (e.g. `Europe/Amsterdam`). Because DAGs are also evaluated on Airflow
+workers, it is important to make sure this setting is equal on all Airflow nodes.
+
+
+.. code:: ini
+
+    [core]
+    default_timezone = utc
+
+
+Time zone aware DAGs
+--------------------
+
+Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware `start_date`. It is
+recommended to use `pendulum` for this, though `pytz` (to be installed manually) can also be used.
+
+.. code:: python
+
+    import pendulum
+    from datetime import datetime
+
+    local_tz = pendulum.timezone("Europe/Amsterdam")
+
+    default_args = dict(
+        start_date=datetime(2016, 1, 1, tzinfo=local_tz),
+        owner='Airflow'
+    )
+
+    dag = DAG('my_tz_dag', default_args=default_args)
+    op = DummyOperator(task_id='dummy', dag=dag)
+    print(dag.timezone)  # <Timezone [Europe/Amsterdam]>
+
+
+
+Templates
+'''''''''
+
+Airflow returns time zone aware datetimes in templates, but does not convert them to local time, so they remain in UTC.
+It is left up to the DAG to handle the conversion.
+
+.. code:: python
+
+    import pendulum
+
+    local_tz = pendulum.timezone("Europe/Amsterdam")
+    local_tz.convert(execution_date)  # execution_date comes from the template context
+
+
+Cron schedules
+''''''''''''''
+
+If you set a cron schedule, Airflow assumes you always want to run at the exact same time. It will
+then ignore daylight saving time. Thus, if you have a schedule that says
+run at end of interval every day at 08:00 GMT+1, it will always run at end of interval 08:00 GMT+1,
+regardless of whether daylight saving time is in effect.
+
+
+Time deltas
+'''''''''''
+For schedules with time deltas, Airflow assumes you always want to run with the specified interval. So if you
+specify a timedelta(hours=2), the next run will always be two hours later. In this case daylight saving time will
+be taken into account.
+


[07/11] incubator-airflow git commit: [AIRFLOW-1809] Update tests to use timezone aware objects

Posted by bo...@apache.org.
[AIRFLOW-1809] Update tests to use timezone aware objects


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9624f5f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9624f5f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9624f5f2

Branch: refs/heads/master
Commit: 9624f5f24e00299c66adfd799d2be59fabd17f03
Parents: dcac3e9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Nov 22 16:09:50 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/models.py                               |  7 +-
 airflow/utils/dates.py                          | 11 +--
 airflow/utils/timezone.py                       | 18 +++-
 airflow/www/utils.py                            |  5 +-
 airflow/www/views.py                            | 43 ++++-----
 tests/api/client/test_local_client.py           | 31 +------
 tests/contrib/operators/test_fs_operator.py     |  5 +-
 .../operators/test_jira_operator_test.py        |  5 +-
 tests/contrib/operators/test_sftp_operator.py   | 21 ++---
 .../operators/test_spark_submit_operator.py     |  9 +-
 tests/contrib/operators/test_ssh_operator.py    | 12 +--
 tests/contrib/sensors/test_jira_sensor_test.py  |  6 +-
 tests/contrib/sensors/test_redis_sensor.py      |  4 +-
 tests/core.py                                   | 32 +++----
 tests/dags/test_cli_triggered_dags.py           |  4 +-
 tests/executors/dask_executor.py                | 13 ++-
 tests/impersonation.py                          |  2 +-
 tests/jobs.py                                   | 33 +++----
 tests/models.py                                 | 91 +++++++++++---------
 tests/operators/latest_only_operator.py         | 51 +++++------
 tests/operators/operators.py                    |  7 +-
 tests/operators/python_operator.py              | 11 +--
 tests/operators/sensors.py                      |  8 +-
 tests/operators/subdag_operator.py              |  5 +-
 tests/operators/test_virtualenv_operator.py     | 11 +--
 .../deps/test_not_in_retry_period_dep.py        |  3 +-
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  3 +-
 tests/utils/log/test_file_processor_handler.py  | 10 +--
 tests/utils/log/test_s3_task_handler.py         |  2 +-
 tests/utils/test_dates.py                       | 14 +--
 tests/utils/test_log_handlers.py                |  2 +-
 tests/www/api/experimental/test_endpoints.py    |  9 +-
 tests/www/test_views.py                         |  2 +-
 33 files changed, 251 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 33f3636..e93e8a8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2892,7 +2892,11 @@ class DAG(BaseDag, LoggingMixin):
         # set timezone
         if start_date and start_date.tzinfo:
             self.timezone = start_date.tzinfo
-        elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo:
+        elif 'start_date' in self.default_args:
+            if isinstance(self.default_args['start_date'], six.string_types):
+                self.default_args['start_date'] = (
+                    timezone.parse(self.default_args['start_date'])
+                )
             self.timezone = self.default_args['start_date'].tzinfo
         else:
             self.timezone = settings.TIMEZONE
@@ -3066,7 +3070,6 @@ class DAG(BaseDag, LoggingMixin):
         # in case of @once
         if not following:
             return dttm
-
         if self.previous_schedule(following) != dttm:
             return following
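
The net effect of the hunk above, sketched: DAG.timezone comes from an aware start_date if one is passed to the DAG, else from default_args['start_date'] (now parsed with timezone.parse when it arrives as a string), else from the configured default. A hedged illustration, in the same style as the timezone docs (assumes a configured Airflow installation):

    import pendulum
    from datetime import datetime
    from airflow import DAG

    local_tz = pendulum.timezone("Europe/Amsterdam")

    # An aware start_date in default_args determines the DAG's timezone;
    # a plain string would first go through timezone.parse.
    dag = DAG('tz_precedence_demo',
              default_args={'start_date': datetime(2017, 1, 1, tzinfo=local_tz)})
    print(dag.timezone)  # Europe/Amsterdam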
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 7d0d9d9..dab2b0d 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -99,7 +99,7 @@ def date_range(
     return sorted(l)
 
 
-def round_time(dt, delta, start_date=datetime.min):
+def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
     """
     Returns the datetime of the form start_date + i * delta
     which is closest to dt for any non-negative integer i.
@@ -232,11 +232,4 @@ def parse_execution_date(execution_date_str):
     """
     Parse execution date string to datetime object.
     """
-    try:
-        # Execution date follows execution date format of scheduled executions,
-        # e.g. '2017-11-02 00:00:00'
-        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S')
-    except ValueError:
-        # Execution date follows execution date format of manually triggered executions,
-        # e.g. '2017-11-05 16:18:30..989729'
-        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f')
+    return timezone.parse(execution_date_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 5ae7802..e384a14 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -50,7 +50,13 @@ def utcnow():
     :return:
     """
 
-    return pendulum.utcnow()
+    # pendulum utcnow() is not used as that sets a TimezoneInfo object
+    # instead of a Timezone. This is not pickable and also creates issues
+    # when using replace()
+    d = dt.datetime.utcnow()
+    d = d.replace(tzinfo=utc)
+
+    return d
 
 
 def convert_to_utc(value):
@@ -94,7 +100,7 @@ def make_aware(value, timezone=None):
         return timezone.convert(value)
     else:
         # This may be wrong around DST changes!
-        return value.replace(tzinfo=timezone)
+        return value.astimezone(tz=timezone)
 
 
 def make_naive(value, timezone=None):
@@ -136,3 +142,11 @@ def datetime(*args, **kwargs):
         kwargs['tzinfo'] = TIMEZONE
 
     return dt.datetime(*args, **kwargs)
+
+
+def parse(string):
+    """
+    Parse a time string and return an aware datetime
+    :param string: time string
+    """
+    return pendulum.parse(string, tz=TIMEZONE)
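
timezone.parse leans on pendulum applying the tz argument only when the input string carries no offset of its own; strings with an explicit offset keep it. A quick hedged illustration:

    import pendulum

    tz = pendulum.timezone("Europe/Amsterdam")

    print(pendulum.parse("2017-11-27T15:54:27", tz=tz))        # zone-less string: tz applied
    print(pendulum.parse("2017-11-27T15:54:27+00:00", tz=tz))  # explicit offset kept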

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index aba85fa..a0833ee 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -21,7 +21,6 @@ from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
-import iso8601
 import json
 import time
 
@@ -34,6 +33,7 @@ from wtforms.compat import text_type
 
 from airflow import configuration, models, settings
 from airflow.utils.db import create_session
+from airflow.utils import timezone
 from airflow.utils.json import AirflowJsonEncoder
 
 AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE')
@@ -255,8 +255,7 @@ def action_logging(f):
             dag_id=request.args.get('dag_id'))
 
         if 'execution_date' in request.args:
-            log.execution_date = iso8601.parse_date(
-                request.args.get('execution_date'), settings.TIMEZONE)
+            log.execution_date = timezone.parse(request.args.get('execution_date'))
 
         with create_session() as session:
             session.add(log)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 550a7f8..5ecee42 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -22,11 +22,11 @@ import pkg_resources
 import socket
 from functools import wraps
 from datetime import timedelta
-import dateutil.parser
 import copy
 import math
 import json
 import bleach
+import pendulum
 from collections import defaultdict
 
 import inspect
@@ -78,6 +78,7 @@ from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
+from airflow.utils.timezone import datetime
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
@@ -669,7 +670,7 @@ class Airflow(BaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         task = copy.copy(dag.get_task(task_id))
@@ -705,7 +706,7 @@ class Airflow(BaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         ti = session.query(models.TaskInstance).filter(
@@ -746,7 +747,7 @@ class Airflow(BaseView):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -823,7 +824,7 @@ class Airflow(BaseView):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         if not dag or task_id not in dag.task_ids:
@@ -863,7 +864,7 @@ class Airflow(BaseView):
         task = dag.get_task(task_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         ignore_all_deps = request.args.get('ignore_all_deps') == "true"
         ignore_task_deps = request.args.get('ignore_task_deps') == "true"
         ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -987,7 +988,7 @@ class Airflow(BaseView):
         dag = dagbag.get_dag(dag_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -1018,7 +1019,7 @@ class Airflow(BaseView):
         confirmed = request.args.get('confirmed') == "true"
 
         dag = dagbag.get_dag(dag_id)
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         start_date = execution_date
         end_date = execution_date
 
@@ -1062,7 +1063,7 @@ class Airflow(BaseView):
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -1099,7 +1100,7 @@ class Airflow(BaseView):
         task.dag = dag
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -1160,7 +1161,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1218,7 +1219,7 @@ class Airflow(BaseView):
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
                             tid["start_date"] is not None):
-                    d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"])
+                    d = timezone.utcnow() - pendulum.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1313,9 +1314,9 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or timezone.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow()
 
         DR = models.DagRun
         drs = (
@@ -1389,7 +1390,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1496,7 +1497,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1559,7 +1560,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1686,9 +1687,9 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or timezone.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow()
 
         form = DateTimeForm(data={'execution_date': dttm})
 
@@ -1741,7 +1742,7 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
             return ("Error: Invalid execution_date")
 
@@ -2586,7 +2587,7 @@ class TaskInstanceModelView(ModelViewOnly):
         https://github.com/flask-admin/flask-admin/issues/1226
         """
         task_id, dag_id, execution_date = iterdecode(id)
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         return self.session.query(self.model).get((task_id, dag_id, execution_date))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/api/client/test_local_client.py
----------------------------------------------------------------------
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index 7a759fe..31a1712 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -12,45 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import json
 import unittest
 
+from freezegun import freeze_time
 from mock import patch
 
 from airflow import AirflowException
 from airflow.api.client.local_client import Client
 from airflow import models
 from airflow import settings
+from airflow.utils import timezone
 from airflow.utils.state import State
 
-EXECDATE = datetime.datetime.now()
+EXECDATE = timezone.utcnow()
 EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0)
 EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat()
 
-real_datetime_class = datetime.datetime
-
-
-def mock_datetime_now(target, dt):
-    class DatetimeSubclassMeta(type):
-        @classmethod
-        def __instancecheck__(mcs, obj):
-            return isinstance(obj, real_datetime_class)
-
-    class BaseMockedDatetime(real_datetime_class):
-        @classmethod
-        def now(cls, tz=None):
-            return target.replace(tzinfo=tz)
-
-        @classmethod
-        def utcnow(cls):
-            return target
-
-    # Python2 & Python3 compatible metaclass
-    MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), {})
-
-    return patch.object(dt, 'datetime', MockedDatetime)
-
 
 class TestLocalClient(unittest.TestCase):
 
@@ -81,8 +59,7 @@ class TestLocalClient(unittest.TestCase):
         with self.assertRaises(AirflowException):
             client.trigger_dag(dag_id="blablabla")
 
-        import airflow.api.common.experimental.trigger_dag
-        with mock_datetime_now(EXECDATE, airflow.api.common.experimental.trigger_dag.datetime):
+        with freeze_time(EXECDATE):
             # no execution date, execution date should be set automatically
             client.trigger_dag(dag_id="test_start_date_scheduling")
             mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
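
freeze_time, swapped in above for the hand-rolled datetime mock, patches the datetime classes for the duration of the context manager; a minimal illustration:

    from datetime import datetime
    from freezegun import freeze_time

    with freeze_time("2017-11-27 15:54:27"):
        print(datetime.utcnow())  # always 2017-11-27 15:54:27 inside the block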

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_fs_operator.py b/tests/contrib/operators/test_fs_operator.py
index f990157..2ef4286 100644
--- a/tests/contrib/operators/test_fs_operator.py
+++ b/tests/contrib/operators/test_fs_operator.py
@@ -14,12 +14,12 @@
 #
 
 import unittest
-from datetime import datetime
 
 from airflow import configuration
 from airflow.settings import Session
 from airflow import models, DAG
 from airflow.contrib.operators.fs_operator import FileSensor
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2015, 1, 1)
@@ -33,8 +33,10 @@ def reset(dag_id=TEST_DAG_ID):
     session.commit()
     session.close()
 
+
 reset()
 
+
 class FileSensorTest(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
@@ -60,5 +62,6 @@ class FileSensorTest(unittest.TestCase):
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_jira_operator_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_jira_operator_test.py b/tests/contrib/operators/test_jira_operator_test.py
index 6d615df..566cca4 100644
--- a/tests/contrib/operators/test_jira_operator_test.py
+++ b/tests/contrib/operators/test_jira_operator_test.py
@@ -14,7 +14,7 @@
 #
 
 import unittest
-import datetime
+
 from mock import Mock
 from mock import patch
 
@@ -22,8 +22,9 @@ from airflow import DAG, configuration
 from airflow.contrib.operators.jira_operator import JiraOperator
 from airflow import models
 from airflow.utils import db
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 jira_client_mock = Mock(
         name="jira_client_for_test"
 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_sftp_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sftp_operator.py b/tests/contrib/operators/test_sftp_operator.py
index 39e8d88..81e0c9e 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -15,7 +15,6 @@
 import os
 import unittest
 from base64 import b64encode
-from datetime import datetime
 
 from airflow import configuration
 from airflow import models
@@ -23,6 +22,8 @@ from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.models import DAG, TaskInstance
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2017, 1, 1)
@@ -80,7 +81,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(put_test_task)
-        ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=put_test_task, execution_date=timezone.utcnow())
         ti2.run()
 
         # check the remote file content
@@ -92,7 +93,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(check_file_task)
-        ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+        ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
                 ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
@@ -117,7 +118,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(put_test_task)
-        ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=put_test_task, execution_date=timezone.utcnow())
         ti2.run()
 
         # check the remote file content
@@ -129,7 +130,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(check_file_task)
-        ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+        ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
                 ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
@@ -152,7 +153,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(create_file_task)
-        ti1 = TaskInstance(task=create_file_task, execution_date=datetime.now())
+        ti1 = TaskInstance(task=create_file_task, execution_date=timezone.utcnow())
         ti1.run()
 
         # get remote file to local
@@ -165,7 +166,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(get_test_task)
-        ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=get_test_task, execution_date=timezone.utcnow())
         ti2.run()
 
         # test the received content
@@ -190,7 +191,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(create_file_task)
-        ti1 = TaskInstance(task=create_file_task, execution_date=datetime.now())
+        ti1 = TaskInstance(task=create_file_task, execution_date=timezone.utcnow())
         ti1.run()
 
         # get remote file to local
@@ -203,7 +204,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(get_test_task)
-        ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=get_test_task, execution_date=timezone.utcnow())
         ti2.run()
 
         # test the received content
@@ -227,7 +228,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(remove_file_task)
-        ti3 = TaskInstance(task=remove_file_task, execution_date=datetime.now())
+        ti3 = TaskInstance(task=remove_file_task, execution_date=timezone.utcnow())
         ti3.run()
 
     def tearDown(self):

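The repeated datetime.now() -> timezone.utcnow() substitutions keep naive local wall-clock values out of an otherwise aware code path; mixing the two breaks datetime arithmetic. A sketch:

    import datetime

    from airflow.utils import timezone

    naive = datetime.datetime.now()  # local wall clock, tzinfo is None
    aware = timezone.utcnow()        # current time in UTC, tzinfo set
    # aware - naive would raise TypeError: naive and aware datetimes don't mix
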
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py
index 0731da9..4e72eea 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -14,15 +14,17 @@
 #
 
 import unittest
-import datetime
 import sys
 
 from airflow import DAG, configuration
 from airflow.models import TaskInstance
 
 from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+from datetime import timedelta
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class TestSparkSubmitOperator(unittest.TestCase):
@@ -146,7 +148,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         # Then
         expected_application_args = [u'-f', 'foo',
                                      u'--bar', 'bar',
-                                     u'--start', (DEFAULT_DATE - datetime.timedelta(days=1)).strftime("%Y-%m-%d"),
+                                     u'--start', (DEFAULT_DATE - timedelta(days=1)).strftime("%Y-%m-%d"),
                                      u'--end', DEFAULT_DATE.strftime("%Y-%m-%d"),
                                      u'--with-spaces', u'args should keep embdedded spaces',
                                      ]
@@ -154,5 +156,6 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertListEqual(expected_application_args, getattr(operator, '_application_args'))
         self.assertEqual(expected_name, getattr(operator, '_name'))
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
index 019dfe4..4cec913 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -14,13 +14,14 @@
 
 import unittest
 from base64 import b64encode
-from datetime import datetime
 
 from airflow import configuration
 from airflow import models
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.models import DAG, TaskInstance
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2017, 1, 1)
@@ -65,7 +66,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-                task=task, execution_date=datetime.now())
+                task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'),
@@ -84,7 +85,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-                task=task, execution_date=datetime.now())
+                task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow')
@@ -102,7 +103,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-            task=task, execution_date=datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow')
@@ -120,10 +121,11 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-            task=task, execution_date=datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'')
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py
index 77ca97f..7c16188 100644
--- a/tests/contrib/sensors/test_jira_sensor_test.py
+++ b/tests/contrib/sensors/test_jira_sensor_test.py
@@ -14,16 +14,16 @@
 #
 
 import unittest
-import datetime
+
 from mock import Mock
 from mock import patch
 
 from airflow import DAG, configuration
 from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
 from airflow import models
-from airflow.utils import db
+from airflow.utils import db, timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 jira_client_mock = Mock(
         name="jira_client_for_test"
 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_redis_sensor.py b/tests/contrib/sensors/test_redis_sensor.py
index 8022a92..d627501 100644
--- a/tests/contrib/sensors/test_redis_sensor.py
+++ b/tests/contrib/sensors/test_redis_sensor.py
@@ -14,15 +14,15 @@
 
 
 import unittest
-import datetime
 
 from mock import patch
 
 from airflow import DAG
 from airflow import configuration
 from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class TestRedisSensor(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0bd0c87..a57f0ed 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -25,9 +25,10 @@ import multiprocessing
 import mock
 from numpy.testing import assert_array_almost_equal
 import tempfile
-from datetime import datetime, time, timedelta
+from datetime import time, timedelta
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
+from freezegun import freeze_time
 import signal
 from six.moves.urllib.parse import urlencode
 from time import sleep
@@ -39,7 +40,6 @@ import sqlalchemy
 from airflow import configuration
 from airflow.executors import SequentialExecutor
 from airflow.models import Variable
-from tests.test_utils.fake_datetime import FakeDatetime
 
 configuration.load_test_config()
 from airflow import jobs, models, DAG, utils, macros, settings, exceptions
@@ -56,6 +56,8 @@ from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 from airflow.utils.state import State
 from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
 from lxml import html
@@ -208,7 +210,7 @@ class CoreTest(unittest.TestCase):
             owner='Also fake',
             start_date=datetime(2015, 1, 2, 0, 0)))
 
-        start_date = datetime.utcnow()
+        start_date = timezone.utcnow()
 
         run = dag.create_dagrun(
             run_id='test_' + start_date.isoformat(),
@@ -254,7 +256,7 @@ class CoreTest(unittest.TestCase):
 
         self.assertIsNone(additional_dag_run)
 
-    @mock.patch('airflow.jobs.datetime', FakeDatetime)
+    @freeze_time('2016-01-01')
     def test_schedule_dag_no_end_date_up_to_today_only(self):
         """
         Tests that a Dag created without an end_date can only be scheduled up
@@ -264,9 +266,6 @@ class CoreTest(unittest.TestCase):
         start_date of 2015-01-01, only jobs up to, but not including
         2016-01-01 should be scheduled.
         """
-        from datetime import datetime
-        FakeDatetime.utcnow = classmethod(lambda cls: datetime(2016, 1, 1))
-
         session = settings.Session()
         delta = timedelta(days=1)
         start_date = DEFAULT_DATE
@@ -332,7 +331,8 @@ class CoreTest(unittest.TestCase):
         self.assertNotEqual(dag_subclass, self.dag)
 
         # a dag should equal an unpickled version of itself
-        self.assertEqual(pickle.loads(pickle.dumps(self.dag)), self.dag)
+        d = pickle.dumps(self.dag)
+        self.assertEqual(pickle.loads(d), self.dag)
 
         # dags are ordered based on dag_id no matter what the type is
         self.assertLess(self.dag, dag_diff_name)
@@ -1637,7 +1637,7 @@ class SecurityTests(unittest.TestCase):
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
-        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=datetime.utcnow())
+        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
 
 class WebUiTests(unittest.TestCase):
     def setUp(self):
@@ -1657,23 +1657,23 @@ class WebUiTests(unittest.TestCase):
         self.example_xcom = self.dagbag.dags['example_xcom']
 
         self.dagrun_bash2 = self.dag_bash2.create_dagrun(
-            run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
         self.sub_dag.create_dagrun(
-            run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
         self.example_xcom.create_dagrun(
-            run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
@@ -1849,7 +1849,7 @@ class WebUiTests(unittest.TestCase):
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
-        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=datetime.utcnow())
+        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
         session = Session()
         session.query(models.DagRun).delete()
         session.query(models.TaskInstance).delete()

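As test_schedule_dag_no_end_date_up_to_today_only shows above, freeze_time also works as a decorator, replacing the removed FakeDatetime.utcnow patching. A sketch:

    from datetime import datetime

    from freezegun import freeze_time

    @freeze_time('2016-01-01')
    def test_clock_is_frozen():
        # every call inside the decorated test sees the pinned date
        assert datetime.utcnow() == datetime(2016, 1, 1)
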
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/dags/test_cli_triggered_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_cli_triggered_dags.py b/tests/dags/test_cli_triggered_dags.py
index 5af8fc8..94afe0e 100644
--- a/tests/dags/test_cli_triggered_dags.py
+++ b/tests/dags/test_cli_triggered_dags.py
@@ -13,9 +13,11 @@
 # limitations under the License.
 
 
-from datetime import datetime, timedelta
+from datetime import timedelta
+
 from airflow.models import DAG
 from airflow.operators.python_operator import PythonOperator
+from airflow.utils.timezone import datetime
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index f66a272..decd663 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -12,15 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import logging
-import time
 import unittest
 
 from airflow import configuration
 from airflow.models import DAG, DagBag, TaskInstance, State
 from airflow.jobs import BackfillJob
-from airflow.operators.python_operator import PythonOperator
+from airflow.utils import timezone
+
+from datetime import timedelta
 
 try:
     from airflow.executors.dask_executor import DaskExecutor
@@ -34,7 +34,7 @@ if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
     logging.error('sqlite does not support concurrent access')
     SKIP_DASK = True
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class DaskExecutorTest(unittest.TestCase):
@@ -63,9 +63,9 @@ class DaskExecutorTest(unittest.TestCase):
             k for k, v in executor.futures.items() if v == 'fail')
 
         # wait for the futures to execute, with a timeout
-        timeout = datetime.datetime.now() + datetime.timedelta(seconds=30)
+        timeout = timezone.utcnow() + timedelta(seconds=30)
         while not (success_future.done() and fail_future.done()):
-            if datetime.datetime.now() > timeout:
+            if timezone.utcnow() > timeout:
                 raise ValueError(
                     'The futures should have finished; there is probably '
                    'an error communicating with the Dask cluster.')
@@ -80,7 +80,6 @@ class DaskExecutorTest(unittest.TestCase):
 
         cluster.close()
 
-
     @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
     def test_backfill_integration(self):
         """

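The timeout loop above works because every operand is aware: adding a timedelta preserves tzinfo, and aware-to-aware comparison is always valid. A sketch:

    from datetime import timedelta

    from airflow.utils import timezone

    timeout = timezone.utcnow() + timedelta(seconds=30)  # aware + timedelta stays aware
    expired = timezone.utcnow() > timeout                # aware vs aware compares cleanly
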
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/impersonation.py
----------------------------------------------------------------------
diff --git a/tests/impersonation.py b/tests/impersonation.py
index 0777def..5355c9a 100644
--- a/tests/impersonation.py
+++ b/tests/impersonation.py
@@ -19,7 +19,7 @@ import unittest
 
 from airflow import jobs, models
 from airflow.utils.state import State
-from datetime import datetime
+from airflow.utils.timezone import datetime
 
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 119e1b4..ca2db2c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils import timezone
 from airflow.utils.dates import days_ago
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
@@ -63,7 +64,7 @@ except ImportError:
         mock = None
 
 DEV_NULL = '/dev/null'
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 # Include the words "airflow" and "dag" in the file contents, tricking airflow into thinking these
 # files contain a DAG (otherwise Airflow will skip them)
@@ -659,7 +660,7 @@ class BackfillJobTest(unittest.TestCase):
         subdag = subdag_op_task.subdag
         subdag.schedule_interval = '@daily'
 
-        start_date = datetime.datetime.now()
+        start_date = timezone.utcnow()
         executor = TestExecutor(do_update=True)
         job = BackfillJob(dag=subdag,
                           start_date=start_date,
@@ -1838,7 +1839,7 @@ class SchedulerJobTest(unittest.TestCase):
         """
         dag = DAG(
             'test_scheduler_dagrun_once',
-            start_date=datetime.datetime(2015, 1, 1),
+            start_date=timezone.datetime(2015, 1, 1),
             schedule_interval="@once")
 
         scheduler = SchedulerJob()
@@ -1912,7 +1913,7 @@ class SchedulerJobTest(unittest.TestCase):
     def test_scheduler_do_not_schedule_too_early(self):
         dag = DAG(
             dag_id='test_scheduler_do_not_schedule_too_early',
-            start_date=datetime.datetime(2200, 1, 1))
+            start_date=timezone.datetime(2200, 1, 1))
         dag_task1 = DummyOperator(
             task_id='dummy',
             dag=dag,
@@ -2059,7 +2060,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
 
@@ -2102,7 +2103,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertIsNone(new_dr)
 
         # Should be scheduled as dagrun_timeout has passed
-        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
         new_dr = scheduler.create_dag_run(dag)
@@ -2213,7 +2214,7 @@ class SchedulerJobTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_scheduler_auto_align_1',
-            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            start_date=timezone.datetime(2016, 1, 1, 10, 10, 0),
             schedule_interval="4 5 * * *"
         )
         dag_task1 = DummyOperator(
@@ -2231,11 +2232,11 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4))
+        self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 2, 5, 4))
 
         dag = DAG(
             dag_id='test_scheduler_auto_align_2',
-            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            start_date=timezone.datetime(2016, 1, 1, 10, 10, 0),
             schedule_interval="10 10 * * *"
         )
         dag_task1 = DummyOperator(
@@ -2253,7 +2254,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10))
+        self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10))
 
     def test_scheduler_reschedule(self):
         """
@@ -2458,12 +2459,12 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertTrue(dag.start_date > DEFAULT_DATE)
 
         expected_run_duration = 5
-        start_time = datetime.datetime.now()
+        start_time = timezone.utcnow()
         scheduler = SchedulerJob(dag_id,
                                  run_duration=expected_run_duration,
                                  **self.default_scheduler_args)
         scheduler.run()
-        end_time = datetime.datetime.now()
+        end_time = timezone.utcnow()
 
         run_duration = (end_time - start_time).total_seconds()
         logging.info("Test ran in %.2fs, expected %.2fs",
@@ -2503,7 +2504,7 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG returns its active runs
         """
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
 
         START_DATE = six_hours_ago_to_the_hour
@@ -2557,7 +2558,7 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date
         """
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
         three_minutes_ago = now - datetime.timedelta(minutes=3)
         two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)
@@ -2618,7 +2619,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertGreater(dr.execution_date, three_minutes_ago)
 
         # The DR should be scheduled BEFORE now
-        self.assertLess(dr.execution_date, datetime.datetime.now())
+        self.assertLess(dr.execution_date, timezone.utcnow())
 
         dag3 = DAG(DAG_NAME3,
                    schedule_interval='@hourly',
@@ -2652,7 +2653,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
 
         # The DR should be scheduled BEFORE now
-        self.assertLess(dr.execution_date, datetime.datetime.now())
+        self.assertLess(dr.execution_date, timezone.utcnow())
 
     def test_add_unparseable_file_before_sched_start_creates_import_error(self):
         try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index a1de17d..cabcf3a 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -20,6 +20,7 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
+import pendulum
 import unittest
 import time
 
@@ -36,13 +37,14 @@ from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+from airflow.utils import timezone
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from mock import patch
 from parameterized import parameterized
 
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
 
@@ -320,7 +322,7 @@ class DagStatTest(unittest.TestCase):
         with dag:
             op1 = DummyOperator(task_id='A')
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
             execution_date=now,
@@ -345,7 +347,7 @@ class DagStatTest(unittest.TestCase):
 class DagRunTest(unittest.TestCase):
 
     def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         if execution_date is None:
             execution_date = now
         dag_run = dag.create_dagrun(
@@ -367,14 +369,14 @@ class DagRunTest(unittest.TestCase):
 
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
-            datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
+            timezone.datetime(2015, 1, 2, 3, 4, 5, 6))
         self.assertEqual(
             'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))
 
     def test_dagrun_find(self):
         session = settings.Session()
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
 
         dag_id1 = "test_dagrun_find_externally_triggered"
         dag_run = models.DagRun(
@@ -411,7 +413,7 @@ class DagRunTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_dagrun_success_when_all_skipped',
-            start_date=datetime.datetime(2017, 1, 1)
+            start_date=timezone.datetime(2017, 1, 1)
         )
         dag_task1 = ShortCircuitOperator(
             task_id='test_short_circuit_false',
@@ -459,7 +461,7 @@ class DagRunTest(unittest.TestCase):
 
         dag.clear()
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
                                state=State.RUNNING,
                                execution_date=now,
@@ -498,7 +500,7 @@ class DagRunTest(unittest.TestCase):
             op2.set_upstream(op1)
 
         dag.clear()
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(run_id='test_dagrun_deadlock',
                                state=State.RUNNING,
                                execution_date=now,
@@ -556,7 +558,7 @@ class DagRunTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_get_task_instance_on_empty_dagrun',
-            start_date=datetime.datetime(2017, 1, 1)
+            start_date=timezone.datetime(2017, 1, 1)
         )
         dag_task1 = ShortCircuitOperator(
             task_id='test_short_circuit_false',
@@ -565,7 +567,7 @@ class DagRunTest(unittest.TestCase):
 
         session = settings.Session()
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
 
         # Don't use create_dagrun since it will create the task instances too which we
         # don't want
@@ -589,14 +591,14 @@ class DagRunTest(unittest.TestCase):
             dag_id='test_latest_runs_1',
             start_date=DEFAULT_DATE)
         dag_1_run_1 = self.create_dag_run(dag,
-                execution_date=datetime.datetime(2015, 1, 1))
+                execution_date=timezone.datetime(2015, 1, 1))
         dag_1_run_2 = self.create_dag_run(dag,
-                execution_date=datetime.datetime(2015, 1, 2))
+                execution_date=timezone.datetime(2015, 1, 2))
         dagruns = models.DagRun.get_latest_runs(session)
         session.close()
         for dagrun in dagruns:
             if dagrun.dag_id == 'test_latest_runs_1':
-                self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2))
+                self.assertEqual(dagrun.execution_date, timezone.datetime(2015, 1, 2))
 
     def test_is_backfill(self):
         dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE)
@@ -835,7 +837,7 @@ class TaskInstanceTest(unittest.TestCase):
                   max_active_runs=1, concurrency=2)
         task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag)
 
-        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti = TI(task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(ti.state, models.State.NONE)
 
@@ -852,9 +854,9 @@ class TaskInstanceTest(unittest.TestCase):
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', owner='airflow',
-                             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+                             start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(ti.state, models.State.SUCCESS)
 
@@ -873,9 +875,9 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_run_pooling_task_with_mark_success_pool',
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run(mark_success=True)
         self.assertEqual(ti.state, models.State.SUCCESS)
 
@@ -894,9 +896,9 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             python_callable=raise_skip_exception,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(models.State.SKIPPED, ti.state)
 
@@ -912,7 +914,7 @@ class TaskInstanceTest(unittest.TestCase):
             retry_delay=datetime.timedelta(seconds=3),
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
 
         def run_with_error(ti):
             try:
@@ -921,7 +923,7 @@ class TaskInstanceTest(unittest.TestCase):
                 pass
 
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
 
         # first run -- up for retry
         run_with_error(ti)
@@ -953,7 +955,7 @@ class TaskInstanceTest(unittest.TestCase):
             retry_delay=datetime.timedelta(seconds=0),
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
 
         def run_with_error(ti):
             try:
@@ -962,7 +964,7 @@ class TaskInstanceTest(unittest.TestCase):
                 pass
 
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
 
         # first run -- up for retry
         run_with_error(ti)
@@ -1002,25 +1004,28 @@ class TaskInstanceTest(unittest.TestCase):
             max_retry_delay=max_delay,
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
             task=task, execution_date=DEFAULT_DATE)
-        ti.end_date = datetime.datetime.now()
+        ti.end_date = pendulum.instance(timezone.utcnow())
 
         ti.try_number = 1
         dt = ti.next_retry_datetime()
         # between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
+        period = ti.end_date.add(seconds=30) - ti.end_date.add(seconds=15)
+        self.assertTrue(dt in period)
 
         ti.try_number = 4
         dt = ti.next_retry_datetime()
         # between 30 * 2^2 and 30 * 2^3 (120 and 240)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
+        period = ti.end_date.add(seconds=240) - ti.end_date.add(seconds=120)
+        self.assertTrue(dt in period)
 
         ti.try_number = 6
         dt = ti.next_retry_datetime()
         # between 30 * 2^4 and 30 * 2^5 (480 and 960)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
+        period = ti.end_date.add(seconds=960) - ti.end_date.add(seconds=480)
+        self.assertTrue(dt in period)
 
         ti.try_number = 9
         dt = ti.next_retry_datetime()
@@ -1099,7 +1104,7 @@ class TaskInstanceTest(unittest.TestCase):
                                      failed, upstream_failed, done,
                                      flag_upstream_failed,
                                      expect_state, expect_completed):
-        start_date = datetime.datetime(2016, 2, 1, 0, 0, 0)
+        start_date = timezone.datetime(2016, 2, 1, 0, 0, 0)
         dag = models.DAG('test-dag', start_date=start_date)
         downstream = DummyOperator(task_id='downstream',
                                    dag=dag, owner='airflow',
@@ -1137,8 +1142,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_xcom',
             owner='airflow',
-            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
-        exec_date = datetime.datetime.now()
+            start_date=timezone.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = timezone.utcnow()
         ti = TI(
             task=task, execution_date=exec_date)
         ti.run(mark_success=True)
@@ -1171,8 +1176,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_xcom',
             owner='airflow',
-            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
-        exec_date = datetime.datetime.now()
+            start_date=timezone.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = timezone.utcnow()
         ti = TI(
             task=task, execution_date=exec_date)
         ti.run(mark_success=True)
@@ -1213,8 +1218,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             python_callable=lambda: 'error',
             owner='airflow',
-            start_date=datetime.datetime(2017, 2, 1))
-        ti = TI(task=task, execution_date=datetime.datetime.now())
+            start_date=timezone.datetime(2017, 2, 1))
+        ti = TI(task=task, execution_date=timezone.utcnow())
 
         with self.assertRaises(TestError):
             ti.run()
@@ -1223,7 +1228,7 @@ class TaskInstanceTest(unittest.TestCase):
         dag = models.DAG(dag_id='test_check_and_change_state_before_execution')
         task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE)
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         self.assertTrue(ti._check_and_change_state_before_execution())
 
     def test_check_and_change_state_before_execution_dep_not_met(self):
@@ -1232,7 +1237,7 @@ class TaskInstanceTest(unittest.TestCase):
         task2 = DummyOperator(task_id='task2', dag=dag, start_date=DEFAULT_DATE)
         task >> task2
         ti = TI(
-            task=task2, execution_date=datetime.datetime.now())
+            task=task2, execution_date=timezone.utcnow())
         self.assertFalse(ti._check_and_change_state_before_execution())
 
     def test_get_num_running_task_instances(self):
@@ -1257,7 +1262,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEquals(1, ti1.get_num_running_task_instances(session=session))
         self.assertEquals(1, ti2.get_num_running_task_instances(session=session))
         self.assertEquals(1, ti3.get_num_running_task_instances(session=session))
-        
+
 
 class ClearTasksTest(unittest.TestCase):
     def test_clear_task_instances(self):
@@ -1457,7 +1462,7 @@ class ClearTasksTest(unittest.TestCase):
 
     def test_xcom_disable_pickle_type(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test1"
         dag_id = "test_dag1"
         task_id = "test_task1"
@@ -1479,7 +1484,7 @@ class ClearTasksTest(unittest.TestCase):
 
     def test_xcom_enable_pickle_type(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test2"
         dag_id = "test_dag2"
         task_id = "test_task2"
@@ -1508,12 +1513,12 @@ class ClearTasksTest(unittest.TestCase):
                           value=PickleRce(),
                           dag_id="test_dag3",
                           task_id="test_task3",
-                          execution_date=datetime.datetime.now(),
+                          execution_date=timezone.utcnow(),
                           enable_pickling=False)
 
     def test_xcom_get_many(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test4"
         dag_id1 = "test_dag4"
         task_id1 = "test_task4"

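The next_retry_datetime assertions above move from exact equality to a window check; subtracting two pendulum instances yields a Period, and membership is tested with the in operator. The idiom, sketched:

    import pendulum

    from airflow.utils import timezone

    end = pendulum.instance(timezone.utcnow())          # wrap an aware datetime
    period = end.add(seconds=30) - end.add(seconds=15)  # a pendulum Period
    assert end.add(seconds=20) in period                # membership, not equality
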
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py
index 225d24f..44fff23 100644
--- a/tests/operators/latest_only_operator.py
+++ b/tests/operators/latest_only_operator.py
@@ -23,13 +23,14 @@ from airflow.jobs import BackfillJob
 from airflow.models import TaskInstance
 from airflow.operators.latest_only_operator import LatestOnlyOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
 from airflow.utils.state import State
 from freezegun import freeze_time
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 def get_task_instances(task_id):
@@ -85,27 +86,27 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'success',
-            datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success', },
+            timezone.datetime(2016, 1, 1): 'success',
+            timezone.datetime(2016, 1, 1, 12): 'success',
+            timezone.datetime(2016, 1, 2): 'success', },
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
     def test_skipping_dagrun(self):
@@ -124,21 +125,21 @@ class LatestOnlyOperatorTest(unittest.TestCase):
 
         dr1 = self.dag.create_dagrun(
             run_id="manual__1",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )
 
         dr2 = self.dag.create_dagrun(
             run_id="manual__2",
-            start_date=datetime.datetime.now(),
-            execution_date=datetime.datetime(2016, 1, 1, 12),
+            start_date=timezone.utcnow(),
+            execution_date=timezone.datetime(2016, 1, 1, 12),
             state=State.RUNNING
         )
 
         dr2 = self.dag.create_dagrun(
             run_id="manual__3",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=END_DATE,
             state=State.RUNNING
         )
@@ -151,25 +152,25 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'success',
-            datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success', },
+            timezone.datetime(2016, 1, 1): 'success',
+            timezone.datetime(2016, 1, 1, 12): 'success',
+            timezone.datetime(2016, 1, 2): 'success', },
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 0f5abd5..40f0ffd 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -14,16 +14,15 @@
 
 from __future__ import print_function
 
-import datetime
-
 from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
+from airflow.utils import timezone
 
 configuration.load_test_config()
 
 import unittest
 
-DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_test_dag'
@@ -251,7 +250,7 @@ class TransferTests(unittest.TestCase):
     def test_clear(self):
         self.dag.clear(
             start_date=DEFAULT_DATE,
-            end_date=datetime.datetime.now())
+            end_date=timezone.utcnow())
 
     def test_mysql_to_hive(self):
         # import airflow.operators

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index 74120fe..6fa5e5a 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -23,15 +23,16 @@ from airflow.operators.python_operator import PythonOperator, BranchPythonOperat
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 from airflow.exceptions import AirflowException
 import logging
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 class PythonOperatorTest(unittest.TestCase):
@@ -127,7 +128,7 @@ class BranchOperatorTest(unittest.TestCase):
     def test_with_dag_run(self):
         dr = self.dag.create_dagrun(
             run_id="manual__",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )
@@ -225,7 +226,7 @@ class ShortCircuitOperatorTest(unittest.TestCase):
         logging.error("Tasks {}".format(dag.tasks))
         dr = dag.create_dagrun(
             run_id="manual__",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index ee67524..d09dabe 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -15,7 +15,7 @@ import logging
 import sys
 import time
 import unittest
-from datetime import datetime, timedelta
+from datetime import timedelta
 from mock import patch
 
 from airflow import DAG, configuration, settings
@@ -28,6 +28,8 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 try:
     from unittest import mock
@@ -64,12 +66,12 @@ class TimeoutTestSensor(BaseSensorOperator):
         return self.return_value
 
     def execute(self, context):
-        started_at = datetime.now()
+        started_at = timezone.utcnow()
         time_jump = self.params.get('time_jump')
         while not self.poke(context):
             if time_jump:
                 started_at -= time_jump
-            if (datetime.now() - started_at).total_seconds() > self.timeout:
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 9224f63..026eb3c 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import os
 import unittest
 
@@ -25,14 +24,16 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.jobs import BackfillJob
 from airflow.exceptions import AirflowException
+from airflow.utils.timezone import datetime
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = datetime(2016, 1, 1)
 
 default_args = dict(
     owner='airflow',
     start_date=DEFAULT_DATE,
 )
 
+
 class SubDagOperatorTests(unittest.TestCase):
 
     def test_subdag_name(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/test_virtualenv_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_virtualenv_operator.py b/tests/operators/test_virtualenv_operator.py
index fdd2742..03623a6 100644
--- a/tests/operators/test_virtualenv_operator.py
+++ b/tests/operators/test_virtualenv_operator.py
@@ -15,6 +15,7 @@
 from __future__ import print_function, unicode_literals
 
 import datetime
+
 import funcsigs
 import sys
 import unittest
@@ -25,15 +26,15 @@ from airflow import configuration, DAG
 from airflow.models import TaskInstance
 from airflow.operators.python_operator import PythonVirtualenvOperator
 from airflow.settings import Session
-from airflow.utils.state import State
+from airflow.utils import timezone
 
 from airflow.exceptions import AirflowException
 import logging
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 class TestPythonVirtualenvOperator(unittest.TestCase):
@@ -185,7 +186,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
     def test_nonimported_as_arg(self):
         def f(a):
             return None
-        self._run_as_operator(f, op_args=[datetime.datetime.now()])
+        self._run_as_operator(f, op_args=[datetime.datetime.utcnow()])
 
     def test_context(self):
         def f(**kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
index 0f23aab..38502fb 100644
--- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -13,13 +13,14 @@
 # limitations under the License.
 
 import unittest
-from datetime import datetime, timedelta
+from datetime import timedelta
 from freezegun import freeze_time
 from mock import Mock
 
 from airflow.models import TaskInstance
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.utils.state import State
+from airflow.utils.timezone import datetime
 
 
 class NotInRetryPeriodDepTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
index e1a396c..28b285f 100644
--- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py
+++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
@@ -13,13 +13,13 @@
 # limitations under the License.
 
 import unittest
-from datetime import datetime
 from freezegun import freeze_time
 from mock import Mock
 
 from airflow.models import TaskInstance
 from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
-
+from airflow.utils.timezone import datetime
+
 
 class RunnableExecDateDepTest(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_file_processor_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py
index defe623..8a3bbd2 100644
--- a/tests/utils/log/test_file_processor_handler.py
+++ b/tests/utils/log/test_file_processor_handler.py
@@ -17,7 +17,7 @@ import os
 import unittest
 
 from airflow.utils.log.file_processor_handler import FileProcessorHandler
-from datetime import datetime
+from airflow.utils import timezone
 from datetime import timedelta
 from freezegun import freeze_time
 
@@ -31,7 +31,7 @@ class TestFileProcessorHandler(unittest.TestCase):
         self.dag_dir = "/dags"
 
     def test_non_template(self):
-        date = datetime.utcnow().strftime("%Y-%m-%d")
+        date = timezone.utcnow().strftime("%Y-%m-%d")
         handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
                                        filename_template=self.filename)
         handler.dag_dir = self.dag_dir
@@ -44,7 +44,7 @@ class TestFileProcessorHandler(unittest.TestCase):
         self.assertTrue(os.path.exists(os.path.join(path, "logfile")))
 
     def test_template(self):
-        date = datetime.utcnow().strftime("%Y-%m-%d")
+        date = timezone.utcnow().strftime("%Y-%m-%d")
         handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
                                        filename_template=self.filename_template)
         handler.dag_dir = self.dag_dir
@@ -61,8 +61,8 @@ class TestFileProcessorHandler(unittest.TestCase):
                                        filename_template=self.filename)
         handler.dag_dir = self.dag_dir
 
-        date1 = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
-        date2 = (datetime.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d")
+        date1 = (timezone.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
+        date2 = (timezone.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d")
 
         p1 = os.path.join(self.base_log_folder, date1, "log1")
         p2 = os.path.join(self.base_log_folder, date1, "log2")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index b1354cd..dc32b5a 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
 import mock
 import unittest
 
 from airflow import configuration
 from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.utils.timezone import datetime
 from airflow.hooks.S3_hook import S3Hook
 from airflow.models import TaskInstance, DAG
 from airflow.operators.dummy_operator import DummyOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py
index 50e76ba..199de4a 100644
--- a/tests/utils/test_dates.py
+++ b/tests/utils/test_dates.py
@@ -13,16 +13,18 @@
 # limitations under the License.
 
 from datetime import datetime, timedelta
+import pendulum
 import unittest
 
 from airflow.utils import dates
+from airflow.utils import timezone
 
 
 class Dates(unittest.TestCase):
 
     def test_days_ago(self):
-        today = datetime.today()
-        today_midnight = datetime.fromordinal(today.date().toordinal())
+        today = pendulum.today()
+        today_midnight = pendulum.instance(datetime.fromordinal(today.date().toordinal()))
 
         self.assertTrue(dates.days_ago(0) == today_midnight)
 
@@ -43,9 +45,9 @@ class Dates(unittest.TestCase):
 
     def test_parse_execution_date(self):
         execution_date_str_wo_ms = '2017-11-02 00:00:00'
-        execution_date_str_w_ms = '2017-11-05 16:18:30..989729'
-        bad_execution_date_str = '2017-11-06T00:00:00Z'
+        execution_date_str_w_ms = '2017-11-05 16:18:30.989729'
+        bad_execution_date_str = '2017-11-06TXX:00:00Z'
 
-        self.assertEqual(datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms))
-        self.assertEqual(datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms))
+        self.assertEqual(timezone.datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms))
+        self.assertEqual(timezone.datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms))
         self.assertRaises(ValueError, dates.parse_execution_date, bad_execution_date_str)

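A usage sketch mirroring the updated assertions: both accepted formats come back as timezone-aware datetimes, and a malformed string still raises ValueError:

    from airflow.utils import dates, timezone

    parsed = dates.parse_execution_date('2017-11-02 00:00:00')
    assert parsed == timezone.datetime(2017, 11, 2)

    try:
        dates.parse_execution_date('2017-11-06TXX:00:00Z')
    except ValueError:
        pass  # rejected, as the test expects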
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 25faa7c..fd5006c 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -19,10 +19,10 @@ import mock
 import os
 import unittest
 
-from datetime import datetime
 from airflow.models import TaskInstance, DAG
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.timezone import datetime
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 DEFAULT_DATE = datetime(2016, 1, 1)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 65a6f75..2c510a6 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime, timedelta
+from datetime import timedelta
 import json
 import unittest
 from urllib.parse import quote_plus
@@ -21,6 +21,7 @@ from airflow import configuration
 from airflow.api.common.experimental.trigger_dag import trigger_dag
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.settings import Session
+from airflow.utils.timezone import datetime, utcnow
 from airflow.www import app as application
 
 
@@ -75,7 +76,7 @@ class TestApiExperimental(unittest.TestCase):
         url_template = '/api/experimental/dags/{}/dag_runs'
         response = self.app.post(
             url_template.format('example_bash_operator'),
-            data=json.dumps({'run_id': 'my_run' + datetime.now().isoformat()}),
+            data=json.dumps({'run_id': 'my_run' + utcnow().isoformat()}),
             content_type="application/json"
         )
 
@@ -91,7 +92,7 @@ class TestApiExperimental(unittest.TestCase):
     def test_trigger_dag_for_date(self):
         url_template = '/api/experimental/dags/{}/dag_runs'
         dag_id = 'example_bash_operator'
-        hour_from_now = datetime.now() + timedelta(hours=1)
+        hour_from_now = utcnow() + timedelta(hours=1)
         execution_date = datetime(hour_from_now.year,
                                   hour_from_now.month,
                                   hour_from_now.day,
@@ -133,7 +134,7 @@ class TestApiExperimental(unittest.TestCase):
         url_template = '/api/experimental/dags/{}/dag_runs/{}/tasks/{}'
         dag_id = 'example_bash_operator'
         task_id = 'also_run_this'
-        execution_date = datetime.now().replace(microsecond=0)
+        execution_date = utcnow().replace(microsecond=0)
         datetime_string = quote_plus(execution_date.isoformat())
         wrong_datetime_string = quote_plus(
             datetime(1990, 1, 1, 1, 1, 1).isoformat()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 4931487..a9bb28f 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -18,7 +18,6 @@ import os
 import shutil
 import tempfile
 import unittest
-from datetime import datetime
 import sys
 
 from airflow import models, configuration, settings
@@ -26,6 +25,7 @@ from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONF
 from airflow.models import DAG, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
+from airflow.utils.timezone import datetime
 from airflow.www import app as application
 from airflow import configuration as conf
 


[02/11] incubator-airflow git commit: [AIRFLOW-1802] Convert database fields to timezone aware

Posted by bo...@apache.org.
[AIRFLOW-1802] Convert database fields to timezone aware


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b658c78f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b658c78f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b658c78f

Branch: refs/heads/master
Commit: b658c78f6705415f444bd206cc02cd51219b3f8d
Parents: 59aba30
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Nov 10 22:36:31 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:53:03 2017 +0100

----------------------------------------------------------------------
 .../0e2a74e0fc9f_add_time_zone_awareness.py     | 213 +++++++++++++++++++
 airflow/settings.py                             |  15 ++
 2 files changed, 228 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
new file mode 100644
index 0000000..bb65c1c
--- /dev/null
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -0,0 +1,213 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add time zone awareness
+
+Revision ID: 0e2a74e0fc9f
+Revises: d2ae31099d61
+Create Date: 2017-11-10 22:22:31.326152
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0e2a74e0fc9f'
+down_revision = 'd2ae31099d61'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
+
+def upgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+    else:
+        # sqlite stores datetimes as-is, so there is nothing to convert
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+
+def downgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+    else:
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.DateTime())
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.DateTime())
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.DateTime())
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.DateTime())

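The migration repeats one per-dialect pattern for every column: MySQL columns become TIMESTAMP(fsp=6), which stores UTC internally and converts per the session time zone set above, while other databases get TIMESTAMP WITH TIME ZONE. A hypothetical helper -- illustrative only, not part of the commit -- makes the pattern explicit:

    import sqlalchemy as sa
    from sqlalchemy.dialects import mysql

    def make_tz_aware(op, dialect_name, table, column, **kwargs):
        # MySQL has no TIMESTAMP WITH TIME ZONE; TIMESTAMP plus fsp=6 keeps
        # microsecond precision and relies on the session time zone (UTC).
        if dialect_name == 'mysql':
            type_ = mysql.TIMESTAMP(fsp=6)
        else:
            type_ = sa.TIMESTAMP(timezone=True)
        op.alter_column(table_name=table, column_name=column,
                        type_=type_, **kwargs)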
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 0dfbb15..ceb9b50 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -138,6 +138,20 @@ def configure_orm(disable_connection_pool=False):
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 
+def configure_adapters():
+    from pendulum import Pendulum
+    try:
+        from sqlite3 import register_adapter
+        register_adapter(Pendulum, lambda val: val.isoformat(' '))
+    except ImportError:
+        pass
+    try:
+        import MySQLdb.converters
+        MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
+    except ImportError:
+        pass
+
+
 try:
     from airflow_local_settings import *
 
@@ -147,6 +161,7 @@ except:
 
 configure_logging()
 configure_vars()
+configure_adapters()
 configure_orm()
 
 # Const stuff

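A self-contained sketch of why the sqlite3 adapter above is needed: sqlite3 looks adapters up by exact type, so without one registered it cannot bind a Pendulum value as a query parameter. The Pendulum constructor call assumes the pendulum 1.x API that this series pins:

    import sqlite3
    from pendulum import Pendulum

    sqlite3.register_adapter(Pendulum, lambda val: val.isoformat(' '))

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE t (ts TEXT)')
    conn.execute('INSERT INTO t VALUES (?)', (Pendulum(2017, 11, 27),))
    print(conn.execute('SELECT ts FROM t').fetchone())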

[09/11] incubator-airflow git commit: [AIRFLOW-1827] Fix api endpoint date parsing

Posted by bo...@apache.org.
[AIRFLOW-1827] Fix api endpoint date parsing


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f43c0e9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f43c0e9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f43c0e9b

Branch: refs/heads/master
Commit: f43c0e9ba59b9e89f2932f3a34254bf675a291ff
Parents: 8aadc31
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Nov 16 07:10:45 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/www/api/experimental/endpoints.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f43c0e9b/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index b5a3052..d1e2d19 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -19,13 +19,13 @@ from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
 from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils import timezone
 from airflow.www.app import csrf
 
 from flask import (
     g, Markup, Blueprint, redirect, jsonify, abort,
     request, current_app, send_file, url_for
 )
-from datetime import datetime
 
 _log = LoggingMixin().log
 
@@ -58,12 +58,11 @@ def trigger_dag(dag_id):
 
         # Convert string datetime into actual datetime
         try:
-            execution_date = datetime.strptime(execution_date,
-                                               '%Y-%m-%dT%H:%M:%S')
+            execution_date = timezone.parse(execution_date)
         except ValueError:
             error_message = (
                 'Given execution date, {}, could not be identified '
-                'as a date. Example date format: 2015-11-16T14:34:15'
+                'as a date. Example date format: 2015-11-16T14:34:15+00:00'
                 .format(execution_date))
             _log.info(error_message)
             response = jsonify({'error': error_message})
@@ -123,12 +122,11 @@ def task_instance_info(dag_id, execution_date, task_id):
 
     # Convert string datetime into actual datetime
     try:
-        execution_date = datetime.strptime(execution_date,
-                                           '%Y-%m-%dT%H:%M:%S')
+        execution_date = timezone.parse(execution_date)
     except ValueError:
         error_message = (
             'Given execution date, {}, could not be identified '
-            'as a date. Example date format: 2015-11-16T14:34:15'
+            'as a date. Example date format: 2015-11-16T14:34:15+00:00'
             .format(execution_date))
         _log.info(error_message)
         response = jsonify({'error': error_message})
@@ -162,9 +160,9 @@ def latest_dag_runs():
         if dagrun.execution_date:
             payload.append({
                 'dag_id': dagrun.dag_id,
-                'execution_date': dagrun.execution_date.strftime("%Y-%m-%d %H:%M"),
+                'execution_date': dagrun.execution_date.isoformat(),
                 'start_date': ((dagrun.start_date or '') and
-                               dagrun.start_date.strftime("%Y-%m-%d %H:%M")),
+                               dagrun.start_date.isoformat()),
                 'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id,
                                        execution_date=dagrun.execution_date)
             })

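An illustrative sketch of the parsing change, assuming timezone.parse delegates to pendulum (which the dependency changes in this series suggest): offset-qualified strings now parse, where the old strptime pattern only matched one fixed naive format:

    import pendulum

    print(pendulum.parse('2015-11-16T14:34:15'))        # naive input, assumed UTC
    print(pendulum.parse('2015-11-16T14:34:15+00:00'))  # explicit offset now accepted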

[04/11] incubator-airflow git commit: [AIRFLOW-1826] Update views to use timezone aware objects

Posted by bo...@apache.org.
[AIRFLOW-1826] Update views to use timezone aware objects


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/518a41ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/518a41ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/518a41ac

Branch: refs/heads/master
Commit: 518a41acf319af27d49bdc0c84bda64b6b8af0b3
Parents: f43c0e9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Nov 16 18:54:56 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/utils/dates.py                         | 16 ++++++-----
 airflow/utils/timezone.py                      |  2 +-
 airflow/www/views.py                           | 32 +++++++++++++++++++--
 scripts/ci/requirements.txt                    |  1 +
 setup.py                                       |  2 +-
 tests/contrib/operators/test_druid_operator.py |  6 ++--
 tests/www/test_views.py                        |  2 +-
 7 files changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index cb9c840..2ca2b2c 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -73,16 +73,17 @@ def date_range(
     if isinstance(delta, six.string_types):
         delta_iscron = True
         tz = start_date.tzinfo
-        timezone.make_naive(start_date, tz)
+        start_date = timezone.make_naive(start_date, tz)
         cron = croniter(delta, start_date)
     elif isinstance(delta, timedelta):
         delta = abs(delta)
     l = []
     if end_date:
         while start_date <= end_date:
-            if delta_iscron:
-                start_date = timezone.make_aware(start_date, tz)
-            l.append(start_date)
+            if timezone.is_naive(start_date):
+                l.append(timezone.make_aware(start_date, tz))
+            else:
+                l.append(start_date)
 
             if delta_iscron:
                 start_date = cron.get_next(datetime)
@@ -90,9 +91,10 @@ def date_range(
                 start_date += delta
     else:
         for _ in range(abs(num)):
-            if delta_iscron:
-                start_date = timezone.make_aware(start_date, tz)
-            l.append(start_date)
+            if timezone.is_naive(start_date):
+                l.append(timezone.make_aware(start_date, tz))
+            else:
+                l.append(start_date)
 
             if delta_iscron:
                 if num > 0:

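A usage sketch of the corrected behavior: whether delta is a timedelta or a cron expression, every value date_range yields should now be timezone-aware (the assertion is the point, not the exact dates):

    from airflow.utils import timezone
    from airflow.utils.dates import date_range

    start = timezone.datetime(2017, 11, 1)
    for d in date_range(start, num=3, delta='0 0 * * *'):
        assert d.tzinfo is not None  # every yielded value is aware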
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index e384a14..75c8454 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -100,7 +100,7 @@ def make_aware(value, timezone=None):
         return timezone.convert(value)
     else:
         # This may be wrong around DST changes!
-        return value.astimezone(tz=timezone)
+        return value.replace(tzinfo=timezone)
 
 
 def make_naive(value, timezone=None):

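The one-line fix above matters because the two calls disagree on naive input; a stdlib-only sketch (in Python 3, astimezone() on a naive datetime assumes local time):

    from datetime import datetime, timezone as tz

    naive = datetime(2017, 11, 16, 14, 34)

    # astimezone() interprets the naive value as local time and shifts the clock.
    print(naive.astimezone(tz.utc))
    # replace() keeps 14:34 and merely attaches tzinfo, which is what
    # make_aware intends (with the DST caveat noted in the diff).
    print(naive.replace(tzinfo=tz.utc))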
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5ecee42..6bcb66d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,6 +16,7 @@
 from past.builtins import basestring, unicode
 
 import ast
+import datetime as dt
 import logging
 import os
 import pkg_resources
@@ -54,7 +55,9 @@ import markdown
 import nvd3
 
 from wtforms import (
-    Form, SelectField, TextAreaField, PasswordField, StringField, validators)
+    Form, SelectField, TextAreaField, PasswordField,
+    StringField, validators)
+from flask_admin.form.fields import DateTimeField
 
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
@@ -160,6 +163,13 @@ def state_token(state):
         '{state}</span>'.format(**locals()))
 
 
+def parse_datetime_f(value):
+    if not isinstance(value, dt.datetime):
+        return value
+
+    return timezone.make_aware(value)
+
+
 def state_f(v, c, m, p):
     return state_token(m.state)
 
@@ -1161,7 +1171,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = timezone.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -2217,12 +2227,18 @@ class KnownEventView(wwwutils.DataProfilingMixin, AirflowModelView):
             'validators': [
                 validators.DataRequired(),
             ],
+            'filters': [
+                parse_datetime_f,
+            ],
         },
         'end_date': {
             'validators': [
                 validators.DataRequired(),
                 GreaterEqualThan(fieldname='start_date'),
             ],
+            'filters': [
+                parse_datetime_f,
+            ]
         },
         'reported_by': {
             'validators': [
@@ -2240,11 +2256,14 @@ class KnownEventView(wwwutils.DataProfilingMixin, AirflowModelView):
     column_default_sort = ("start_date", True)
     column_sortable_list = (
         'label',
+        # todo: 'know_event_type' below is a spelling error inherited from the model
         ('event_type', 'event_type.know_event_type'),
         'start_date',
         'end_date',
         ('reported_by', 'reported_by.username'),
     )
+    filter_converter = wwwutils.UtcFilterConverter()
+    form_overrides = dict(start_date=DateTimeField, end_date=DateTimeField)
 
 
 class KnownEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
@@ -2349,9 +2368,18 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
         'value': StringField('Value'),
     }
 
+    form_args = {
+        'execution_date': {
+            'filters': [
+                parse_datetime_f,
+            ]
+        }
+    }
+
     column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
     column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
     filter_converter = wwwutils.UtcFilterConverter()
+    form_overrides = dict(execution_date=DateTimeField)
 
 
 class JobModelView(ModelViewOnly):

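The parse_datetime_f filter added above can be exercised on its own: flask-admin's DateTimeField hands back naive datetime objects, and the filter attaches the default time zone before the model sees them, passing anything else through untouched (a minimal sketch reusing the function as committed):

    import datetime as dt
    from airflow.utils import timezone

    def parse_datetime_f(value):
        if not isinstance(value, dt.datetime):
            return value
        return timezone.make_aware(value)

    print(parse_datetime_f(dt.datetime(2017, 1, 1)))  # now timezone-aware
    print(parse_datetime_f('not a datetime'))         # returned unchanged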
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 1ab69a1..2b5a8c9 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -64,6 +64,7 @@ pandas
 pandas-gbq
 parameterized
 paramiko>=2.1.1
+pendulum>=1.3.2
 psutil>=4.2.0, <5.0.0
 psycopg2
 pygments

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index cfe0d92..4e616d7 100644
--- a/setup.py
+++ b/setup.py
@@ -227,7 +227,7 @@ def do_setup():
             'lxml>=3.6.0, <4.0',
             'markdown>=2.5.2, <3.0',
             'pandas>=0.17.1, <1.0.0',
-            'pendulum==1.3.1',
+            'pendulum==1.3.2',
             'psutil>=4.2.0, <5.0.0',
             'pygments>=2.0.1, <3.0',
             'python-daemon>=2.1.1, <2.2',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/tests/contrib/operators/test_druid_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_druid_operator.py b/tests/contrib/operators/test_druid_operator.py
index 9df6c48..c8f92f5 100644
--- a/tests/contrib/operators/test_druid_operator.py
+++ b/tests/contrib/operators/test_druid_operator.py
@@ -13,15 +13,15 @@
 # limitations under the License.
 #
 
-import datetime
 import mock
 import unittest
 
 from airflow import DAG, configuration
 from airflow.contrib.operators.druid_operator import DruidOperator
+from airflow.utils import timezone
 from airflow.models import TaskInstance
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class TestDruidOperator(unittest.TestCase):
@@ -29,7 +29,7 @@ class TestDruidOperator(unittest.TestCase):
         configuration.load_test_config()
         args = {
             'owner': 'airflow',
-            'start_date': datetime.datetime(2017, 1, 1)
+            'start_date': timezone.datetime(2017, 1, 1)
         }
         self.dag = DAG('test_dag_id', default_args=args)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518a41ac/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index a9bb28f..f5b015e 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -374,7 +374,7 @@ class TestLogView(unittest.TestCase):
             follow_redirects=True,
         )
         self.assertEqual(response.status_code, 200)
-        self.assertIn('<pre id="attempt-1">*** Reading local log.\nLog for testing.\n</pre>',
+        self.assertIn('Log file isn',
                       response.data.decode('utf-8'))