You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/12/14 21:46:29 UTC

[airflow] branch v2-2-test updated (05a91b3 -> 4989eea)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 05a91b3  Lazy Jinja2 context (#20217)
     new eb799ba  Lazy Jinja2 context (#20217)
     new 4989eea  Warn without tracebacks when example_dags are missing deps (#20295)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (05a91b3)
            \
             N -- N -- N   refs/heads/v2-2-test (4989eea)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../example_dags/example_kubernetes_executor.py    | 363 +++++++++++----------
 airflow/example_dags/example_python_operator.py    |  51 +--
 .../tutorial_taskflow_api_etl_virtualenv.py        | 112 ++++---
 airflow/ti_deps/deps/trigger_rule_dep.py           |  15 +-
 4 files changed, 274 insertions(+), 267 deletions(-)

[airflow] 01/02: Lazy Jinja2 context (#20217)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eb799ba3588d181ba5aeb8996a99e79eb040bc40
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Dec 14 15:27:59 2021 +0800

    Lazy Jinja2 context (#20217)
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit 181d60cdd182a9523890bf4822a76ea80666ea92)
---
 airflow/models/baseoperator.py           | 25 ++++++++++------
 airflow/models/param.py                  |  3 +-
 airflow/models/xcom_arg.py               |  5 ++--
 airflow/ti_deps/deps/trigger_rule_dep.py |  2 +-
 airflow/utils/context.py                 | 49 ++++++++++++++++++++++--------
 airflow/utils/helpers.py                 | 51 ++++++++++++++++++++++++++++----
 airflow/utils/log/file_task_handler.py   | 21 ++++++-------
 tests/conftest.py                        |  1 +
 tests/models/test_taskinstance.py        | 21 +++++++++++++
 9 files changed, 136 insertions(+), 42 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 04aebee..9e4cedd 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -69,7 +69,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
 from airflow.utils.edgemodifier import EdgeModifier
-from airflow.utils.helpers import validate_key
+from airflow.utils.helpers import render_template_as_native, render_template_to_string, validate_key
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.operator_resources import Resources
 from airflow.utils.session import provide_session
@@ -1042,7 +1042,11 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         self.__dict__ = state
         self._log = logging.getLogger("airflow.task.operators")
 
-    def render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Environment] = None) -> None:
+    def render_template_fields(
+        self,
+        context: Context,
+        jinja_env: Optional[jinja2.Environment] = None,
+    ) -> None:
         """
         Template all attributes listed in template_fields. Note this operation is irreversible.
 
@@ -1060,7 +1064,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         self,
         parent: Any,
         template_fields: Iterable[str],
-        context: Dict,
+        context: Context,
         jinja_env: jinja2.Environment,
         seen_oids: Set,
     ) -> None:
@@ -1073,7 +1077,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
     def render_template(
         self,
         content: Any,
-        context: Dict,
+        context: Context,
         jinja_env: Optional[jinja2.Environment] = None,
         seen_oids: Optional[Set] = None,
     ) -> Any:
@@ -1100,11 +1104,14 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         from airflow.models.xcom_arg import XComArg
 
         if isinstance(content, str):
-            if any(content.endswith(ext) for ext in self.template_ext):
-                # Content contains a filepath
-                return jinja_env.get_template(content).render(**context)
+            if any(content.endswith(ext) for ext in self.template_ext):  # Content contains a filepath.
+                template = jinja_env.get_template(content)
             else:
-                return jinja_env.from_string(content).render(**context)
+                template = jinja_env.from_string(content)
+            if self.has_dag() and self.dag.render_template_as_native_obj:
+                return render_template_as_native(template, context)
+            return render_template_to_string(template, context)
+
         elif isinstance(content, (XComArg, DagParam)):
             return content.resolve(context)
 
@@ -1133,7 +1140,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             return content
 
     def _render_nested_template_fields(
-        self, content: Any, context: Dict, jinja_env: jinja2.Environment, seen_oids: Set
+        self, content: Any, context: Context, jinja_env: jinja2.Environment, seen_oids: Set
     ) -> None:
         if id(content) not in seen_oids:
             seen_oids.add(id(content))
diff --git a/airflow/models/param.py b/airflow/models/param.py
index 53ac79a..6ae6593 100644
--- a/airflow/models/param.py
+++ b/airflow/models/param.py
@@ -21,6 +21,7 @@ from jsonschema import FormatChecker
 from jsonschema.exceptions import ValidationError
 
 from airflow.exceptions import AirflowException
+from airflow.utils.context import Context
 
 
 class NoValueSentinel:
@@ -215,7 +216,7 @@ class DagParam:
         self._name = name
         self._default = default
 
-    def resolve(self, context: Dict) -> Any:
+    def resolve(self, context: Context) -> Any:
         """Pull DagParam value from DagRun context. This method is run during ``op.execute()``."""
         default = self._default
         if not self._default:
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index dd08ab3..6503106 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -15,12 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Any, Dict, List, Optional, Sequence, Union
+from typing import Any, List, Optional, Sequence, Union
 
 from airflow.exceptions import AirflowException
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskmixin import TaskMixin
 from airflow.models.xcom import XCOM_RETURN_KEY
+from airflow.utils.context import Context
 from airflow.utils.edgemodifier import EdgeModifier
 
 
@@ -128,7 +129,7 @@ class XComArg(TaskMixin):
         """Proxy to underlying operator set_downstream method. Required by TaskMixin."""
         self.operator.set_downstream(task_or_task_list, edge_modifier)
 
-    def resolve(self, context: Dict) -> Any:
+    def resolve(self, context: Context) -> Any:
         """
         Pull XCom value for the existing arg. This method is run during ``op.execute()``
         in respectable context.
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 5d72410..f685235 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -82,7 +82,7 @@ class TriggerRuleDep(BaseTIDep):
 
     @provide_session
     def _evaluate_trigger_rule(
-        self, ti, successes, skipped, failed, upstream_failed, done, flag_upstream_failed, session
+        self, ti, successes, skipped, failed, upstream_failed, done, flag_upstream_failed, *, session
     ):
         """
         Yields a dependency status that indicate whether the given task instance's trigger
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index fca55c1..61f9319 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -19,8 +19,20 @@
 """Jinja2 template rendering context helper."""
 
 import contextlib
+import copy
 import warnings
-from typing import Any, Container, Dict, Iterable, Iterator, List, MutableMapping, Tuple
+from typing import (
+    AbstractSet,
+    Any,
+    Container,
+    Dict,
+    Iterator,
+    List,
+    MutableMapping,
+    Optional,
+    Tuple,
+    ValuesView,
+)
 
 _NOT_SET: Any = object()
 
@@ -74,16 +86,20 @@ class ConnectionAccessor:
             return default_conn
 
 
+class AirflowContextDeprecationWarning(DeprecationWarning):
+    """Warn for usage of deprecated context variables in a task."""
+
+
 def _create_deprecation_warning(key: str, replacements: List[str]) -> DeprecationWarning:
     message = f"Accessing {key!r} from the template is deprecated and will be removed in a future version."
     if not replacements:
-        return DeprecationWarning(message)
+        return AirflowContextDeprecationWarning(message)
     display_except_last = ", ".join(repr(r) for r in replacements[:-1])
     if display_except_last:
         message += f" Please use {display_except_last} or {replacements[-1]!r} instead."
     else:
         message += f" Please use {replacements[-1]!r} instead."
-    return DeprecationWarning(message)
+    return AirflowContextDeprecationWarning(message)
 
 
 class Context(MutableMapping[str, Any]):
@@ -108,8 +124,10 @@ class Context(MutableMapping[str, Any]):
         "yesterday_ds_nodash": [],
     }
 
-    def __init__(self, context: MutableMapping[str, Any]) -> None:
-        self._context = context
+    def __init__(self, context: Optional[MutableMapping[str, Any]] = None, **kwargs: Any) -> None:
+        self._context = context or {}
+        if kwargs:
+            self._context.update(kwargs)
         self._deprecation_replacements = self._DEPRECATION_REPLACEMENTS.copy()
 
     def __repr__(self) -> str:
@@ -124,9 +142,14 @@ class Context(MutableMapping[str, Any]):
         items = [(key, self[key]) for key in self._context]
         return dict, (items,)
 
+    def __copy__(self) -> "Context":
+        new = type(self)(copy.copy(self._context))
+        new._deprecation_replacements = self._deprecation_replacements.copy()
+        return new
+
     def __getitem__(self, key: str) -> Any:
         with contextlib.suppress(KeyError):
-            warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]), stacklevel=2)
+            warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
         with contextlib.suppress(KeyError):
             return self._context[key]
         raise KeyError(key)
@@ -139,7 +162,7 @@ class Context(MutableMapping[str, Any]):
         self._deprecation_replacements.pop(key, None)
         del self._context[key]
 
-    def __contains__(self, key: str) -> bool:
+    def __contains__(self, key: object) -> bool:
         return key in self._context
 
     def __iter__(self) -> Iterator[str]:
@@ -158,14 +181,16 @@ class Context(MutableMapping[str, Any]):
             return NotImplemented
         return self._context != other._context
 
-    def keys(self) -> Iterable[str]:
+    def keys(self) -> AbstractSet[str]:
         return self._context.keys()
 
-    def items(self) -> Iterable[Tuple[str, Any]]:
+    def items(self) -> AbstractSet[Tuple[str, Any]]:
         return self._context.items()
 
-    def values(self) -> Iterable[Any]:
+    def values(self) -> ValuesView[Any]:
         return self._context.values()
 
-    def copy_only(self, keys: Container[str]) -> "Context[str, Any]":
-        return type(self)({k: v for k, v in self._context.items() if k in keys})
+    def copy_only(self, keys: Container[str]) -> "Context":
+        new = type(self)({k: v for k, v in self._context.items() if k in keys})
+        new._deprecation_replacements = self._deprecation_replacements.copy()
+        return new
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index e6ab39a..c5f9f27 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import copy
 import re
 import warnings
 from datetime import datetime
@@ -24,11 +24,13 @@ from itertools import filterfalse, tee
 from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, TypeVar
 from urllib import parse
 
-from flask import url_for
-from jinja2 import Template
+import flask
+import jinja2
+import jinja2.nativetypes
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
+from airflow.utils.context import Context
 from airflow.utils.module_loading import import_string
 
 if TYPE_CHECKING:
@@ -146,7 +148,7 @@ def as_flattened_list(iterable: Iterable[Iterable[T]]) -> List[T]:
 def parse_template_string(template_string):
     """Parses Jinja template string."""
     if "{{" in template_string:  # jinja mode
-        return None, Template(template_string)
+        return None, jinja2.Template(template_string)
     else:
         return template_string, None
 
@@ -228,5 +230,44 @@ def build_airflow_url_with_query(query: Dict[str, Any]) -> str:
     'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587
     """
     view = conf.get('webserver', 'dag_default_view').lower()
-    url = url_for(f"Airflow.{view}")
+    url = flask.url_for(f"Airflow.{view}")
     return f"{url}?{parse.urlencode(query)}"
+
+
+# The 'template' argument is typed as Any because the jinja2.Template is too
+# dynamic to be effectively type-checked.
+def render_template(template: Any, context: Context, *, native: bool) -> Any:
+    """Render a Jinja2 template with given Airflow context.
+
+    The default implementation of ``jinja2.Template.render()`` converts the
+    input context into dict eagerly many times, which triggers deprecation
+    messages in our custom context class. This takes the implementation apart
+    and instead retains the context mapping without resolving it.
+
+    :param template: A Jinja2 template to render.
+    :param context: The Airflow task context to render the template with.
+    :param native: If set to *True*, render the template into a native type. A
+        DAG can enable this with ``render_template_as_native_obj=True``.
+    :returns: The render result.
+    """
+    context = copy.copy(context)
+    env = template.environment
+    if template.globals:
+        context.update((k, v) for k, v in template.globals.items() if k not in context)
+    try:
+        nodes = template.root_render_func(env.context_class(env, context, template.name, template.blocks))
+    except Exception:
+        env.handle_exception()  # Rewrite traceback to point to the template.
+    if native:
+        return jinja2.nativetypes.native_concat(nodes)
+    return "".join(nodes)
+
+
+def render_template_to_string(template: jinja2.Template, context: Context) -> str:
+    """Shorthand for ``render_template(native=False)`` with better typing support."""
+    return render_template(template, context, native=False)
+
+
+def render_template_as_native(template: jinja2.Template, context: Context) -> Any:
+    """Shorthand for ``render_template(native=True)`` with better typing support."""
+    return render_template(template, context, native=True)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6d88c20..6e57c67 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -25,7 +25,8 @@ import httpx
 from itsdangerous import TimedJSONWebSignatureSerializer
 
 from airflow.configuration import AirflowConfigException, conf
-from airflow.utils.helpers import parse_template_string
+from airflow.utils.context import Context
+from airflow.utils.helpers import parse_template_string, render_template_to_string
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 
 if TYPE_CHECKING:
@@ -73,23 +74,19 @@ class FileTaskHandler(logging.Handler):
         if self.handler:
             self.handler.close()
 
-    def _render_filename(self, ti, try_number):
+    def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
         if self.filename_jinja_template:
-            if hasattr(ti, 'task'):
-                jinja_context = ti.get_template_context()
-                jinja_context['try_number'] = try_number
+            if hasattr(ti, "task"):
+                context = ti.get_template_context()
             else:
-                jinja_context = {
-                    'ti': ti,
-                    'ts': ti.execution_date.isoformat(),
-                    'try_number': try_number,
-                }
-            return self.filename_jinja_template.render(**jinja_context)
+                context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
+            context["try_number"] = try_number
+            return render_template_to_string(self.filename_jinja_template, context)
 
         return self.filename_template.format(
             dag_id=ti.dag_id,
             task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(),
+            execution_date=ti.get_dagrun().logical_date.isoformat(),
             try_number=try_number,
         )
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 94f915f..f7248d1 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -231,6 +231,7 @@ def breeze_test_helper(request):
 
 
 def pytest_configure(config):
+    config.addinivalue_line("filterwarnings", "error::airflow.utils.context.AirflowContextDeprecationWarning")
     config.addinivalue_line("markers", "integration(name): mark test to run with named integration")
     config.addinivalue_line("markers", "backend(name): mark test to run with named backend")
     config.addinivalue_line("markers", "system(name): mark test to run with named system")
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index f07147f..8458ea9 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1512,6 +1512,27 @@ class TestTaskInstance:
         assert isinstance(template_context["data_interval_start"], pendulum.DateTime)
         assert isinstance(template_context["data_interval_end"], pendulum.DateTime)
 
+    def test_template_render(self, create_task_instance):
+        ti = create_task_instance(
+            dag_id="test_template_render",
+            task_id="test_template_render_task",
+            schedule_interval="0 12 * * *",
+        )
+        template_context = ti.get_template_context()
+        result = ti.task.render_template("Task: {{ dag.dag_id }} -> {{ task.task_id }}", template_context)
+        assert result == "Task: test_template_render -> test_template_render_task"
+
+    def test_template_render_deprecated(self, create_task_instance):
+        ti = create_task_instance(
+            dag_id="test_template_render",
+            task_id="test_template_render_task",
+            schedule_interval="0 12 * * *",
+        )
+        template_context = ti.get_template_context()
+        with pytest.deprecated_call():
+            result = ti.task.render_template("Execution date: {{ execution_date }}", template_context)
+        assert result.startswith("Execution date: ")
+
     @pytest.mark.parametrize(
         "content, expected_output",
         [

[airflow] 02/02: Warn without tracebacks when example_dags are missing deps (#20295)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4989eea84a86951d218587d7b8061cdd7bfd7ce4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Dec 14 14:28:17 2021 -0700

    Warn without tracebacks when example_dags are missing deps (#20295)
    
    (cherry picked from commit 5a6c022f946d1be2bd68a42a7a920fdf932932e5)
---
 .../example_dags/example_kubernetes_executor.py    | 363 +++++++++++----------
 airflow/example_dags/example_python_operator.py    |  51 +--
 .../tutorial_taskflow_api_etl_virtualenv.py        | 112 ++++---
 3 files changed, 272 insertions(+), 254 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index 95a0f92..f984909 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -35,195 +35,198 @@ worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
 try:
     from kubernetes.client import models as k8s
 except ImportError:
-    log.warning("Could not import DAGs in example_kubernetes_executor.py", exc_info=True)
-    log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
-
-
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example3'],
-) as dag:
-    # You can use annotations on your kubernetes pods!
-    start_task_executor_config = {
-        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-    }
-
-    @task(executor_config=start_task_executor_config)
-    def start_task():
-        print_stuff()
-
-    start_task = start_task()
-
-    # [START task_with_volume]
-    executor_config_volume_mount = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        volume_mounts=[
-                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                        ],
-                    )
-                ],
-                volumes=[
-                    k8s.V1Volume(
-                        name="example-kubernetes-test-volume",
-                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                    )
-                ],
+    log.warning(
+        "The example_kubernetes_executor example DAG requires the kubernetes provider."
+        " Please install it with: pip install apache-airflow[cncf.kubernetes]"
+    )
+    k8s = None
+
+if k8s:
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example3'],
+    ) as dag:
+        # You can use annotations on your kubernetes pods!
+        start_task_executor_config = {
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+        }
+
+        @task(executor_config=start_task_executor_config)
+        def start_task():
+            print_stuff()
+
+        start_task = start_task()
+
+        # [START task_with_volume]
+        executor_config_volume_mount = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        }
+
+        @task(executor_config=executor_config_volume_mount)
+        def test_volume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+
+            with open('/foo/volume_mount_test.txt', 'w') as foo:
+                foo.write('Hello')
+
+            return_code = os.system("cat /foo/volume_mount_test.txt")
+            if return_code != 0:
+                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+        volume_task = test_volume_mount()
+        # [END task_with_volume]
+
+        # [START task_with_sidecar]
+        executor_config_sidecar = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        }
+
+        @task(executor_config=executor_config_sidecar)
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i > 4:
+                        raise e
+
+        sidecar_task = test_sharedvolume_mount()
+        # [END task_with_sidecar]
+
+        # You can add labels to pods
+        executor_config_non_root = {
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+        }
+
+        @task(executor_config=executor_config_non_root)
+        def non_root_task():
+            print_stuff()
+
+        third_task = non_root_task()
+
+        executor_config_other_ns = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
             )
