You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2017/03/16 23:36:47 UTC
incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument
Repository: incubator-airflow
Updated Branches:
refs/heads/master a8bd1695e -> 12901ddfa
[AIRFLOW-969] Catch bad python_callable argument
Checks that python_callable is callable when the Operator is created, not when it is run.
* added initial PythonOperator unit test, testing run
* python_callable must be callable; added unit test
Closes #2142 from abloomston/python-callable
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12901ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12901ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12901ddf
Branch: refs/heads/master
Commit: 12901ddfa9961a11feaa3f17696d19102ff8ecd0
Parents: a8bd169
Author: abloomston <ad...@cloverhealth.com>
Authored: Thu Mar 16 19:36:00 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Thu Mar 16 19:36:08 2017 -0400
----------------------------------------------------------------------
airflow/operators/python_operator.py | 3 ++
tests/operators/__init__.py | 1 +
tests/operators/python_operator.py | 79 +++++++++++++++++++++++++++++++
3 files changed, 83 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index b5f6386..a17e6fa 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -16,6 +16,7 @@ from builtins import str
from datetime import datetime
import logging
+from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, TaskInstance
from airflow.utils.state import State
from airflow.utils.decorators import apply_defaults
@@ -63,6 +64,8 @@ class PythonOperator(BaseOperator):
templates_exts=None,
*args, **kwargs):
super(PythonOperator, self).__init__(*args, **kwargs)
+ if not callable(python_callable):
+ raise AirflowException('`python_callable` param must be callable')
self.python_callable = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index 1fb0e5e..7a517a1 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -18,3 +18,4 @@ from .operators import *
from .sensors import *
from .hive_operator import *
from .s3_to_hive_operator import *
+from .python_operator import *
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12901ddf/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
new file mode 100644
index 0000000..621172f
--- /dev/null
+++ b/tests/operators/python_operator.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function, unicode_literals
+
+import datetime
+import unittest
+
+from airflow import configuration, DAG
+from airflow.operators.python_operator import PythonOperator
+
+from airflow.exceptions import AirflowException
+
+DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+END_DATE = datetime.datetime(2016, 1, 2)
+INTERVAL = datetime.timedelta(hours=12)
+FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+
+
+class PythonOperatorTest(unittest.TestCase):
+
+ def setUp(self):
+ super(PythonOperatorTest, self).setUp()
+ configuration.load_test_config()
+ self.dag = DAG(
+ 'test_dag',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE},
+ schedule_interval=INTERVAL)
+ self.addCleanup(self.dag.clear)
+ self.clear_run()
+ self.addCleanup(self.clear_run)
+
+ def do_run(self):
+ self.run = True
+
+ def clear_run(self):
+ self.run = False
+
+ def is_run(self):
+ return self.run
+
+ def test_python_operator_run(self):
+ """Tests that the python callable is invoked on task run."""
+ task = PythonOperator(
+ python_callable=self.do_run,
+ task_id='python_operator',
+ dag=self.dag)
+ self.assertFalse(self.is_run())
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ self.assertTrue(self.is_run())
+
+ def test_python_operator_python_callable_is_callable(self):
+ """Tests that PythonOperator will only instantiate if
+ the python_callable argument is callable."""
+ not_callable = {}
+ with self.assertRaises(AirflowException):
+ PythonOperator(
+ python_callable=not_callable,
+ task_id='python_operator',
+ dag=self.dag)
+ not_callable = None
+ with self.assertRaises(AirflowException):
+ PythonOperator(
+ python_callable=not_callable,
+ task_id='python_operator',
+ dag=self.dag)