You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2017/10/31 21:57:59 UTC
incubator-airflow git commit: [AIRFLOW-1769] Add support for
templates in VirtualenvOperator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 44710d7e9 -> 52f8d7da9
[AIRFLOW-1769] Add support for templates in VirtualenvOperator
Closes #2741 from saguziel/aguziel-virtualenv-
templates
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/52f8d7da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/52f8d7da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/52f8d7da
Branch: refs/heads/master
Commit: 52f8d7da9da177b86d036289218377220f6610c2
Parents: 44710d7
Author: Alex Guziel <al...@airbnb.com>
Authored: Tue Oct 31 14:57:55 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Tue Oct 31 14:57:55 2017 -0700
----------------------------------------------------------------------
airflow/operators/python_operator.py | 17 +++++++++++++++--
tests/operators/test_virtualenv_operator.py | 6 ++++++
2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52f8d7da/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 718c88f..18e7bce 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -54,6 +54,7 @@ class PythonOperator(BaseOperator):
:type templates_dict: dict of str
:param templates_exts: a list of file extensions to resolve while
processing templated fields, for examples ``['.sql', '.hql']``
+ :type templates_exts: list(str)
"""
template_fields = ('templates_dict',)
template_ext = tuple()
@@ -194,15 +195,25 @@ class PythonVirtualenvOperator(PythonOperator):
available to python_callable at runtime as a list(str). Note that args are split
by newline.
:type string_args: list(str)
-
+ :param templates_dict: a dictionary where the values are templates that
+ will get templated by the Airflow engine sometime between
+ ``__init__`` and ``execute`` takes place and are made available
+ in your callable's context after the template has been applied
+ :type templates_dict: dict of str
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for examples ``['.sql', '.hql']``
+ :type templates_exts: list(str)
"""
def __init__(self, python_callable, requirements=None, python_version=None, use_dill=False,
system_site_packages=True, op_args=None, op_kwargs=None, string_args=None,
- *args, **kwargs):
+ templates_dict=None, templates_exts=None, *args, **kwargs):
super(PythonVirtualenvOperator, self).__init__(
python_callable=python_callable,
op_args=op_args,
op_kwargs=op_kwargs,
+ templates_dict=templates_dict,
+ templates_exts=templates_exts,
+ provide_context=False,
*args,
**kwargs)
self.requirements = requirements or []
@@ -230,6 +241,8 @@ class PythonVirtualenvOperator(PythonOperator):
def execute_callable(self):
with TemporaryDirectory(prefix='venv') as tmp_dir:
+ if self.templates_dict:
+ self.op_kwargs['templates_dict'] = self.templates_dict
# generate filenames
input_filename = os.path.join(tmp_dir, 'script.in')
output_filename = os.path.join(tmp_dir, 'script.out')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52f8d7da/tests/operators/test_virtualenv_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_virtualenv_operator.py b/tests/operators/test_virtualenv_operator.py
index 9231d39..fdd2742 100644
--- a/tests/operators/test_virtualenv_operator.py
+++ b/tests/operators/test_virtualenv_operator.py
@@ -186,3 +186,9 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
def f(a):
return None
self._run_as_operator(f, op_args=[datetime.datetime.now()])
+
+ def test_context(self):
+ def f(**kwargs):
+ return kwargs['templates_dict']['ds']
+ self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
+