You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2017/03/16 23:36:47 UTC

incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a8bd1695e -> 12901ddfa


[AIRFLOW-969] Catch bad python_callable argument

Checks that `python_callable` is callable when the PythonOperator
is created, rather than failing later when the task is run.

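* Added an initial PythonOperator unit test that verifies the
  callable is invoked when the task is run.
* PythonOperator now raises AirflowException when `python_callable`
  is not callable; added a unit test for this check.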

Closes #2142 from abloomston/python-callable


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12901ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12901ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12901ddf

Branch: refs/heads/master
Commit: 12901ddfa9961a11feaa3f17696d19102ff8ecd0
Parents: a8bd169
Author: abloomston <ad...@cloverhealth.com>
Authored: Thu Mar 16 19:36:00 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Thu Mar 16 19:36:08 2017 -0400

----------------------------------------------------------------------
 airflow/operators/python_operator.py |  3 ++
 tests/operators/__init__.py          |  1 +
 tests/operators/python_operator.py   | 79 +++++++++++++++++++++++++++++++
 3 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index b5f6386..a17e6fa 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -16,6 +16,7 @@ from builtins import str
 from datetime import datetime
 import logging
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -63,6 +64,8 @@ class PythonOperator(BaseOperator):
             templates_exts=None,
             *args, **kwargs):
         super(PythonOperator, self).__init__(*args, **kwargs)
+        if not callable(python_callable):
+            raise AirflowException('`python_callable` param must be callable')
         self.python_callable = python_callable
         self.op_args = op_args or []
         self.op_kwargs = op_kwargs or {}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index 1fb0e5e..7a517a1 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -18,3 +18,4 @@ from .operators import *
 from .sensors import *
 from .hive_operator import *
 from .s3_to_hive_operator import *
+from .python_operator import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
new file mode 100644
index 0000000..621172f
--- /dev/null
+++ b/tests/operators/python_operator.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function, unicode_literals
+
+import datetime
+import unittest
+
+from airflow import configuration, DAG
+from airflow.operators.python_operator import PythonOperator
+
+from airflow.exceptions import AirflowException
+
+DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+END_DATE = datetime.datetime(2016, 1, 2)
+INTERVAL = datetime.timedelta(hours=12)
+FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+
+
+class PythonOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        super(PythonOperatorTest, self).setUp()
+        configuration.load_test_config()
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE},
+            schedule_interval=INTERVAL)
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def do_run(self):
+        self.run = True
+
+    def clear_run(self):
+        self.run = False
+
+    def is_run(self):
+        return self.run
+
+    def test_python_operator_run(self):
+        """Tests that the python callable is invoked on task run."""
+        task = PythonOperator(
+            python_callable=self.do_run,
+            task_id='python_operator',
+            dag=self.dag)
+        self.assertFalse(self.is_run())
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        self.assertTrue(self.is_run())
+
+    def test_python_operator_python_callable_is_callable(self):
+        """Tests that PythonOperator will only instantiate if
+        the python_callable argument is callable."""
+        not_callable = {}
+        with self.assertRaises(AirflowException):
+            PythonOperator(
+                python_callable=not_callable,
+                task_id='python_operator',
+                dag=self.dag)
+        not_callable = None
+        with self.assertRaises(AirflowException):
+            PythonOperator(
+                python_callable=not_callable,
+                task_id='python_operator',
+                dag=self.dag)