-        ),
-    }
-
-    @task(executor_config=executor_config_volume_mount)
-    def test_volume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-
-        with open('/foo/volume_mount_test.txt', 'w') as foo:
-            foo.write('Hello')
-
-        return_code = os.system("cat /foo/volume_mount_test.txt")
-        if return_code != 0:
-            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-    volume_task = test_volume_mount()
-    # [END task_with_volume]
-
-    # [START task_with_sidecar]
-    executor_config_sidecar = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                    ),
-                    k8s.V1Container(
-                        name="sidecar",
-                        image="ubuntu",
-                        args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                        command=["bash", "-cx"],
-                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                    ),
-                ],
-                volumes=[
-                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                ],
+        }
+
+        @task(executor_config=executor_config_other_ns)
+        def other_namespace_task():
+            print_stuff()
+
+        other_ns_task = other_namespace_task()
+
+        # You can also change the base image, here we used the worker image for demonstration.
+        # Note that the image must have the same configuration as the
+        # worker image. Could be that you want to run this task in a special docker image that has a zip
+        # library built-in. You build the special docker image on top your worker image.
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base", image=f"{worker_container_repository}:{worker_container_tag}"
+                        ),
+                    ]
+                )
             )
-        ),
-    }
-
-    @task(executor_config=executor_config_sidecar)
-    def test_sharedvolume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        for i in range(5):
-            try:
-                return_code = os.system("cat /shared/test.txt")
-                if return_code != 0:
-                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-            except ValueError as e:
-                if i > 4:
-                    raise e
-
-    sidecar_task = test_sharedvolume_mount()
-    # [END task_with_sidecar]
-
-    # You can add labels to pods
-    executor_config_non_root = {
-        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-    }
-
-    @task(executor_config=executor_config_non_root)
-    def non_root_task():
-        print_stuff()
-
-    third_task = non_root_task()
-
-    executor_config_other_ns = {
-        "pod_override": k8s.V1Pod(
-            metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
-        )
-    }
-
-    @task(executor_config=executor_config_other_ns)
-    def other_namespace_task():
-        print_stuff()
-
-    other_ns_task = other_namespace_task()
-
-    # You can also change the base image, here we used the worker image for demonstration.
-    # Note that the image must have the same configuration as the
-    # worker image. Could be that you want to run this task in a special docker image that has a zip
-    # library built-in. You build the special docker image on top your worker image.
-    kube_exec_config_special = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base", image=f"{worker_container_repository}:{worker_container_tag}"
-                    ),
+        }
+
+        @task(executor_config=kube_exec_config_special)
+        def base_image_override_task():
+            print_stuff()
+
+        base_image_task = base_image_override_task()
+
+        # Use k8s_client.V1Affinity to define node affinity
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
                 ]
             )
         )
