Posted to commits@airflow.apache.org by po...@apache.org on 2021/09/29 15:45:48 UTC

[airflow] branch main updated: Static start_date and default arg cleanup for misc. provider example DAGs (#18597)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef037e7  Static start_date and default arg cleanup for misc. provider example DAGs (#18597)
ef037e7 is described below

commit ef037e702182e4370cb00c853c4fb0e246a0479c
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Wed Sep 29 11:45:27 2021 -0400

    Static start_date and default arg cleanup for misc. provider example DAGs (#18597)
---
 .../example_dags/example_airbyte_trigger_job.py    |  9 +++-----
 .../cloud/example_dags/example_oss_bucket.py       | 18 ++++++++--------
 .../cloud/example_dags/example_oss_object.py       | 19 +++++------------
 .../providers/asana/example_dags/example_asana.py  |  9 +++-----
 .../kubernetes/example_dags/example_kubernetes.py  |  5 +++--
 .../example_dags/example_spark_kubernetes.py       | 17 ++++-----------
 .../databricks/example_dags/example_databricks.py  | 13 ++++--------
 .../dingding/example_dags/example_dingding.py      | 17 +++------------
 .../docker/example_dags/example_docker.py          | 16 ++++-----------
 .../example_dags/example_docker_copy_data.py       | 16 ++++-----------
 .../docker/example_dags/example_docker_swarm.py    | 12 ++---------
 .../providers/http/example_dags/example_http.py    | 11 ++++------
 .../jdbc/example_dags/example_jdbc_queries.py      | 13 ++++--------
 .../providers/mysql/example_dags/example_mysql.py  | 10 +++++----
 .../providers/neo4j/example_dags/example_neo4j.py  |  8 ++++----
 .../papermill/example_dags/example_papermill.py    | 21 +++++++++++--------
 .../plexus/example_dags/example_plexus.py          |  7 +++----
 .../postgres/example_dags/example_postgres.py      |  7 +------
 .../qubole/example_dags/example_qubole.py          | 17 ++++++++-------
 .../example_dags/example_singularity.py            | 21 +++++++++----------
 .../providers/slack/example_dags/example_slack.py  | 14 ++++---------
 .../snowflake/example_dags/example_snowflake.py    | 17 ++++++---------
 .../sqlite/example_dags/example_sqlite.py          | 10 +++++----
 .../tableau/example_dags/example_tableau.py        | 24 +++++++---------------
 .../example_tableau_refresh_workbook.py            | 15 +++++++-------
 .../telegram/example_dags/example_telegram.py      |  9 +++-----
 .../example_dags/example_yandexcloud_dataproc.py   | 10 +++------
 27 files changed, 132 insertions(+), 233 deletions(-)
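
The change applies the same cleanup pattern across every example DAG touched: replace dynamic `days_ago(...)` start dates with a static `datetime`, add `catchup=False` so Airflow does not backfill runs between the start date and now, and hoist arguments repeated on every operator (connection ids, retries) into `default_args`. As a minimal sketch of the resulting shape (not part of this commit; the DAG id, operator, and argument values are placeholders, not taken from any file changed below):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id='example_static_start_date',   # placeholder DAG id
        start_date=datetime(2021, 1, 1),      # static date instead of days_ago(...)
        schedule_interval=None,
        dagrun_timeout=timedelta(minutes=60),
        catchup=False,                        # no backfill of historical runs
        # Arguments shared by every task in the DAG live in default_args
        # instead of being repeated on each operator.
        default_args={'retries': 1},
        tags=['example'],
    ) as dag:
        hello = BashOperator(task_id='hello', bash_command='echo hello')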

diff --git a/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
index ff6d711..55563ff 100644
--- a/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
+++ b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
@@ -18,25 +18,24 @@
 
 """Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
 from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id='example_airbyte_operator',
     schedule_interval=None,
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
     dagrun_timeout=timedelta(minutes=60),
     tags=['example'],
+    catchup=False,
 ) as dag:
 
     # [START howto_operator_airbyte_synchronous]
     sync_source_destination = AirbyteTriggerSyncOperator(
         task_id='airbyte_sync_source_dest_example',
-        airbyte_conn_id='airbyte_default',
         connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
     )
     # [END howto_operator_airbyte_synchronous]
@@ -44,7 +43,6 @@ with DAG(
     # [START howto_operator_airbyte_asynchronous]
     async_source_destination = AirbyteTriggerSyncOperator(
         task_id='airbyte_async_source_dest_example',
-        airbyte_conn_id='airbyte_default',
         connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
         asynchronous=True,
     )
@@ -52,7 +50,6 @@ with DAG(
     airbyte_sensor = AirbyteJobSensor(
         task_id='airbyte_sensor_source_dest_example',
         airbyte_job_id=async_source_destination.output,
-        airbyte_conn_id='airbyte_default',
     )
     # [END howto_operator_airbyte_asynchronous]
 
diff --git a/airflow/providers/alibaba/cloud/example_dags/example_oss_bucket.py b/airflow/providers/alibaba/cloud/example_dags/example_oss_bucket.py
index 51a524e..1ca52c8 100644
--- a/airflow/providers/alibaba/cloud/example_dags/example_oss_bucket.py
+++ b/airflow/providers/alibaba/cloud/example_dags/example_oss_bucket.py
@@ -14,26 +14,24 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from datetime import datetime
 
 from airflow.models.dag import DAG
 from airflow.providers.alibaba.cloud.operators.oss import OSSCreateBucketOperator, OSSDeleteBucketOperator
-from airflow.utils.dates import days_ago
 
+# [START howto_operator_oss_bucket]
 with DAG(
     dag_id='oss_bucket_dag',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'region': 'your region', 'bucket_name': 'your bucket'},
     max_active_runs=1,
     tags=['example'],
+    catchup=False,
 ) as dag:
 
-    # [START howto_operator_oss_bucket]
-    create_bucket = OSSCreateBucketOperator(
-        oss_conn_id='oss_default', region='your region', task_id='task1', bucket_name='your bucket'
-    )
+    create_bucket = OSSCreateBucketOperator(task_id='task1')
 
-    delete_bucket = OSSDeleteBucketOperator(
-        oss_conn_id='oss_default', region='your region', task_id='task2', bucket_name='your bucket'
-    )
-    # [END howto_operator_oss_bucket]
+    delete_bucket = OSSDeleteBucketOperator(task_id='task2')
 
     create_bucket >> delete_bucket
+# [END howto_operator_oss_bucket]
diff --git a/airflow/providers/alibaba/cloud/example_dags/example_oss_object.py b/airflow/providers/alibaba/cloud/example_dags/example_oss_object.py
index 4b69015..92afaf7 100644
--- a/airflow/providers/alibaba/cloud/example_dags/example_oss_object.py
+++ b/airflow/providers/alibaba/cloud/example_dags/example_oss_object.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from datetime import datetime
+
 from airflow.models.dag import DAG
 from airflow.providers.alibaba.cloud.operators.oss import (
     OSSDeleteBatchObjectOperator,
@@ -22,47 +24,36 @@ from airflow.providers.alibaba.cloud.operators.oss import (
     OSSDownloadObjectOperator,
     OSSUploadObjectOperator,
 )
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id='oss_object_dag',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'region': 'your region', 'bucket_name': 'your bucket'},
     max_active_runs=1,
     tags=['example'],
+    catchup=False,
 ) as dag:
 
     create_object = OSSUploadObjectOperator(
         file='your local file',
         key='your oss key',
-        oss_conn_id='oss_default',
-        region='your region',
         task_id='task1',
-        bucket_name='your bucket',
     )
 
     download_object = OSSDownloadObjectOperator(
         file='your local file',
         key='your oss key',
-        oss_conn_id='oss_default',
-        region='your region',
         task_id='task2',
-        bucket_name='your bucket',
     )
 
     delete_object = OSSDeleteObjectOperator(
         key='your oss key',
-        oss_conn_id='oss_default',
-        region='your region',
         task_id='task3',
-        bucket_name='your bucket',
     )
 
     delete_batch_object = OSSDeleteBatchObjectOperator(
         keys=['obj1', 'obj2', 'obj3'],
-        oss_conn_id='oss_default',
-        region='your region',
         task_id='task4',
-        bucket_name='your bucket',
     )
 
     create_object >> download_object >> delete_object >> delete_batch_object
diff --git a/airflow/providers/asana/example_dags/example_asana.py b/airflow/providers/asana/example_dags/example_asana.py
index 82aef7d..092a3b9 100644
--- a/airflow/providers/asana/example_dags/example_asana.py
+++ b/airflow/providers/asana/example_dags/example_asana.py
@@ -27,7 +27,6 @@ from airflow.providers.asana.operators.asana_tasks import (
     AsanaFindTaskOperator,
     AsanaUpdateTaskOperator,
 )
-from airflow.utils.dates import days_ago
 
 ASANA_TASK_TO_UPDATE = os.environ.get("ASANA_TASK_TO_UPDATE")
 ASANA_TASK_TO_DELETE = os.environ.get("ASANA_TASK_TO_DELETE")
@@ -41,8 +40,10 @@ CONN_ID = os.environ.get("ASANA_CONNECTION_ID")
 
 with DAG(
     "example_asana",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    default_args={"conn_id": CONN_ID},
     tags=["example"],
+    catchup=False,
 ) as dag:
     # [START run_asana_create_task_operator]
     # Create a task. `task_parameters` is used to specify attributes the new task should have.
@@ -52,7 +53,6 @@ with DAG(
     create = AsanaCreateTaskOperator(
         task_id="run_asana_create_task",
         task_parameters={"notes": "Some notes about the task."},
-        conn_id=CONN_ID,
         name="New Task Name",
     )
     # [END run_asana_create_task_operator]
@@ -67,7 +67,6 @@ with DAG(
     find = AsanaFindTaskOperator(
         task_id="run_asana_find_task",
         search_parameters={"project": ASANA_PROJECT_ID_OVERRIDE, "modified_since": one_week_ago},
-        conn_id=CONN_ID,
     )
     # [END run_asana_find_task_operator]
 
@@ -78,7 +77,6 @@ with DAG(
         task_id="run_asana_update_task",
         asana_task_gid=ASANA_TASK_TO_UPDATE,
         task_parameters={"notes": "This task was updated!", "completed": True},
-        conn_id=CONN_ID,
     )
     # [END run_asana_update_task_operator]
 
@@ -86,7 +84,6 @@ with DAG(
     # Delete a task. This task will complete successfully even if `asana_task_gid` does not exist.
     delete = AsanaDeleteTaskOperator(
         task_id="run_asana_delete_task",
-        conn_id=CONN_ID,
         asana_task_gid=ASANA_TASK_TO_DELETE,
     )
     # [END run_asana_delete_task_operator]
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
index 77a5498..07acbeb 100644
--- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
@@ -19,13 +19,14 @@
 This is an example dag for using the KubernetesPodOperator.
 """
 
+from datetime import datetime
+
 from kubernetes.client import models as k8s
 
 from airflow import DAG
 from airflow.kubernetes.secret import Secret
 from airflow.operators.bash import BashOperator
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-from airflow.utils.dates import days_ago
 
 # [START howto_operator_k8s_cluster_resources]
 secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
@@ -100,7 +101,7 @@ tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
 with DAG(
     dag_id='example_kubernetes_operator',
     schedule_interval=None,
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
 ) as dag:
     k = KubernetesPodOperator(
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
index b2320ac..d01d4b1 100644
--- a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
@@ -25,7 +25,7 @@ Spark-on-k8s operator is required to be already installed on Kubernetes
 https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
 """
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 # [START import_module]
 # The DAG object; we'll need this to instantiate a DAG
@@ -34,7 +34,6 @@ from airflow import DAG
 # Operators; we need this to operate!
 from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
 from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
-from airflow.utils.dates import days_ago
 
 # [END import_module]
 
@@ -43,24 +42,17 @@ from airflow.utils.dates import days_ago
 
 dag = DAG(
     'spark_pi',
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email': ['airflow@example.com'],
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'max_active_runs': 1,
-    },
+    default_args={'max_active_runs': 1},
     description='submit spark-pi as sparkApplication on kubernetes',
     schedule_interval=timedelta(days=1),
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 )
 
 t1 = SparkKubernetesOperator(
     task_id='spark_pi_submit',
     namespace="default",
     application_file="example_spark_kubernetes_spark_pi.yaml",
-    kubernetes_conn_id="kubernetes_default",
     do_xcom_push=True,
     dag=dag,
 )
@@ -69,7 +61,6 @@ t2 = SparkKubernetesSensor(
     task_id='spark_pi_monitor',
     namespace="default",
     application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
-    kubernetes_conn_id="kubernetes_default",
     dag=dag,
 )
 t1 >> t2
diff --git a/airflow/providers/databricks/example_dags/example_databricks.py b/airflow/providers/databricks/example_dags/example_databricks.py
index 10d8996..2f1310c 100644
--- a/airflow/providers/databricks/example_dags/example_databricks.py
+++ b/airflow/providers/databricks/example_dags/example_databricks.py
@@ -31,22 +31,17 @@ For more information about the state of a run refer to
 https://docs.databricks.com/api/latest/jobs.html#runstate
 """
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
-from airflow.utils.dates import days_ago
-
-default_args = {
-    'owner': 'airflow',
-    'email': ['airflow@example.com'],
-    'depends_on_past': False,
-}
 
 with DAG(
     dag_id='example_databricks_operator',
-    default_args=default_args,
     schedule_interval='@daily',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
+    catchup=False,
 ) as dag:
     new_cluster = {
         'spark_version': '2.1.0-db3-scala2.11',
diff --git a/airflow/providers/dingding/example_dags/example_dingding.py b/airflow/providers/dingding/example_dags/example_dingding.py
index 727fcc7..cddd7b8 100644
--- a/airflow/providers/dingding/example_dags/example_dingding.py
+++ b/airflow/providers/dingding/example_dags/example_dingding.py
@@ -18,11 +18,10 @@
 """
 This is an example dag for using the DingdingOperator.
 """
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.providers.dingding.operators.dingding import DingdingOperator
-from airflow.utils.dates import days_ago
 
 
 # [START howto_operator_dingding_failure_callback]
@@ -43,7 +42,6 @@ def failure_callback(context):
     )
     return DingdingOperator(
         task_id='dingding_success_callback',
-        dingding_conn_id='dingding_default',
         message_type='text',
         message=message,
         at_all=True,
@@ -57,14 +55,14 @@ with DAG(
     default_args={'retries': 3, 'on_failure_callback': failure_callback},
     schedule_interval='@once',
     dagrun_timeout=timedelta(minutes=60),
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
+    catchup=False,
 ) as dag:
 
     # [START howto_operator_dingding]
     text_msg_remind_none = DingdingOperator(
         task_id='text_msg_remind_none',
-        dingding_conn_id='dingding_default',
         message_type='text',
         message='Airflow dingding text message remind none',
         at_mobiles=None,
@@ -74,7 +72,6 @@ with DAG(
 
     text_msg_remind_specific = DingdingOperator(
         task_id='text_msg_remind_specific',
-        dingding_conn_id='dingding_default',
         message_type='text',
         message='Airflow dingding text message remind specific users',
         at_mobiles=['156XXXXXXXX', '130XXXXXXXX'],
@@ -83,7 +80,6 @@ with DAG(
 
     text_msg_remind_include_invalid = DingdingOperator(
         task_id='text_msg_remind_include_invalid',
-        dingding_conn_id='dingding_default',
         message_type='text',
         message='Airflow dingding text message remind users including invalid',
         # 123 is invalid user or user not in the group
@@ -94,7 +90,6 @@ with DAG(
     # [START howto_operator_dingding_remind_users]
     text_msg_remind_all = DingdingOperator(
         task_id='text_msg_remind_all',
-        dingding_conn_id='dingding_default',
         message_type='text',
         message='Airflow dingding text message remind all users in group',
         # list of user phone/email here in the group
@@ -106,7 +101,6 @@ with DAG(
 
     link_msg = DingdingOperator(
         task_id='link_msg',
-        dingding_conn_id='dingding_default',
         message_type='link',
         message={
             'title': 'Airflow dingding link message',
@@ -119,7 +113,6 @@ with DAG(
     # [START howto_operator_dingding_rich_text]
     markdown_msg = DingdingOperator(
         task_id='markdown_msg',
-        dingding_conn_id='dingding_default',
         message_type='markdown',
         message={
             'title': 'Airflow dingding markdown message',
@@ -135,7 +128,6 @@ with DAG(
 
     single_action_card_msg = DingdingOperator(
         task_id='single_action_card_msg',
-        dingding_conn_id='dingding_default',
         message_type='actionCard',
         message={
             'title': 'Airflow dingding single actionCard message',
@@ -151,7 +143,6 @@ with DAG(
 
     multi_action_card_msg = DingdingOperator(
         task_id='multi_action_card_msg',
-        dingding_conn_id='dingding_default',
         message_type='actionCard',
         message={
             'title': 'Airflow dingding multi actionCard message',
@@ -169,7 +160,6 @@ with DAG(
 
     feed_card_msg = DingdingOperator(
         task_id='feed_card_msg',
-        dingding_conn_id='dingding_default',
         message_type='feedCard',
         message={
             "links": [
@@ -194,7 +184,6 @@ with DAG(
 
     msg_failure_callback = DingdingOperator(
         task_id='msg_failure_callback',
-        dingding_conn_id='dingding_default',
         message_type='not_support_msg_type',
         message="",
     )
diff --git a/airflow/providers/docker/example_dags/example_docker.py b/airflow/providers/docker/example_dags/example_docker.py
index 3cb1eb8..fba2c43 100644
--- a/airflow/providers/docker/example_dags/example_docker.py
+++ b/airflow/providers/docker/example_dags/example_docker.py
@@ -15,26 +15,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.providers.docker.operators.docker import DockerOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     'docker_sample',
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email': ['airflow@example.com'],
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5),
-    },
+    default_args={'retries': 1},
     schedule_interval=timedelta(minutes=10),
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 )
 
 t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
diff --git a/airflow/providers/docker/example_dags/example_docker_copy_data.py b/airflow/providers/docker/example_dags/example_docker_copy_data.py
index a6ab50e..f03c4be 100644
--- a/airflow/providers/docker/example_dags/example_docker_copy_data.py
+++ b/airflow/providers/docker/example_dags/example_docker_copy_data.py
@@ -25,7 +25,7 @@ TODO: Review the workflow, change it accordingly to
       your environment & enable the code.
 """
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from docker.types import Mount
 
@@ -33,21 +33,13 @@ from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import ShortCircuitOperator
 from airflow.providers.docker.operators.docker import DockerOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     "docker_sample_copy_data",
-    default_args={
-        "owner": "airflow",
-        "depends_on_past": False,
-        "email": ["airflow@example.com"],
-        "email_on_failure": False,
-        "email_on_retry": False,
-        "retries": 1,
-        "retry_delay": timedelta(minutes=5),
-    },
+    default_args={"retries": 1},
     schedule_interval=timedelta(minutes=10),
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 )
 
 locate_file_cmd = """
diff --git a/airflow/providers/docker/example_dags/example_docker_swarm.py b/airflow/providers/docker/example_dags/example_docker_swarm.py
index 36db9c3..365a4b4 100644
--- a/airflow/providers/docker/example_dags/example_docker_swarm.py
+++ b/airflow/providers/docker/example_dags/example_docker_swarm.py
@@ -15,23 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     'docker_swarm_sample',
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email': ['airflow@example.com'],
-        'email_on_failure': False,
-        'email_on_retry': False,
-    },
     schedule_interval=timedelta(minutes=10),
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
     catchup=False,
 )
 
diff --git a/airflow/providers/http/example_dags/example_http.py b/airflow/providers/http/example_dags/example_http.py
index 8c93765..59589b8 100644
--- a/airflow/providers/http/example_dags/example_http.py
+++ b/airflow/providers/http/example_dags/example_http.py
@@ -19,21 +19,18 @@
 """Example HTTP operator and sensor"""
 
 import json
-from datetime import timedelta
+from datetime import datetime
 
 from airflow import DAG
 from airflow.providers.http.operators.http import SimpleHttpOperator
 from airflow.providers.http.sensors.http import HttpSensor
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     'example_http_operator',
-    default_args={
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5),
-    },
+    default_args={'retries': 1},
     tags=['example'],
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 )
 
 dag.doc_md = __doc__
diff --git a/airflow/providers/jdbc/example_dags/example_jdbc_queries.py b/airflow/providers/jdbc/example_dags/example_jdbc_queries.py
index 651ca9c..eaf7908 100644
--- a/airflow/providers/jdbc/example_dags/example_jdbc_queries.py
+++ b/airflow/providers/jdbc/example_dags/example_jdbc_queries.py
@@ -18,25 +18,22 @@
 
 """Example DAG demonstrating the usage of the JdbcOperator."""
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.providers.jdbc.operators.jdbc import JdbcOperator
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id='example_jdbc_operator',
     schedule_interval='0 0 * * *',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     dagrun_timeout=timedelta(minutes=60),
     tags=['example'],
+    catchup=False,
 ) as dag:
 
-    run_this_last = DummyOperator(
-        task_id='run_this_last',
-        dag=dag,
-    )
+    run_this_last = DummyOperator(task_id='run_this_last')
 
     # [START howto_operator_jdbc_template]
     delete_data = JdbcOperator(
@@ -44,7 +41,6 @@ with DAG(
         sql='delete from my_schema.my_table where dt = {{ ds }}',
         jdbc_conn_id='my_jdbc_connection',
         autocommit=True,
-        dag=dag,
     )
     # [END howto_operator_jdbc_template]
 
@@ -54,7 +50,6 @@ with DAG(
         sql='insert into my_schema.my_table select dt, value from my_schema.source_data',
         jdbc_conn_id='my_jdbc_connection',
         autocommit=True,
-        dag=dag,
     )
     # [END howto_operator_jdbc]
 
diff --git a/airflow/providers/mysql/example_dags/example_mysql.py b/airflow/providers/mysql/example_dags/example_mysql.py
index 029881f..0418536 100644
--- a/airflow/providers/mysql/example_dags/example_mysql.py
+++ b/airflow/providers/mysql/example_dags/example_mysql.py
@@ -19,20 +19,23 @@
 Example use of MySql related operators.
 """
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.mysql.operators.mysql import MySqlOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     'example_mysql',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'mysql_conn_id': 'mysql_conn_id'},
     tags=['example'],
+    catchup=False,
 )
 
 # [START howto_operator_mysql]
 
 drop_table_mysql_task = MySqlOperator(
-    task_id='create_table_mysql', mysql_conn_id='mysql_conn_id', sql=r"""DROP TABLE table_name;""", dag=dag
+    task_id='create_table_mysql', sql=r"""DROP TABLE table_name;""", dag=dag
 )
 
 # [END howto_operator_mysql]
@@ -41,7 +44,6 @@ drop_table_mysql_task = MySqlOperator(
 
 mysql_task = MySqlOperator(
     task_id='create_table_mysql_external_file',
-    mysql_conn_id='mysql_conn_id',
     sql='/scripts/drop_table.sql',
     dag=dag,
 )
diff --git a/airflow/providers/neo4j/example_dags/example_neo4j.py b/airflow/providers/neo4j/example_dags/example_neo4j.py
index 3e8b674..16ad48d 100644
--- a/airflow/providers/neo4j/example_dags/example_neo4j.py
+++ b/airflow/providers/neo4j/example_dags/example_neo4j.py
@@ -19,14 +19,16 @@
 Example use of Neo4j related operators.
 """
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     'example_neo4j',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
+    catchup=False,
 )
 
 # [START run_query_neo4j_operator]
@@ -39,5 +41,3 @@ neo4j_task = Neo4jOperator(
 )
 
 # [END run_query_neo4j_operator]
-
-neo4j_task
diff --git a/airflow/providers/papermill/example_dags/example_papermill.py b/airflow/providers/papermill/example_dags/example_papermill.py
index 1adb0e8..c49b771 100644
--- a/airflow/providers/papermill/example_dags/example_papermill.py
+++ b/airflow/providers/papermill/example_dags/example_papermill.py
@@ -21,7 +21,7 @@ it will create an output notebook "out-<date>". All fields, including the keys i
 templated.
 """
 import os
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 import scrapbook as sb
 
@@ -29,14 +29,18 @@ from airflow import DAG
 from airflow.decorators import task
 from airflow.lineage import AUTO
 from airflow.providers.papermill.operators.papermill import PapermillOperator
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
+SCHEDULE_INTERVAL = '0 0 * * *'
+DAGRUN_TIMEOUT = timedelta(minutes=60)
 
 with DAG(
     dag_id='example_papermill_operator',
-    schedule_interval='0 0 * * *',
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    schedule_interval=SCHEDULE_INTERVAL,
+    start_date=START_DATE,
+    dagrun_timeout=DAGRUN_TIMEOUT,
     tags=['example'],
+    catchup=False,
 ) as dag_1:
     # [START howto_operator_papermill]
     run_this = PapermillOperator(
@@ -65,9 +69,10 @@ def check_notebook(inlets, execution_date):
 
 with DAG(
     dag_id='example_papermill_operator_2',
-    schedule_interval='0 0 * * *',
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    schedule_interval=SCHEDULE_INTERVAL,
+    start_date=START_DATE,
+    dagrun_timeout=DAGRUN_TIMEOUT,
+    catchup=False,
 ) as dag_2:
 
     run_this = PapermillOperator(
diff --git a/airflow/providers/plexus/example_dags/example_plexus.py b/airflow/providers/plexus/example_dags/example_plexus.py
index 2f0a492..68ddcb7 100644
--- a/airflow/providers/plexus/example_dags/example_plexus.py
+++ b/airflow/providers/plexus/example_dags/example_plexus.py
@@ -15,9 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.plexus.operators.job import PlexusJobOperator
-from airflow.utils.dates import days_ago
 
 HOME = '/home/acc'
 T3_PRERUN_SCRIPT = 'cp {home}/imdb/run_scripts/mlflow.sh {home}/ && chmod +x mlflow.sh'.format(home=HOME)
@@ -27,7 +28,7 @@ dag = DAG(
     'test',
     default_args={'owner': 'core scientific', 'retries': 1},
     description='testing plexus operator',
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
     schedule_interval='@once',
     catchup=False,
 )
@@ -44,5 +45,3 @@ t1 = PlexusJobOperator(
     },
     dag=dag,
 )
-
-t1
diff --git a/airflow/providers/postgres/example_dags/example_postgres.py b/airflow/providers/postgres/example_dags/example_postgres.py
index ac86b95..b86c394 100644
--- a/airflow/providers/postgres/example_dags/example_postgres.py
+++ b/airflow/providers/postgres/example_dags/example_postgres.py
@@ -33,7 +33,6 @@ with DAG(
     # [START postgres_operator_howto_guide_create_pet_table]
     create_pet_table = PostgresOperator(
         task_id="create_pet_table",
-        postgres_conn_id="postgres_default",
         sql="""
             CREATE TABLE IF NOT EXISTS pet (
             pet_id SERIAL PRIMARY KEY,
@@ -47,7 +46,6 @@ with DAG(
     # [START postgres_operator_howto_guide_populate_pet_table]
     populate_pet_table = PostgresOperator(
         task_id="populate_pet_table",
-        postgres_conn_id="postgres_default",
         sql="""
             INSERT INTO pet (name, pet_type, birth_date, OWNER)
             VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
@@ -61,14 +59,11 @@ with DAG(
     )
     # [END postgres_operator_howto_guide_populate_pet_table]
     # [START postgres_operator_howto_guide_get_all_pets]
-    get_all_pets = PostgresOperator(
-        task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
-    )
+    get_all_pets = PostgresOperator(task_id="get_all_pets", sql="SELECT * FROM pet;")
     # [END postgres_operator_howto_guide_get_all_pets]
     # [START postgres_operator_howto_guide_get_birth_date]
     get_birth_date = PostgresOperator(
         task_id="get_birth_date",
-        postgres_conn_id="postgres_default",
         sql="""
             SELECT * FROM pet
             WHERE birth_date
diff --git a/airflow/providers/qubole/example_dags/example_qubole.py b/airflow/providers/qubole/example_dags/example_qubole.py
index 3b3fc86..eacc127 100644
--- a/airflow/providers/qubole/example_dags/example_qubole.py
+++ b/airflow/providers/qubole/example_dags/example_qubole.py
@@ -19,6 +19,7 @@
 import filecmp
 import random
 import textwrap
+from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
@@ -26,12 +27,14 @@ from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import BranchPythonOperator
 from airflow.providers.qubole.operators.qubole import QuboleOperator
 from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor
-from airflow.utils.dates import days_ago
+from airflow.utils.trigger_rule import TriggerRule
+
+START_DATE = datetime(2021, 1, 1)
 
 with DAG(
     dag_id='example_qubole_operator',
     schedule_interval=None,
-    start_date=days_ago(2),
+    start_date=START_DATE,
     tags=['example'],
 ) as dag:
     dag.doc_md = textwrap.dedent(
@@ -47,7 +50,7 @@ with DAG(
         """
     )
 
-    @task(trigger_rule='all_done')
+    @task(trigger_rule=TriggerRule.ALL_DONE)
     def compare_result(hive_show_table, hive_s3_location, ti=None):
         """
         Compares the results of two QuboleOperator tasks.
@@ -75,8 +78,6 @@ with DAG(
         # them into corresponding airflow task logs
         tags='airflow_example_run',
         # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id
-        qubole_conn_id='qubole_default',
-        # Connection id to submit commands inside QDS, if not set "qubole_default" is used
         params={
             'cluster_label': 'default',
         },
@@ -98,7 +99,7 @@ with DAG(
 
     [hive_show_table, hive_s3_location] >> compare_result(hive_s3_location, hive_show_table) >> branching
 
-    join = DummyOperator(task_id='join', trigger_rule='one_success')
+    join = DummyOperator(task_id='join', trigger_rule=TriggerRule.ONE_SUCCESS)
 
     hadoop_jar_cmd = QuboleOperator(
         task_id='hadoop_jar_cmd',
@@ -201,8 +202,7 @@ with DAG(
 with DAG(
     dag_id='example_qubole_sensor',
     schedule_interval=None,
-    start_date=days_ago(2),
-    doc_md=__doc__,
+    start_date=START_DATE,
     tags=['example'],
 ) as dag2:
     dag2.doc_md = textwrap.dedent(
@@ -220,7 +220,6 @@ with DAG(
 
     check_s3_file = QuboleFileSensor(
         task_id='check_s3_file',
-        qubole_conn_id='qubole_default',
         poke_interval=60,
         timeout=600,
         data={
diff --git a/airflow/providers/singularity/example_dags/example_singularity.py b/airflow/providers/singularity/example_dags/example_singularity.py
index 83c9a7b..cf54a28 100644
--- a/airflow/providers/singularity/example_dags/example_singularity.py
+++ b/airflow/providers/singularity/example_dags/example_singularity.py
@@ -16,32 +16,31 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.providers.singularity.operators.singularity import SingularityOperator
-from airflow.utils.dates import days_ago
 
 with DAG(
     'singularity_sample',
-    default_args={
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5),
-    },
+    default_args={'retries': 1},
     schedule_interval=timedelta(minutes=10),
-    start_date=days_ago(0),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 ) as dag:
 
-    t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
+    t1 = BashOperator(task_id='print_date', bash_command='date')
 
-    t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
+    t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3)
 
     t3 = SingularityOperator(
-        command='/bin/sleep 30', image='docker://busybox:1.30.1', task_id='singularity_op_tester', dag=dag
+        command='/bin/sleep 30',
+        image='docker://busybox:1.30.1',
+        task_id='singularity_op_tester',
     )
 
-    t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag)
+    t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"')
 
     t1 >> [t2, t3]
     t3 >> t4
diff --git a/airflow/providers/slack/example_dags/example_slack.py b/airflow/providers/slack/example_dags/example_slack.py
index 494e714..e6516b7 100644
--- a/airflow/providers/slack/example_dags/example_slack.py
+++ b/airflow/providers/slack/example_dags/example_slack.py
@@ -15,14 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from datetime import datetime
+
 from airflow.models.dag import DAG
 from airflow.providers.slack.operators.slack import SlackAPIFileOperator
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id='slack_example_dag',
     schedule_interval=None,
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'slack_conn_id': 'slack', 'channel': '#general', 'initial_comment': 'Hello World!'},
     max_active_runs=1,
     tags=['example'],
 ) as dag:
@@ -30,10 +32,6 @@ with DAG(
     # Send file with filename and filetype
     slack_operator_file = SlackAPIFileOperator(
         task_id="slack_file_upload_1",
-        dag=dag,
-        slack_conn_id="slack",
-        channel="#general",
-        initial_comment="Hello World!",
         filename="/files/dags/test.txt",
         filetype="txt",
     )
@@ -43,10 +41,6 @@ with DAG(
     # Send file content
     slack_operator_file_content = SlackAPIFileOperator(
         task_id="slack_file_upload_2",
-        dag=dag,
-        slack_conn_id="slack",
-        channel="#general",
-        initial_comment="Hello World!",
         content="file content in txt",
     )
     # [END slack_operator_howto_guide_send_file_content]
diff --git a/airflow/providers/snowflake/example_dags/example_snowflake.py b/airflow/providers/snowflake/example_dags/example_snowflake.py
index a15eb08..482b746 100644
--- a/airflow/providers/snowflake/example_dags/example_snowflake.py
+++ b/airflow/providers/snowflake/example_dags/example_snowflake.py
@@ -18,11 +18,12 @@
 """
 Example use of Snowflake related operators.
 """
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
 from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
 from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator
-from airflow.utils.dates import days_ago
 
 SNOWFLAKE_CONN_ID = 'my_snowflake_conn'
 SLACK_CONN_ID = 'my_slack_conn'
@@ -50,8 +51,10 @@ SNOWFLAKE_SLACK_MESSAGE = (
 
 dag = DAG(
     'example_snowflake',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'snowflake_conn_id': SNOWFLAKE_CONN_ID},
     tags=['example'],
+    catchup=False,
 )
 
 # [START howto_operator_snowflake]
@@ -59,7 +62,6 @@ dag = DAG(
 snowflake_op_sql_str = SnowflakeOperator(
     task_id='snowflake_op_sql_str',
     dag=dag,
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     sql=CREATE_TABLE_SQL_STRING,
     warehouse=SNOWFLAKE_WAREHOUSE,
     database=SNOWFLAKE_DATABASE,
@@ -70,7 +72,6 @@ snowflake_op_sql_str = SnowflakeOperator(
 snowflake_op_with_params = SnowflakeOperator(
     task_id='snowflake_op_with_params',
     dag=dag,
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     sql=SQL_INSERT_STATEMENT,
     parameters={"id": 56},
     warehouse=SNOWFLAKE_WAREHOUSE,
@@ -79,21 +80,17 @@ snowflake_op_with_params = SnowflakeOperator(
     role=SNOWFLAKE_ROLE,
 )
 
-snowflake_op_sql_list = SnowflakeOperator(
-    task_id='snowflake_op_sql_list', dag=dag, snowflake_conn_id=SNOWFLAKE_CONN_ID, sql=SQL_LIST
-)
+snowflake_op_sql_list = SnowflakeOperator(task_id='snowflake_op_sql_list', dag=dag, sql=SQL_LIST)
 
 snowflake_op_sql_multiple_stmts = SnowflakeOperator(
     task_id='snowflake_op_sql_multiple_stmts',
     dag=dag,
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     sql=SQL_MULTIPLE_STMTS,
 )
 
 snowflake_op_template_file = SnowflakeOperator(
     task_id='snowflake_op_template_file',
     dag=dag,
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     sql='/path/to/sql/<filename>.sql',
 )
 
@@ -103,7 +100,6 @@ snowflake_op_template_file = SnowflakeOperator(
 
 copy_into_table = S3ToSnowflakeOperator(
     task_id='copy_into_table',
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     s3_keys=[S3_FILE_PATH],
     table=SNOWFLAKE_SAMPLE_TABLE,
     schema=SNOWFLAKE_SCHEMA,
@@ -120,7 +116,6 @@ slack_report = SnowflakeToSlackOperator(
     task_id="slack_report",
     sql=SNOWFLAKE_SLACK_SQL,
     slack_message=SNOWFLAKE_SLACK_MESSAGE,
-    snowflake_conn_id=SNOWFLAKE_CONN_ID,
     slack_conn_id=SLACK_CONN_ID,
     dag=dag,
 )
diff --git a/airflow/providers/sqlite/example_dags/example_sqlite.py b/airflow/providers/sqlite/example_dags/example_sqlite.py
index 5799871..b175599 100644
--- a/airflow/providers/sqlite/example_dags/example_sqlite.py
+++ b/airflow/providers/sqlite/example_dags/example_sqlite.py
@@ -23,16 +23,18 @@ which when triggered, is performed on the connected sqlite database.
 The second task is similar but instead calls the SQL command from an external file.
 """
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.sqlite.hooks.sqlite import SqliteHook
 from airflow.providers.sqlite.operators.sqlite import SqliteOperator
-from airflow.utils.dates import days_ago
 
 dag = DAG(
     dag_id='example_sqlite',
     schedule_interval='@daily',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
+    catchup=False,
 )
 
 # [START howto_operator_sqlite]
@@ -55,7 +57,7 @@ create_table_sqlite_task = SqliteOperator(
 
 @dag.task(task_id="insert_sqlite_task")
 def insert_sqlite_hook():
-    sqlite_hook = SqliteHook("sqlite_default")
+    sqlite_hook = SqliteHook()
 
     rows = [('James', '11'), ('James', '22'), ('James', '33')]
     target_fields = ['first_name', 'last_name']
@@ -64,7 +66,7 @@ def insert_sqlite_hook():
 
 @dag.task(task_id="replace_sqlite_task")
 def replace_sqlite_hook():
-    sqlite_hook = SqliteHook("sqlite_default")
+    sqlite_hook = SqliteHook()
 
     rows = [('James', '11'), ('James', '22'), ('James', '33')]
     target_fields = ['first_name', 'last_name']
diff --git a/airflow/providers/tableau/example_dags/example_tableau.py b/airflow/providers/tableau/example_dags/example_tableau.py
index 04d87d2..0ad956a 100644
--- a/airflow/providers/tableau/example_dags/example_tableau.py
+++ b/airflow/providers/tableau/example_dags/example_tableau.py
@@ -20,27 +20,18 @@ This is an example dag that performs two refresh operations on a Tableau Workboo
 waits until it succeeds. The second does not wait since this is an asynchronous operation and we don't know
 when the operation actually finishes. That's why we have another task that checks only that.
 """
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.providers.tableau.operators.tableau import TableauOperator
 from airflow.providers.tableau.sensors.tableau_job_status import TableauJobStatusSensor
-from airflow.utils.dates import days_ago
-
-DEFAULT_ARGS = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email': ['airflow@example.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-}
 
 with DAG(
     dag_id='example_tableau',
-    default_args=DEFAULT_ARGS,
+    default_args={'site_id': 'my_site'},
     dagrun_timeout=timedelta(hours=2),
     schedule_interval=None,
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
 ) as dag:
     # Refreshes a workbook and waits until it succeeds.
@@ -50,7 +41,6 @@ with DAG(
         method='refresh',
         find='MyWorkbook',
         match_with='name',
-        site_id='my_site',
         blocking_refresh=True,
         task_id='refresh_tableau_workbook_blocking',
     )
@@ -61,14 +51,14 @@ with DAG(
         method='refresh',
         find='MyWorkbook',
         match_with='name',
-        site_id='my_site',
         blocking_refresh=False,
         task_id='refresh_tableau_workbook_non_blocking',
     )
     # The following task queries the status of the workbook refresh job until it succeeds.
     task_check_job_status = TableauJobStatusSensor(
-        site_id='my_site',
-        job_id="{{ ti.xcom_pull(task_ids='refresh_tableau_workbook_non_blocking') }}",
+        job_id=task_refresh_workbook_non_blocking.output,
         task_id='check_tableau_job_status',
     )
-    task_refresh_workbook_non_blocking >> task_check_job_status
+
+    # Task dependency created via XComArgs:
+    #   task_refresh_workbook_non_blocking >> task_check_job_status
diff --git a/airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py b/airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py
index 650f24a..3d4091c 100644
--- a/airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py
+++ b/airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py
@@ -20,38 +20,37 @@ This is an example dag that performs two refresh operations on a Tableau Workboo
 waits until it succeeds. The second does not wait since this is an asynchronous operation and we don't know
 when the operation actually finishes. That's why we have another task that checks only that.
 """
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from airflow import DAG
 from airflow.providers.tableau.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
 from airflow.providers.tableau.sensors.tableau_job_status import TableauJobStatusSensor
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id='example_tableau_refresh_workbook',
     dagrun_timeout=timedelta(hours=2),
     schedule_interval=None,
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    default_args={'site_id': 'my_site'},
     tags=['example'],
 ) as dag:
     # Refreshes a workbook and waits until it succeeds.
     task_refresh_workbook_blocking = TableauRefreshWorkbookOperator(
-        site_id='my_site',
         workbook_name='MyWorkbook',
         blocking=True,
         task_id='refresh_tableau_workbook_blocking',
     )
     # Refreshes a workbook and does not wait until it succeeds.
     task_refresh_workbook_non_blocking = TableauRefreshWorkbookOperator(
-        site_id='my_site',
         workbook_name='MyWorkbook',
         blocking=False,
         task_id='refresh_tableau_workbook_non_blocking',
     )
     # The following task queries the status of the workbook refresh job until it succeeds.
     task_check_job_status = TableauJobStatusSensor(
-        site_id='my_site',
-        job_id="{{ ti.xcom_pull(task_ids='refresh_tableau_workbook_non_blocking') }}",
+        job_id=task_refresh_workbook_non_blocking.output,
         task_id='check_tableau_job_status',
     )
-    task_refresh_workbook_non_blocking >> task_check_job_status
+
+    # Task dependency created via XComArgs:
+    #   task_refresh_workbook_non_blocking >> task_check_job_status
diff --git a/airflow/providers/telegram/example_dags/example_telegram.py b/airflow/providers/telegram/example_dags/example_telegram.py
index 31d6629..76e7180 100644
--- a/airflow/providers/telegram/example_dags/example_telegram.py
+++ b/airflow/providers/telegram/example_dags/example_telegram.py
@@ -19,15 +19,12 @@
 Example use of Telegram operator.
 """
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.telegram.operators.telegram import TelegramOperator
-from airflow.utils.dates import days_ago
 
-dag = DAG(
-    'example_telegram',
-    start_date=days_ago(2),
-    tags=['example'],
-)
+dag = DAG('example_telegram', start_date=datetime(2021, 1, 1), tags=['example'])
 
 # [START howto_operator_telegram]
 
diff --git a/airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py b/airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py
index 6d9a384..e35fae5 100644
--- a/airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from datetime import datetime
+
 from airflow import DAG
 from airflow.providers.yandex.operators.yandexcloud_dataproc import (
     DataprocCreateClusterOperator,
@@ -24,14 +26,9 @@ from airflow.providers.yandex.operators.yandexcloud_dataproc import (
     DataprocCreateSparkJobOperator,
     DataprocDeleteClusterOperator,
 )
-from airflow.utils.dates import days_ago
 
 # should be filled with appropriate ids
 
-# Airflow connection with type "yandexcloud" must be created.
-# By default connection with id "yandexcloud_default" will be used
-CONNECTION_ID = 'yandexcloud_default'
-
 # Name of the datacenter where Dataproc cluster will be created
 AVAILABILITY_ZONE_ID = 'ru-central1-c'
 
@@ -42,13 +39,12 @@ S3_BUCKET_NAME_FOR_JOB_LOGS = ''
 with DAG(
     'example_yandexcloud_dataproc_operator',
     schedule_interval=None,
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
     tags=['example'],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id='create_cluster',
         zone=AVAILABILITY_ZONE_ID,
-        connection_id=CONNECTION_ID,
         s3_bucket=S3_BUCKET_NAME_FOR_JOB_LOGS,
         computenode_count=1,
         computenode_max_hosts_count=5,