You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/14 08:15:43 UTC

[4/6] incubator-airflow git commit: [AIRFLOW-2203] Cache signature in apply_defaults

[AIRFLOW-2203] Cache signature in apply_defaults

Cache inspect.signature for the wrapper closure to avoid calling it at
every decorated invocation. This is separate sig_cache created per
decoration, i.e. each function decorated using apply_defaults will have
a different sig_cache.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/81ec595b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/81ec595b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/81ec595b

Branch: refs/heads/master
Commit: 81ec595b6c1ac05bc7f42e2c92c0dd79409953a4
Parents: 92357d5
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:08:44 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:50 2018 +0100

----------------------------------------------------------------------
 airflow/utils/decorators.py    | 31 ++++++++++-------
 tests/utils/test_decorators.py | 69 +++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/airflow/utils/decorators.py
----------------------------------------------------------------------
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index 995e60f..4e9cb05 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -39,6 +39,19 @@ def apply_defaults(func):
     inheritance and argument defaults, this decorator also alerts with
     specific information about the missing arguments.
     """
+
+    import airflow.models
+    # Cache inspect.signature for the wrapper closure to avoid calling it
+    # at every decorated invocation. This is separate sig_cache created
+    # per decoration, i.e. each function decorated using apply_defaults will
+    # have a different sig_cache.
+    sig_cache = signature(func)
+    non_optional_args = {
+        name for (name, param) in sig_cache.parameters.items()
+        if param.default == param.empty and
+        param.name != 'self' and
+        param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)}
+
     @wraps(func)
     def wrapper(*args, **kwargs):
         if len(args) > 1:
@@ -46,9 +59,9 @@ def apply_defaults(func):
                 "Use keyword arguments when initializing operators")
         dag_args = {}
         dag_params = {}
-        import airflow.models
-        if kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG:
-            dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+
+        dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+        if dag:
             dag_args = copy(dag.default_args) or {}
             dag_params = copy(dag.params) or {}
 
@@ -67,16 +80,10 @@ def apply_defaults(func):
         dag_args.update(default_args)
         default_args = dag_args
 
-        sig = signature(func)
-        non_optional_args = [
-            name for (name, param) in sig.parameters.items()
-            if param.default == param.empty and
-            param.name != 'self' and
-            param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)]
-        for arg in sig.parameters:
-            if arg in default_args and arg not in kwargs:
+        for arg in sig_cache.parameters:
+            if arg not in kwargs and arg in default_args:
                 kwargs[arg] = default_args[arg]
-        missing_args = list(set(non_optional_args) - set(kwargs))
+        missing_args = list(non_optional_args - set(kwargs))
         if missing_args:
             msg = "Argument {0} is required".format(missing_args)
             raise AirflowException(msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/tests/utils/test_decorators.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py
new file mode 100644
index 0000000..29dada7
--- /dev/null
+++ b/tests/utils/test_decorators.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+# Essentially similar to airflow.models.BaseOperator
+class DummyClass(object):
+    @apply_defaults
+    def __init__(self, test_param, params=None, default_args=None):
+        self.test_param = test_param
+
+
+class DummySubClass(DummyClass):
+    @apply_defaults
+    def __init__(self, test_sub_param, *args, **kwargs):
+        super(DummySubClass, self).__init__(*args, **kwargs)
+        self.test_sub_param = test_sub_param
+
+
+class ApplyDefaultTest(unittest.TestCase):
+
+    def test_apply(self):
+        dc = DummyClass(test_param=True)
+        self.assertTrue(dc.test_param)
+
+        with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+            DummySubClass(test_sub_param=True)
+
+    def test_default_args(self):
+        default_args = {'test_param': True}
+        dc = DummyClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+
+        default_args = {'test_param': True, 'test_sub_param': True}
+        dsc = DummySubClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+        self.assertTrue(dsc.test_sub_param)
+
+        default_args = {'test_param': True}
+        dsc = DummySubClass(default_args=default_args, test_sub_param=True)
+        self.assertTrue(dc.test_param)
+        self.assertTrue(dsc.test_sub_param)
+
+        with self.assertRaisesRegexp(AirflowException,
+                                     'Argument.*test_sub_param.*required'):
+            DummySubClass(default_args=default_args)
+
+    def test_incorrect_default_args(self):
+        default_args = {'test_param': True, 'extra_param': True}
+        dc = DummyClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+
+        default_args = {'random_params': True}
+        with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+            DummyClass(default_args=default_args)