-    }
-
-    @task(executor_config=kube_exec_config_special)
-    def base_image_override_task():
-        print_stuff()
-
-    base_image_task = base_image_override_task()
-
-    # Use k8s_client.V1Affinity to define node affinity
-    k8s_affinity = k8s.V1Affinity(
-        pod_anti_affinity=k8s.V1PodAntiAffinity(
-            required_during_scheduling_ignored_during_execution=[
-                k8s.V1PodAffinityTerm(
-                    label_selector=k8s.V1LabelSelector(
-                        match_expressions=[
-                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
-                        ]
-                    ),
-                    topology_key='kubernetes.io/hostname',
-                )
-            ]
-        )
-    )
 
-    # Use k8s_client.V1Toleration to define node tolerations
-    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    # Use k8s_client.V1ResourceRequirements to define resource limits
-    k8s_resource_requirements = k8s.V1ResourceRequirements(
-        requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
-    )
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_requirements = k8s.V1ResourceRequirements(
+            requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
+        )
 
-    kube_exec_config_resource_limits = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        resources=k8s_resource_requirements,
-                    )
-                ],
-                affinity=k8s_affinity,
-                tolerations=k8s_tolerations,
+        kube_exec_config_resource_limits = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_requirements,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
             )
-        )
-    }
+        }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def task_with_resource_limits():
-        print_stuff()
+        @task(executor_config=kube_exec_config_resource_limits)
+        def task_with_resource_limits():
+            print_stuff()
 
