You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/18 00:13:54 UTC

[airflow] branch v2-2-test updated (56d82fc -> eb87aeb)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 56d82fc  added explaining concept of logical date in DAG run docs (#21433)
     new 969a275  Clarify pendulum use in timezone cases (#21646)
     new eb87aeb  Add changelog for 2.2.4rc1

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt                                      | 74 ++++++++++++++++++++++
 CONTRIBUTING.rst                                   |  2 +-
 UPDATING.md                                        |  6 +-
 airflow/example_dags/example_bash_operator.py      |  8 ++-
 .../example_branch_datetime_operator.py            | 14 ++--
 .../example_branch_day_of_week_operator.py         |  4 +-
 airflow/example_dags/example_branch_labels.py      |  7 +-
 airflow/example_dags/example_branch_operator.py    |  5 +-
 .../example_branch_python_dop_operator_3.py        |  4 +-
 airflow/example_dags/example_complex.py            |  4 +-
 airflow/example_dags/example_dag_decorator.py      |  9 ++-
 .../example_external_task_marker_dag.py            |  4 +-
 .../example_dags/example_kubernetes_executor.py    |  5 +-
 .../example_latest_only_with_trigger.py            |  8 ++-
 airflow/example_dags/example_nested_branch_dag.py  |  4 +-
 .../example_passing_params_via_test_command.py     |  8 ++-
 airflow/example_dags/example_python_operator.py    |  5 +-
 .../example_dags/example_short_circuit_operator.py |  4 +-
 airflow/example_dags/example_skip_dag.py           |  9 ++-
 airflow/example_dags/example_sla_dag.py            |  8 ++-
 airflow/example_dags/example_task_group.py         |  7 +-
 .../example_dags/example_task_group_decorator.py   |  7 +-
 .../example_time_delta_sensor_async.py             |  8 ++-
 .../example_dags/example_trigger_controller_dag.py |  4 +-
 airflow/example_dags/example_trigger_target_dag.py |  4 +-
 airflow/example_dags/example_xcom.py               |  4 +-
 airflow/example_dags/example_xcomargs.py           |  7 +-
 airflow/example_dags/subdags/subdag.py             |  4 +-
 airflow/example_dags/tutorial_etl_dag.py           |  5 +-
 airflow/example_dags/tutorial_taskflow_api_etl.py  | 10 ++-
 airflow/models/dag.py                              |  3 +
 airflow/serialization/serialized_objects.py        |  6 +-
 docs/apache-airflow/best-practices.rst             | 14 ++--
 docs/apache-airflow/concepts/dags.rst              | 26 +++++---
 docs/apache-airflow/concepts/operators.rst         |  2 +-
 docs/apache-airflow/dag-run.rst                    | 11 ++--
 docs/apache-airflow/executor/kubernetes.rst        |  5 +-
 docs/apache-airflow/faq.rst                        | 24 +++++--
 docs/apache-airflow/howto/timetable.rst            |  8 +--
 docs/apache-airflow/lineage.rst                    |  7 +-
 .../logging-monitoring/callbacks.rst               |  8 ++-
 docs/apache-airflow/timezone.rst                   | 19 ++++--
 docs/apache-airflow/tutorial.rst                   | 15 ++++-
 .../extending/embedding-dags/test_dag.py           | 10 +--
 44 files changed, 290 insertions(+), 120 deletions(-)

[airflow] 02/02: Add changelog for 2.2.4rc1

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eb87aeb1604a6ed1a51895f04fca2c5c7e39c223
Author: Jed Cunningham <je...@apache.org>
AuthorDate: Thu Feb 17 17:11:00 2022 -0700

    Add changelog for 2.2.4rc1
---
 CHANGELOG.txt | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 4c9a2da..4e7e548 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,77 @@
+Airflow 2.2.4, 2021-02-22
+-------------------------
+
+Bug Fixes
+"""""""""
+
+- Adding missing login provider related methods from Flask-Appbuilder (#21294)
+- Fix slow DAG deletion due to missing ``dag_id`` index for job table (#20282)
+- Add a session backend to store session data in the database (#21478)
+- Show task status only for running dags or only for the last finished dag (#21352)
+- Use compat data interval shim in log handlers (#21289)
+- Fix mismatch in generated run_id and logical date of DAG run (#18707)
+- Fix TriggerDagRunOperator extra link (#19410)
+- Add possibility to create user in the Remote User mode (#19963)
+- Avoid deadlock when rescheduling task (#21362)
+- Fix the incorrect scheduling time for the first run of dag (#21011)
+- Fix Scheduler crash when executing task instances of missing DAG (#20349)
+- Deferred tasks does not cancel when DAG is marked fail (#20649)
+- Removed duplicated dag_run join in ``Dag.get_task_instances()`` (#20591)
+- Avoid unintentional data loss when deleting DAGs (#20758)
+- Fix session usage in ``/rendered-k8s`` view (#21006)
+- Fix ``airflow dags backfill --reset-dagruns`` errors when run twice (#21062)
+- Do not set ``TaskInstance.max_tries`` in ``refresh_from_task`` (#21018)
+- Don't require dag_id in body in dagrun REST API endpoint (#21024)
+- Add Roles from Azure OAUTH Response in internal Security Manager (#20707)
+- Allow Viewing DagRuns and TIs if a user has DAG "read" perms (#20663)
+- Fix running ``airflow dags test <dag_id> <execution_dt>`` results in error when run twice (#21031)
+- Switch to non-vendored latest connexion library (#20910)
+- Bump flask-appbuilder to ``>=3.3.4`` (#20628)
+- upgrade celery to ``5.2.3`` (#19703)
+- Bump croniter from ``<1.1`` to ``<1.2`` (#20489)
+- Lift off upper bound for MarkupSafe (#20113)
+- Avoid calling ``DAG.following_schedule()`` for ``TaskInstance.get_template_context()`` (#20486)
+- Fix(standalone): Remove hardcoded Webserver port (#20429)
+- Remove unnecssary logging in experimental API (#20356)
+- Un-ignore DeprecationWarning (#20322)
+- Deepcopying Kubernetes Secrets attributes causing issues (#20318)
+- Fix(dag-dependencies): fix arrow styling (#20303)
+- Adds retry on taskinstance retrieval lock (#20030)
+- Correctly send timing metrics when using dogstatsd (fix schedule_delay metric) (#19973)
+- Enhance ``multiple_outputs`` inference of dict typing (#19608)
+- Fixing ses email backend (#18042)
+
+Doc only changes
+""""""""""""""""
+
+- Added explaining concept of logical date in DAG run docs (#21433)
+- Add note about Variable precedence with env vars (#21568)
+- Update error docs to include before_send option (#21275)
+- Augment xcom docs (#20755)
+- Add documentation and release policy on "latest" constraints (#21093)
+- Add a link to the DAG model in the Python API reference (#21060)
+- Added an enum param example (#20841)
+- Compare taskgroup and subdag (#20700)
+- Add note about reserved ``params`` keyword (#20640)
+- Improve documentation on ``Params`` (#20567)
+- Fix typo in MySQL Database creation code (Set up DB docs)  (#20102)
+- Add requirements.txt description (#20048)
+- Clean up ``default_args`` usage in docs (#19803)
+- Add docker-compose explanation to conn localhost (#19076)
+- Update CSV ingest code for tutorial (#18960)
+- Adds Pendulum 1.x -> 2.x upgrade documentation (#18955)
+- Updating explicit arg example in TaskFlow API tutorial doc (#18907)
+- Adds back documentation about context usage in Python/@task (#18868)
+- Clean up dynamic `start_date` values from docs (#19607)
+- Docs for multiple pool slots (#20257)
+- Update upgrading.rst with detailed code example of how to resolve post-upgrade warning (#19993)
+
+Misc
+""""
+
+- Deprecate some functions in the experimental API (#19931)
+- Deprecate smart sensors (#20151)
+
 Airflow 2.2.3, 2021-12-20
 -------------------------
 

[airflow] 01/02: Clarify pendulum use in timezone cases (#21646)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 969a275df02a81d1f3176ca010e565fb950e6d35
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Feb 18 01:08:42 2022 +0100

    Clarify pendulum use in timezone cases (#21646)
    
    It is important to use Pendulum in case timezone is used - because
    there are a number of limitations coming from using stdlib
    timezone implementation.
    
    However our documentation was not very clear about it, especially
    some examples shown using standard datetime in DAGs which could
    mislead our users to continue using datetime if they use timezone.
    
    This PR clarifies and stresses the use of pendulum is necessary
    when timezone is used. Also it points to the documentation
    in case serialization throws error about not using Pendulum
    so that the users can learn about the reasoning.
    
    This is the first part of the change - the follow up will be
    changing all provider examples to also use timezone and
    pendulum explicitly.
    
    See also #20070
    
    (cherry picked from commit f011da235f705411239d992bc3c92f1c072f89a9)
---
 CONTRIBUTING.rst                                   |  2 +-
 UPDATING.md                                        |  6 ++---
 airflow/example_dags/example_bash_operator.py      |  8 ++++---
 .../example_branch_datetime_operator.py            | 14 ++++++------
 .../example_branch_day_of_week_operator.py         |  4 ++--
 airflow/example_dags/example_branch_labels.py      |  7 ++++--
 airflow/example_dags/example_branch_operator.py    |  5 +++--
 .../example_branch_python_dop_operator_3.py        |  4 ++--
 airflow/example_dags/example_complex.py            |  4 ++--
 airflow/example_dags/example_dag_decorator.py      |  9 ++++++--
 .../example_external_task_marker_dag.py            |  4 ++--
 .../example_dags/example_kubernetes_executor.py    |  5 +++--
 .../example_latest_only_with_trigger.py            |  8 ++++---
 airflow/example_dags/example_nested_branch_dag.py  |  4 ++--
 .../example_passing_params_via_test_command.py     |  8 ++++---
 airflow/example_dags/example_python_operator.py    |  5 +++--
 .../example_dags/example_short_circuit_operator.py |  4 ++--
 airflow/example_dags/example_skip_dag.py           |  9 ++++++--
 airflow/example_dags/example_sla_dag.py            |  8 ++++---
 airflow/example_dags/example_task_group.py         |  7 ++++--
 .../example_dags/example_task_group_decorator.py   |  7 ++++--
 .../example_time_delta_sensor_async.py             |  8 ++++---
 .../example_dags/example_trigger_controller_dag.py |  4 ++--
 airflow/example_dags/example_trigger_target_dag.py |  4 ++--
 airflow/example_dags/example_xcom.py               |  4 ++--
 airflow/example_dags/example_xcomargs.py           |  7 +++---
 airflow/example_dags/subdags/subdag.py             |  4 ++--
 airflow/example_dags/tutorial_etl_dag.py           |  5 +++--
 airflow/example_dags/tutorial_taskflow_api_etl.py  | 10 +++++++--
 airflow/models/dag.py                              |  3 +++
 airflow/serialization/serialized_objects.py        |  6 ++++-
 docs/apache-airflow/best-practices.rst             | 14 +++++++-----
 docs/apache-airflow/concepts/dags.rst              | 26 +++++++++++++++-------
 docs/apache-airflow/concepts/operators.rst         |  2 +-
 docs/apache-airflow/dag-run.rst                    | 11 ++++-----
 docs/apache-airflow/executor/kubernetes.rst        |  5 +++--
 docs/apache-airflow/faq.rst                        | 24 +++++++++++++++-----
 docs/apache-airflow/howto/timetable.rst            |  8 +++----
 docs/apache-airflow/lineage.rst                    |  7 +++---
 .../logging-monitoring/callbacks.rst               |  8 ++++---
 docs/apache-airflow/timezone.rst                   | 19 +++++++++++-----
 docs/apache-airflow/tutorial.rst                   | 15 ++++++++++---
 .../extending/embedding-dags/test_dag.py           | 10 +++++----
 43 files changed, 216 insertions(+), 120 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index d598742..838f658 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -1376,7 +1376,7 @@ We are using certain prefixes for email subjects for different purposes. Start y
 Voting is governed by the rules described in `Voting <https://www.apache.org/foundation/voting.html>`_
 
 We are all devoting our time for community as individuals who except for being active in Apache Airflow have
-families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are
+families, daily jobs, right for vacation. Sometimes we are in different timezones or simply are
 busy with day-to-day duties that our response time might be delayed. For us it's crucial
 to remember to respect each other in the project with no formal structure.
 There are no managers, departments, most of us is autonomous in our opinions, decisions.
diff --git a/UPDATING.md b/UPDATING.md
index 2ed4aac..0273d5a 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -184,7 +184,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`.
 ```python
 dag = DAG(
     dag_id="example_dag",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     concurrency=3,
 )
@@ -195,7 +195,7 @@ dag = DAG(
 ```python
 dag = DAG(
     dag_id="example_dag",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     max_active_tasks=3,
 )
@@ -3216,7 +3216,7 @@ Type "help", "copyright", "credits" or "license" for more information.
 >>> from airflow.models.dag import DAG
 >>> from airflow.operators.dummy import DummyOperator
 >>>
->>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
+>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC"))
 >>>
 >>> task = DummyOperator(task_id='task_1', dag=dag)
 >>>
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index f679f8d..8204592 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -18,7 +18,9 @@
 
 """Example DAG demonstrating the usage of the BashOperator."""
 
-from datetime import datetime, timedelta
+import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.operators.bash import BashOperator
@@ -27,9 +29,9 @@ from airflow.operators.dummy import DummyOperator
 with DAG(
     dag_id='example_bash_operator',
     schedule_interval='0 0 * * *',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
-    dagrun_timeout=timedelta(minutes=60),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=['example', 'example2'],
     params={"example_key": "example_value"},
 ) as dag:
diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py
index bdc50ca..76b109f 100644
--- a/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -20,7 +20,7 @@
 Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as
 targets.
 """
-import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.datetime import BranchDateTimeOperator
@@ -28,7 +28,7 @@ from airflow.operators.dummy import DummyOperator
 
 dag = DAG(
     dag_id="example_branch_datetime_operator",
-    start_date=datetime.datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=["example"],
     schedule_interval="@daily",
@@ -42,8 +42,8 @@ cond1 = BranchDateTimeOperator(
     task_id='datetime_branch',
     follow_task_ids_if_true=['date_in_range'],
     follow_task_ids_if_false=['date_outside_range'],
-    target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0),
-    target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0),
+    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
+    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
     dag=dag,
 )
 
@@ -54,7 +54,7 @@ cond1 >> [dummy_task_1, dummy_task_2]
 
 dag = DAG(
     dag_id="example_branch_datetime_operator_2",
-    start_date=datetime.datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=["example"],
     schedule_interval="@daily",
@@ -67,8 +67,8 @@ cond2 = BranchDateTimeOperator(
     task_id='datetime_branch',
     follow_task_ids_if_true=['date_in_range'],
     follow_task_ids_if_false=['date_outside_range'],
-    target_upper=datetime.time(0, 0, 0),
-    target_lower=datetime.time(15, 0, 0),
+    target_upper=pendulum.time(0, 0, 0),
+    target_lower=pendulum.time(15, 0, 0),
     dag=dag,
 )
 
diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py
index 6d1a331..dae303a 100644
--- a/airflow/example_dags/example_branch_day_of_week_operator.py
+++ b/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -19,7 +19,7 @@
 """
 Example DAG demonstrating the usage of BranchDayOfWeekOperator.
 """
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -27,7 +27,7 @@ from airflow.operators.weekday import BranchDayOfWeekOperator
 
 with DAG(
     dag_id="example_weekday_branch_operator",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=["example"],
     schedule_interval="@daily",
diff --git a/airflow/example_dags/example_branch_labels.py b/airflow/example_dags/example_branch_labels.py
index bd6ce09..2215bcf 100644
--- a/airflow/example_dags/example_branch_labels.py
+++ b/airflow/example_dags/example_branch_labels.py
@@ -19,14 +19,17 @@
 """
 Example DAG demonstrating the usage of labels with different branches.
 """
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.edgemodifier import Label
 
 with DAG(
-    "example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False
+    "example_branch_labels",
+    schedule_interval="@daily",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
 ) as dag:
     ingest = DummyOperator(task_id="ingest")
     analyse = DummyOperator(task_id="analyze")
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 69f939e..eaa1532 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -19,7 +19,8 @@
 """Example DAG demonstrating the usage of the BranchPythonOperator."""
 
 import random
-from datetime import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -29,7 +30,7 @@ from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id='example_branch_operator',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval="@daily",
     tags=['example', 'example2'],
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index 09d96be..d85eda1 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -20,7 +20,7 @@
 Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
 or skipped on alternating runs.
 """
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -49,7 +49,7 @@ def should_run(**kwargs):
 with DAG(
     dag_id='example_branch_dop_operator_v3',
     schedule_interval='*/1 * * * *',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args={'depends_on_past': True},
     tags=['example'],
diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py
index a141236..22e1906 100644
--- a/airflow/example_dags/example_complex.py
+++ b/airflow/example_dags/example_complex.py
@@ -19,7 +19,7 @@
 """
 Example Airflow DAG that shows the complex DAG structure.
 """
-from datetime import datetime
+import pendulum
 
 from airflow import models
 from airflow.models.baseoperator import chain
@@ -28,7 +28,7 @@ from airflow.operators.bash import BashOperator
 with models.DAG(
     dag_id="example_complex",
     schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example', 'example2', 'example3'],
 ) as dag:
diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 66b0fa4..af1438c 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -15,10 +15,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import datetime
 from typing import Any, Dict
 
 import httpx
+import pendulum
 
 from airflow.decorators import dag, task
 from airflow.models.baseoperator import BaseOperator
@@ -37,7 +37,12 @@ class GetRequestOperator(BaseOperator):
 
 
 # [START dag_decorator_usage]
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
+@dag(
+    schedule_interval=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=['example'],
+)
 def example_dag_decorator(email: str = 'example@example.com'):
     """
     DAG to send server IP to email.
diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index 851a7ad..eed2f72 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -37,13 +37,13 @@ interval till one of the following will happen:
     exception
 """
 
-import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
 
-start_date = datetime.datetime(2015, 1, 1)
+start_date = pendulum.datetime(2021, 1, 1, tz="UTC")
 
 with DAG(
     dag_id="example_external_task_marker_parent",
diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index f984909..6318d51 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -20,7 +20,8 @@ This is an example dag for using a Kubernetes Executor Configuration.
 """
 import logging
 import os
-from datetime import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.configuration import conf
@@ -45,7 +46,7 @@ if k8s:
     with DAG(
         dag_id='example_kubernetes_executor',
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
         tags=['example3'],
     ) as dag:
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index 76b5f63..67f004a 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -20,7 +20,9 @@ Example LatestOnlyOperator and TriggerRule interactions
 """
 
 # [START example]
-import datetime as dt
+import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -29,8 +31,8 @@ from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id='latest_only_with_trigger',
-    schedule_interval=dt.timedelta(hours=4),
-    start_date=dt.datetime(2021, 1, 1),
+    schedule_interval=datetime.timedelta(hours=4),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example3'],
 ) as dag:
diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py
index add81a9..27e7105 100644
--- a/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow/example_dags/example_nested_branch_dag.py
@@ -21,7 +21,7 @@ Example DAG demonstrating a workflow with nested branching. The join tasks are c
 ``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding
 ``BranchPythonOperator`` are skipped.
 """
-from datetime import datetime
+import pendulum
 
 from airflow.models import DAG
 from airflow.operators.dummy import DummyOperator
@@ -30,7 +30,7 @@ from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id="example_nested_branch_dag",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval="@daily",
     tags=["example"],
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index d4781af..f97f941 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -18,10 +18,12 @@
 
 """Example DAG demonstrating the usage of the params arguments in templated arguments."""
 
+import datetime
 import os
-from datetime import datetime, timedelta
 from textwrap import dedent
 
+import pendulum
+
 from airflow import DAG
 from airflow.decorators import task
 from airflow.operators.bash import BashOperator
@@ -61,9 +63,9 @@ def print_env_vars(test_mode=None):
 with DAG(
     "example_passing_params_via_test_command",
     schedule_interval='*/1 * * * *',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
-    dagrun_timeout=timedelta(minutes=4),
+    dagrun_timeout=datetime.timedelta(minutes=4),
     tags=['example'],
 ) as dag:
     run_this = my_py_command(params={"miff": "agg"})
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index d533d84..0f9a7fc 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -23,9 +23,10 @@ virtual environment.
 import logging
 import shutil
 import time
-from datetime import datetime
 from pprint import pprint
 
+import pendulum
+
 from airflow import DAG
 from airflow.decorators import task
 
@@ -34,7 +35,7 @@ log = logging.getLogger(__name__)
 with DAG(
     dag_id='example_python_operator',
     schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example'],
 ) as dag:
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index d349685..4c1187a 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -17,7 +17,7 @@
 # under the License.
 
 """Example DAG demonstrating the usage of the ShortCircuitOperator."""
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.models.baseoperator import chain
@@ -26,7 +26,7 @@ from airflow.operators.python import ShortCircuitOperator
 
 with DAG(
     dag_id='example_short_circuit_operator',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example'],
 ) as dag:
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index cb664e7..0e67ed1 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -18,7 +18,7 @@
 
 """Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default."""
 
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.exceptions import AirflowSkipException
@@ -54,6 +54,11 @@ def create_test_pipeline(suffix, trigger_rule):
     join >> final
 
 
-with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag:
+with DAG(
+    dag_id='example_skip_dag',
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=['example'],
+) as dag:
     create_test_pipeline('1', TriggerRule.ALL_SUCCESS)
     create_test_pipeline('2', TriggerRule.ONE_SUCCESS)
diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py
index 7a46bc4..0db6bc1 100644
--- a/airflow/example_dags/example_sla_dag.py
+++ b/airflow/example_dags/example_sla_dag.py
@@ -15,8 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 import time
-from datetime import datetime, timedelta
+
+import pendulum
 
 from airflow.decorators import dag, task
 
@@ -39,13 +41,13 @@ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
 
 @dag(
     schedule_interval="*/2 * * * *",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     sla_miss_callback=sla_callback,
     default_args={'email': "email@example.com"},
 )
 def example_sla_dag():
-    @task(sla=timedelta(seconds=10))
+    @task(sla=datetime.timedelta(seconds=10))
     def sleep_20():
         """Sleep for 20 seconds"""
         time.sleep(20)
diff --git a/airflow/example_dags/example_task_group.py b/airflow/example_dags/example_task_group.py
index d81bf00..46f709e 100644
--- a/airflow/example_dags/example_task_group.py
+++ b/airflow/example_dags/example_task_group.py
@@ -17,7 +17,7 @@
 # under the License.
 
 """Example DAG demonstrating the usage of the TaskGroup."""
-from datetime import datetime
+import pendulum
 
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
@@ -26,7 +26,10 @@ from airflow.utils.task_group import TaskGroup
 
 # [START howto_task_group]
 with DAG(
-    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]
+    dag_id="example_task_group",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=["example"],
 ) as dag:
     start = DummyOperator(task_id="start")
 
diff --git a/airflow/example_dags/example_task_group_decorator.py b/airflow/example_dags/example_task_group_decorator.py
index 0e53a98..30f9d6f 100644
--- a/airflow/example_dags/example_task_group_decorator.py
+++ b/airflow/example_dags/example_task_group_decorator.py
@@ -18,7 +18,7 @@
 
 """Example DAG demonstrating the usage of the @taskgroup decorator."""
 
-from datetime import datetime
+import pendulum
 
 from airflow.decorators import task, task_group
 from airflow.models.dag import DAG
@@ -65,7 +65,10 @@ def task_group_function(value):
 
 # Executing Tasks and TaskGroups
 with DAG(
-    dag_id="example_task_group_decorator", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]
+    dag_id="example_task_group_decorator",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=["example"],
 ) as dag:
     start_task = task_start()
     end_task = task_end()
diff --git a/airflow/example_dags/example_time_delta_sensor_async.py b/airflow/example_dags/example_time_delta_sensor_async.py
index ce8cab0..1a7126a 100644
--- a/airflow/example_dags/example_time_delta_sensor_async.py
+++ b/airflow/example_dags/example_time_delta_sensor_async.py
@@ -21,7 +21,9 @@ Example DAG demonstrating ``TimeDeltaSensorAsync``, a drop in replacement for ``
 defers and doesn't occupy a worker slot while it waits
 """
 
-from datetime import datetime, timedelta
+import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -30,10 +32,10 @@ from airflow.sensors.time_delta import TimeDeltaSensorAsync
 with DAG(
     dag_id="example_time_delta_sensor_async",
     schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=["example"],
 ) as dag:
-    wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=10))
+    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
     finish = DummyOperator(task_id="finish")
     wait >> finish
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 27df3d2..a017c9a 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -21,14 +21,14 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
 """
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 
 with DAG(
     dag_id="example_trigger_controller_dag",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval="@once",
     tags=['example'],
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 41aecf1..64ccb59 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -21,7 +21,7 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
 """
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.decorators import task
@@ -41,7 +41,7 @@ def run_this_func(dag_run=None):
 
 with DAG(
     dag_id="example_trigger_target_dag",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval=None,
     tags=['example'],
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 405d5c5..b55d4e5d 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -17,7 +17,7 @@
 # under the License.
 
 """Example DAG demonstrating the usage of XComs."""
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.decorators import task
@@ -64,7 +64,7 @@ def pull_value_from_bash_push(ti=None):
 with DAG(
     'example_xcom',
     schedule_interval="@once",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example'],
 ) as dag:
diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py
index 7e0cdd9..00af472 100644
--- a/airflow/example_dags/example_xcomargs.py
+++ b/airflow/example_dags/example_xcomargs.py
@@ -18,7 +18,8 @@
 
 """Example DAG demonstrating the usage of the XComArgs."""
 import logging
-from datetime import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.decorators import task
@@ -41,7 +42,7 @@ def print_value(value, ts=None):
 
 with DAG(
     dag_id='example_xcom_args',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval=None,
     tags=['example'],
@@ -50,7 +51,7 @@ with DAG(
 
 with DAG(
     "example_xcom_args_with_operators",
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval=None,
     tags=['example'],
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index d337a03..7c91309 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -19,7 +19,7 @@
 """Helper function to generate a DAG and operators given some arguments."""
 
 # [START subdag]
-from datetime import datetime
+import pendulum
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
@@ -38,7 +38,7 @@ def subdag(parent_dag_name, child_dag_name, args):
     dag_subdag = DAG(
         dag_id=f'{parent_dag_name}.{child_dag_name}',
         default_args=args,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
         schedule_interval="@daily",
     )
diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index dd18449..d039a73 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -24,9 +24,10 @@ This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline
 # [START tutorial]
 # [START import_module]
 import json
-from datetime import datetime
 from textwrap import dedent
 
+import pendulum
+
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
 
@@ -45,7 +46,7 @@ with DAG(
     # [END default_args]
     description='ETL DAG tutorial',
     schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=['example'],
 ) as dag:
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py
index 3b0ba51..f6af78f 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -20,7 +20,8 @@
 # [START tutorial]
 # [START import_module]
 import json
-from datetime import datetime
+
+import pendulum
 
 from airflow.decorators import dag, task
 
@@ -28,7 +29,12 @@ from airflow.decorators import dag, task
 
 
 # [START instantiate_dag]
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
+@dag(
+    schedule_interval=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=['example'],
+)
 def tutorial_taskflow_api_etl():
     """
     ### TaskFlow API Tutorial Documentation
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 477e597..150220c 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -187,6 +187,9 @@ class DAG(LoggingMixin):
     DAGs essentially act as namespaces for tasks. A task_id can only be
     added once to a DAG.
 
+    Note that if you plan to use time zones all the dates provided should be pendulum
+    dates. See :ref:`timezone_aware_dags`.
+
     :param dag_id: The id of the DAG; must consist exclusively of alphanumeric
         characters, dashes, dots and underscores (all ASCII)
     :type dag_id: str
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index bc8361d..ebe20b0 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -42,6 +42,7 @@ from airflow.serialization.json_schema import Validator, load_dag_schema
 from airflow.settings import json
 from airflow.timetables.base import Timetable
 from airflow.utils.code_utils import get_python_source
+from airflow.utils.docs import get_docs_url
 from airflow.utils.module_loading import as_importable_string, import_string
 from airflow.utils.task_group import TaskGroup
 
@@ -113,7 +114,10 @@ def encode_timezone(var: Timezone) -> Union[str, int]:
         return var.offset
     if isinstance(var, Timezone):
         return var.name
-    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+    raise ValueError(
+        f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}. "
+        f"See {get_docs_url('timezone.html#time-zone-aware-dags')}"
+    )
 
 
 def decode_timezone(var: Union[str, int]) -> Timezone:
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 951e6b4..3b01b3e 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -121,7 +121,7 @@ Bad example:
 
 .. code-block:: python
 
-  from datetime import datetime
+  import pendulum
 
   from airflow import DAG
   from airflow.operators.python import PythonOperator
@@ -131,7 +131,7 @@ Bad example:
   with DAG(
       dag_id="example_python_operator",
       schedule_interval=None,
-      start_date=datetime(2021, 1, 1),
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example"],
   ) as dag:
@@ -151,7 +151,7 @@ Good example:
 
 .. code-block:: python
 
-  from datetime import datetime
+  import pendulum
 
   from airflow import DAG
   from airflow.operators.python import PythonOperator
@@ -159,7 +159,7 @@ Good example:
   with DAG(
       dag_id="example_python_operator",
       schedule_interval=None,
-      start_date=datetime(2021, 1, 1),
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example"],
   ) as dag:
@@ -237,12 +237,13 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha
 
 .. code-block:: python
 
+    import pendulum
     from my_company_utils.common import ALL_TASKS
 
     with DAG(
         dag_id="my_dag",
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
     ) as dag:
         for task in ALL_TASKS:
@@ -486,13 +487,14 @@ This is an example test want to verify the structure of a code-generated DAG aga
 .. code-block:: python
 
     import datetime
+    import pendulum
 
     import pytest
 
     from airflow.utils.state import DagRunState
     from airflow.utils.types import DagRunType
 
-    DATA_INTERVAL_START = datetime.datetime(2021, 9, 13)
+    DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
     DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
 
     TEST_DAG_ID = "my_custom_operator_dag"
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index e339abe..32d21ce 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -38,19 +38,22 @@ There are three ways to declare a DAG - either you can use a context manager,
 which will add the DAG to anything inside it implicitly::
 
     with DAG(
-        "my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False
+        "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule_interval="@daily", catchup=False
     ) as dag:
         op = DummyOperator(task_id="task")
 
 Or, you can use a standard constructor, passing the dag into any
 operators you use::
 
-    my_dag = DAG("my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)
+    my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+                 schedule_interval="@daily", catchup=False)
     op = DummyOperator(task_id="task", dag=my_dag)
 
 Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator <concepts:dag-decorator>`::
 
-    @dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)
+    @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+         schedule_interval="@daily", catchup=False)
     def generate_dag():
         op = DummyOperator(task_id="task")
 
@@ -214,10 +217,11 @@ Default Arguments
 Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it::
 
 
+    import pendulum
 
     with DAG(
         dag_id='my_dag',
-        start_date=datetime(2016, 1, 1),
+        start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
         schedule_interval='@daily',
         catchup=False,
         default_args={'retries': 2},
@@ -390,7 +394,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
     .. code-block:: python
 
         # dags/branch_without_trigger.py
-        import datetime as dt
+        import pendulum
 
         from airflow.models import DAG
         from airflow.operators.dummy import DummyOperator
@@ -399,7 +403,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
         dag = DAG(
             dag_id="branch_without_trigger",
             schedule_interval="@once",
-            start_date=dt.datetime(2019, 2, 28),
+            start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
         )
 
         run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
@@ -483,9 +487,11 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the
 
 TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level::
 
+    import pendulum
+
     with DAG(
         dag_id='dag1',
-        start_date=datetime(2016, 1, 1),
+        start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
         schedule_interval="@daily",
         catchup=False,
         default_args={'retries': 1},
@@ -563,9 +569,13 @@ This is especially useful if your tasks are built dynamically from configuration
     """
     ### My great DAG
     """
+    import pendulum
 
     dag = DAG(
-        "my_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False
+        "my_dag",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule_interval="@daily",
+        catchup=False,
     )
     dag.doc_md = __doc__
 
diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 13020f1..30ec5ce 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -175,7 +175,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
     dag = DAG(
         dag_id="example_template_as_python_object",
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
         render_template_as_native_obj=True,
     )
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 62555b1..1a2bbe3 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -113,17 +113,18 @@ in the configuration file. When turned off, the scheduler creates a DAG run only
     """
     from airflow.models.dag import DAG
     from airflow.operators.bash import BashOperator
-    from datetime import datetime, timedelta
 
+    import datetime
+    import pendulum
 
     dag = DAG(
         "tutorial",
         default_args={
             "depends_on_past": True,
             "retries": 1,
-            "retry_delay": timedelta(minutes=3),
+            "retry_delay": datetime.timedelta(minutes=3),
         },
-        start_date=datetime(2015, 12, 1),
+        start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
         description="A simple tutorial DAG",
         schedule_interval="@daily",
         catchup=False,
@@ -225,7 +226,7 @@ Example of a parameterized DAG:
 
 .. code-block:: python
 
-    from datetime import datetime
+    import pendulum
 
     from airflow import DAG
     from airflow.operators.bash import BashOperator
@@ -233,7 +234,7 @@ Example of a parameterized DAG:
     dag = DAG(
         "example_parameterized_dag",
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
     )
 
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 341f9fe..496e3e7 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -154,7 +154,8 @@ Here is an example of a task with both features:
 .. code-block:: python
 
     import os
-    from datetime import datetime
+
+    import pendulum
 
     from airflow import DAG
     from airflow.decorators import task
@@ -166,7 +167,7 @@ Here is an example of a task with both features:
     with DAG(
         dag_id="example_pod_template_file",
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         catchup=False,
         tags=["example3"],
     ) as dag:
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 7f72a0d..6f2e778 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -149,7 +149,8 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo
 
     from airflow import DAG
     from airflow.operators.python_operator import PythonOperator
-    from datetime import datetime
+
+    import pendulum
 
 
     def create_dag(dag_id, schedule, dag_number, default_args):
@@ -157,7 +158,12 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo
             print("Hello World")
             print("This is DAG: {}".format(str(dag_number)))
 
-        dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
+        dag = DAG(
+            dag_id,
+            schedule_interval=schedule,
+            default_args=default_args,
+            pendulum.datetime(2021, 9, 13, tz="UTC"),
+        )
 
         with dag:
             t1 = PythonOperator(task_id="hello_world", python_callable=hello_world_py)
@@ -213,6 +219,14 @@ backfill CLI command, gets overridden by the backfill's ``start_date`` commands.
 This allows for a backfill on tasks that have ``depends_on_past=True`` to
 actually start. If this were not the case, the backfill just would not start.
 
+Using time zones
+----------------
+
+Creating a time zone aware datetime (e.g. DAG's ``start_date``) is quite simple. Just make sure to supply
+a time zone aware dates using ``pendulum``. Don't try to use standard library
+`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to
+have limitations and we deliberately disallow using them in DAGs.
+
 
 .. _faq:what-does-execution-date-mean:
 
@@ -360,12 +374,12 @@ upstream task.
 
 .. code-block:: python
 
+    import pendulum
+
     from airflow.decorators import dag, task
     from airflow.exceptions import AirflowException
     from airflow.utils.trigger_rule import TriggerRule
 
-    from datetime import datetime
-
 
     @task
     def a_func():
@@ -379,7 +393,7 @@ upstream task.
         pass
 
 
-    @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1))
+    @dag(schedule_interval="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
     def my_dag():
         a = a_func()
         b = b_func()
diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst
index 1fd7102..ed902fc 100644
--- a/docs/apache-airflow/howto/timetable.rst
+++ b/docs/apache-airflow/howto/timetable.rst
@@ -69,7 +69,7 @@ file:
 
 .. code-block:: python
 
-    import datetime
+    import pendulum
 
     from airflow import DAG
     from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
@@ -77,7 +77,7 @@ file:
 
     with DAG(
         dag_id="example_after_workday_timetable_dag",
-        start_date=datetime.datetime(2021, 3, 10),
+        start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
         timetable=AfterWorkdayTimetable(),
         tags=["example", "timetable"],
     ) as dag:
@@ -190,7 +190,7 @@ For reference, here's our plugin and DAG files in their entirety:
 
 .. code-block:: python
 
-    import datetime
+    import pendulum
 
     from airflow import DAG
     from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
@@ -199,7 +199,7 @@ For reference, here's our plugin and DAG files in their entirety:
 
     with DAG(
         dag_id="example_workday_timetable",
-        start_date=datetime.datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         timetable=AfterWorkdayTimetable(),
         tags=["example", "timetable"],
     ) as dag:
diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst
index 9b8bb71..3a4db94 100644
--- a/docs/apache-airflow/lineage.rst
+++ b/docs/apache-airflow/lineage.rst
@@ -30,7 +30,8 @@ works.
 
 .. code-block:: python
 
-    from datetime import datetime, timedelta
+    import datetime
+    import pendulum
 
     from airflow.lineage import AUTO
     from airflow.lineage.entities import File
@@ -42,10 +43,10 @@ works.
 
     dag = DAG(
         dag_id="example_lineage",
-        start_date=datetime(2021, 1, 1),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         schedule_interval="0 0 * * *",
         catchup=False,
-        dagrun_timeout=timedelta(minutes=60),
+        dagrun_timeout=datetime.timedelta(minutes=60),
     )
 
     f_final = File(url="/tmp/final")
diff --git a/docs/apache-airflow/logging-monitoring/callbacks.rst b/docs/apache-airflow/logging-monitoring/callbacks.rst
index 77ac594..15bbacb 100644
--- a/docs/apache-airflow/logging-monitoring/callbacks.rst
+++ b/docs/apache-airflow/logging-monitoring/callbacks.rst
@@ -51,7 +51,9 @@ In the following example, failures in any task call the ``task_failure_alert`` f
 
 .. code-block:: python
 
-    from datetime import datetime, timedelta
+    import datetime
+    import pendulum
+
     from airflow import DAG
     from airflow.operators.dummy import DummyOperator
 
@@ -67,8 +69,8 @@ In the following example, failures in any task call the ``task_failure_alert`` f
     with DAG(
         dag_id="example_callback",
         schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        dagrun_timeout=timedelta(minutes=60),
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        dagrun_timeout=datetime.timedelta(minutes=60),
         catchup=False,
         on_success_callback=None,
         on_failure_callback=task_failure_alert,
diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst
index 32e5223..bcbbd11 100644
--- a/docs/apache-airflow/timezone.rst
+++ b/docs/apache-airflow/timezone.rst
@@ -40,6 +40,7 @@ The time zone is set in ``airflow.cfg``. By default it is set to utc, but you ch
 an arbitrary IANA time zone, e.g. ``Europe/Amsterdam``. It is dependent on ``pendulum``, which is more accurate than ``pytz``.
 Pendulum is installed when you install Airflow.
 
+
 Web UI
 ------
 
@@ -90,7 +91,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create
 
 .. code-block:: python
 
-    dag = DAG("my_dag", start_date=datetime(2017, 1, 1), default_args={"retries": 3})
+    dag = DAG(
+        "my_dag",
+        start_date=pendulum.datetime(2017, 1, 1, tz="UTC"),
+        default_args={"retries": 3},
+    )
     op = BashOperator(task_id="dummy", bash_command="Hello World!", dag=dag)
     print(op.retries)  # 3
 
@@ -120,19 +125,21 @@ it is therefore important to make sure this setting is equal on all Airflow node
 .. note::
     For more information on setting the configuration, see :doc:`howto/set-config`
 
+.. _timezone_aware_dags:
+
 Time zone aware DAGs
 --------------------
 
 Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware ``start_date``
-using ``pendulum``.
+using ``pendulum``. Don't try to use standard library
+`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to
+have limitations and we deliberately disallow using them in DAGs.
 
 .. code-block:: python
 
     import pendulum
 
-    local_tz = pendulum.timezone("Europe/Amsterdam")
-
-    dag = DAG("my_tz_dag", start_date=datetime(2016, 1, 1, tzinfo=local_tz))
+    dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam"))
     op = DummyOperator(task_id="dummy", dag=dag)
     print(dag.timezone)  # <Timezone [Europe/Amsterdam]>
 
@@ -170,6 +177,6 @@ Time deltas
 Time zone aware DAGs that use ``timedelta`` or ``relativedelta`` schedules
 respect daylight savings time for the start date but do not adjust for
 daylight savings time when scheduling subsequent runs. For example, a
-DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")``
+DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="UTC")``
 and a schedule interval of ``timedelta(days=1)`` will run daily at 05:00
 UTC regardless of daylight savings time.
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index a8f76bc..085be42 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -230,6 +230,14 @@ Note that when executing your script, Airflow will raise exceptions when
 it finds cycles in your DAG or when a dependency is referenced more
 than once.
 
+Using time zones
+----------------
+
+Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware dates
+using ``pendulum``. Don't try to use standard library
+`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to
+have limitations and we deliberately disallow using them in DAGs.
+
 Recap
 -----
 Alright, so we have a pretty basic DAG. At this point your code should look
@@ -474,7 +482,8 @@ Lets look at our DAG:
 
 .. code-block:: python
 
-  from datetime import datetime, timedelta
+  import datetime
+  import pendulum
 
   import requests
   from airflow.decorators import dag, task
@@ -483,9 +492,9 @@ Lets look at our DAG:
 
   @dag(
       schedule_interval="0 0 * * *",
-      start_date=datetime(2021, 1, 1),
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
-      dagrun_timeout=timedelta(minutes=60),
+      dagrun_timeout=datetime.timedelta(minutes=60),
   )
   def Etl():
       @task
diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py
index 25ceeba..a12f2f6 100644
--- a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py
+++ b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py
@@ -17,13 +17,15 @@
 # under the License.
 # [START dag]
 """This dag only runs some simple tasks to test Airflow's task execution."""
-from datetime import datetime, timedelta
+import datetime
+
+import pendulum
 
 from airflow.models.dag import DAG
 from airflow.operators.dummy import DummyOperator
 
-now = datetime.now()
-now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0)
+now = pendulum.now(tz="UTC")
+now_to_the_hour = (now - datetime.timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0)
 START_DATE = now_to_the_hour
 DAG_NAME = 'test_dag_v1'
 
@@ -31,7 +33,7 @@ dag = DAG(
     DAG_NAME,
     schedule_interval='*/10 * * * *',
     default_args={'depends_on_past': True},
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
 )