-    four_task = task_with_resource_limits()
+        four_task = task_with_resource_limits()
 
-    start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
+        start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 8d5ce59..d533d84 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -20,6 +20,8 @@
 Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
 virtual environment.
 """
+import logging
+import shutil
 import time
 from datetime import datetime
 from pprint import pprint
@@ -27,6 +29,8 @@ from pprint import pprint
 from airflow import DAG
 from airflow.decorators import task
 
+log = logging.getLogger(__name__)
+
 with DAG(
     dag_id='example_python_operator',
     schedule_interval=None,
@@ -59,29 +63,32 @@ with DAG(
         run_this >> sleeping_task
     # [END howto_operator_python_kwargs]
 
-    # [START howto_operator_python_venv]
-    @task.virtualenv(
-        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
-    )
-    def callable_virtualenv():
-        """
-        Example function that will be performed in a virtual environment.
+    if not shutil.which("virtualenv"):
+        log.warning("The virtualenv_python example task requires virtualenv, please install it.")
+    else:
+        # [START howto_operator_python_venv]
+        @task.virtualenv(
+            task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
+        )
+        def callable_virtualenv():
+            """
+            Example function that will be performed in a virtual environment.
 
-        Importing at the module level ensures that it will not attempt to import the
-        library before it is installed.
-        """
-        from time import sleep
+            Importing at the function level ensures that it will not attempt to import the
+            library before it is installed.
+            """
+            from time import sleep
 
-        from colorama import Back, Fore, Style
+            from colorama import Back, Fore, Style
 
-        print(Fore.RED + 'some red text')
-        print(Back.GREEN + 'and with a green background')
-        print(Style.DIM + 'and in dim text')
-        print(Style.RESET_ALL)
-        for _ in range(10):
-            print(Style.DIM + 'Please wait...', flush=True)
-            sleep(10)
-        print('Finished')
+            print(Fore.RED + 'some red text')
+            print(Back.GREEN + 'and with a green background')
+            print(Style.DIM + 'and in dim text')
+            print(Style.RESET_ALL)
+            for _ in range(10):
+                print(Style.DIM + 'Please wait...', flush=True)
+                sleep(10)
+            print('Finished')
 
-    virtualenv_task = callable_virtualenv()
-    # [END howto_operator_python_venv]
+        virtualenv_task = callable_virtualenv()
+        # [END howto_operator_python_venv]
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
index 09aefcb..ac28095 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
@@ -17,65 +17,73 @@
 # under the License.
 
 
+import logging
+import shutil
 from datetime import datetime
 
 from airflow.decorators import dag, task
 
+log = logging.getLogger(__name__)
 
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
-def tutorial_taskflow_api_etl_virtualenv():
-    """
-    ### TaskFlow API example using virtualenv
-    This is a simple ETL data pipeline example which demonstrates the use of
-    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
-    """
-
-    @task.virtualenv(
-        use_dill=True,
-        system_site_packages=False,
-        requirements=['funcsigs'],
+if not shutil.which("virtualenv"):
+    log.warning(
+        "The tutorial_taskflow_api_etl_virtualenv example DAG requires virtualenv, please install it."
     )
-    def extract():
-        """
-        #### Extract task
-        A simple Extract task to get data ready for the rest of the data
-        pipeline. In this case, getting data is simulated by reading from a
-        hardcoded JSON string.
-        """
-        import json
-
-        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+else:
 
-        order_data_dict = json.loads(data_string)
-        return order_data_dict
-
-    @task(multiple_outputs=True)
-    def transform(order_data_dict: dict):
+    @dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
+    def tutorial_taskflow_api_etl_virtualenv():
         """
-        #### Transform task
-        A simple Transform task which takes in the collection of order data and
-        computes the total order value.
+        ### TaskFlow API example using virtualenv
+        This is a simple ETL data pipeline example which demonstrates the use of
+        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
         """
-        total_order_value = 0
-
-        for value in order_data_dict.values():
-            total_order_value += value
-
-        return {"total_order_value": total_order_value}
-
-    @task()
-    def load(total_order_value: float):
-        """
-        #### Load task
-        A simple Load task which takes in the result of the Transform task and
-        instead of saving it to end user review, just prints it out.
-        """
-
-        print(f"Total order value is: {total_order_value:.2f}")
-
-    order_data = extract()
-    order_summary = transform(order_data)
-    load(order_summary["total_order_value"])
-
 
-tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()
+        @task.virtualenv(
+            use_dill=True,
+            system_site_packages=False,
+            requirements=['funcsigs'],
+        )
+        def extract():
+            """
+            #### Extract task
+            A simple Extract task to get data ready for the rest of the data
+            pipeline. In this case, getting data is simulated by reading from a
+            hardcoded JSON string.
+            """
+            import json
+
+            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+            order_data_dict = json.loads(data_string)
+            return order_data_dict
+
+        @task(multiple_outputs=True)
+        def transform(order_data_dict: dict):
+            """
+            #### Transform task
+            A simple Transform task which takes in the collection of order data and
+            computes the total order value.
+            """
+            total_order_value = 0
+
+            for value in order_data_dict.values():
+                total_order_value += value
+
+            return {"total_order_value": total_order_value}
+
+        @task()
+        def load(total_order_value: float):
+            """
+            #### Load task
+            A simple Load task which takes in the result of the Transform task and
+            instead of saving it to end user review, just prints it out.
+            """
+
+            print(f"Total order value is: {total_order_value:.2f}")
+
+        order_data = extract()
+        order_summary = transform(order_data)
+        load(order_summary["total_order_value"])
+
+    tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()