You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/16 16:48:35 UTC
[8/9] incubator-airflow git commit: [AIRFLOW-683] Add jira hook,
operator and sensor
[AIRFLOW-683] Add jira hook, operator and sensor
Closes #1950 from jhsenjaliya/AIRFLOW-683
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/44798e0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/44798e0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/44798e0d
Branch: refs/heads/v1-8-test
Commit: 44798e0d4d36e6a793d19b4986f2ddb6814ec208
Parents: a8b2f7f
Author: Jay <jh...@gmail.com>
Authored: Mon Jan 16 17:46:11 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jan 16 17:46:21 2017 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/jira_hook.py | 82 ++++++++++++
airflow/contrib/operators/jira_operator.py | 89 +++++++++++++
airflow/contrib/sensors/jira_sensor.py | 146 +++++++++++++++++++++
airflow/models.py | 4 +
scripts/ci/requirements.txt | 1 +
setup.py | 2 +
tests/contrib/hooks/test_jira_hook.py | 51 +++++++
tests/contrib/operators/jira_operator_test.py | 101 ++++++++++++++
tests/contrib/sensors/jira_sensor_test.py | 85 ++++++++++++
9 files changed, 561 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
new file mode 100644
index 0000000..148101b
--- /dev/null
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from jira import JIRA
+from jira.exceptions import JIRAError
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class JiraHook(BaseHook):
+ """
+ Jira interaction hook, a Wrapper around JIRA Python SDK.
+
+ :param jira_conn_id: reference to a pre-defined Jira Connection
+ :type jira_conn_id: string
+ """
+
+ def __init__(self,
+ jira_conn_id='jira_default'):
+ super(JiraHook, self).__init__(jira_conn_id)
+ self.jira_conn_id = jira_conn_id
+ self.client = None
+ self.get_conn()
+
+ def get_conn(self):
+ if not self.client:
+ logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id))
+
+ get_server_info = True
+ validate = True
+ extra_options = {}
+ conn = None
+
+ if self.jira_conn_id is not None:
+ conn = self.get_connection(self.jira_conn_id)
+ if conn.extra is not None:
+ extra_options = conn.extra_dejson
+ # only required attributes are taken for now,
+ # more can be added ex: async, logging, max_retries
+
+ # verify
+ if 'verify' in extra_options \
+ and extra_options['verify'].lower() == 'false':
+ extra_options['verify'] = False
+
+ # validate
+ if 'validate' in extra_options \
+ and extra_options['validate'].lower() == 'false':
+ validate = False
+
+ if 'get_server_info' in extra_options \
+ and extra_options['get_server_info'].lower() == 'false':
+ get_server_info = False
+
+ try:
+ self.client = JIRA(conn.host,
+ options=extra_options,
+ basic_auth=(conn.login, conn.password),
+ get_server_info=get_server_info,
+ validate=validate)
+ except JIRAError as jira_error:
+ raise AirflowException('Failed to create jira client, jira error: %s'
+ % str(jira_error))
+ except Exception as e:
+ raise AirflowException('Failed to create jira client, error: %s'
+ % str(e))
+
+ return self.client
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/operators/jira_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jira_operator.py b/airflow/contrib/operators/jira_operator.py
new file mode 100644
index 0000000..6623b1c
--- /dev/null
+++ b/airflow/contrib/operators/jira_operator.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow.contrib.hooks.jira_hook import JIRAError
+from airflow.contrib.hooks.jira_hook import JiraHook
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class JiraOperator(BaseOperator):
+ """
+ JiraOperator to interact and perform action on Jira issue tracking system.
+ This operator is designed to use Jira Python SDK: http://jira.readthedocs.io
+
+ :param jira_conn_id: reference to a pre-defined Jira Connection
+ :type jira_conn_id: str
+ :param jira_method: method name from Jira Python SDK to be called
+ :type jira_method: str
+ :param jira_method_args: required method parameters for the jira_method
+ :type jira_method_args: dict
+ :param result_processor: function to further process the response from Jira
+ :type result_processor: function
+ :param get_jira_resource_method: function or operator to get jira resource
+ on which the provided jira_method will be executed
+ :type get_jira_resource_method: function
+ """
+
+ template_fields = ("jira_method_args",)
+
+ @apply_defaults
+ def __init__(self,
+ jira_conn_id='jira_default',
+ jira_method=None,
+ jira_method_args=None,
+ result_processor=None,
+ get_jira_resource_method=None,
+ *args,
+ **kwargs):
+ super(JiraOperator, self).__init__(*args, **kwargs)
+ self.jira_conn_id = jira_conn_id
+ self.method_name = jira_method
+ self.jira_method_args = jira_method_args
+ self.result_processor = result_processor
+ self.get_jira_resource_method = get_jira_resource_method
+
+ def execute(self, context):
+ try:
+ if self.get_jira_resource_method is not None:
+ # if get_jira_resource_method is provided, jira_method will be executed on
+ # resource returned by executing the get_jira_resource_method.
+ # This makes all the provided methods of JIRA sdk accessible and usable
+ # directly at the JiraOperator without additional wrappers.
+ # ref: http://jira.readthedocs.io/en/latest/api.html
+ if isinstance(self.get_jira_resource_method, JiraOperator):
+ resource = self.get_jira_resource_method.execute(**context)
+ else:
+ resource = self.get_jira_resource_method(**context)
+ else:
+ # Default method execution is on the top level jira client resource
+ hook = JiraHook(jira_conn_id=self.jira_conn_id)
+ resource = hook.client
+
+ # Current Jira-Python SDK (1.0.7) has issue with pickling the jira response.
+ # ex: self.xcom_push(context, key='operator_response', value=jira_response)
+ # This could potentially throw error if jira_result is not picklable
+ jira_result = getattr(resource, self.method_name)(**self.jira_method_args)
+ if self.result_processor:
+ return self.result_processor(context, jira_result)
+
+ return jira_result
+
+ except JIRAError as jira_error:
+ raise AirflowException("Failed to execute jiraOperator, error: %s"
+ % str(jira_error))
+ except Exception as e:
+ raise AirflowException("Jira operator error: %s" % str(e))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
new file mode 100644
index 0000000..708caad
--- /dev/null
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from jira.resources import Resource
+
+from airflow.contrib.operators.jira_operator import JIRAError
+from airflow.contrib.operators.jira_operator import JiraOperator
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class JiraSensor(BaseSensorOperator):
+ """
+ Monitors a jira ticket for any change.
+
+ :param jira_conn_id: reference to a pre-defined Jira Connection
+ :type jira_conn_id: str
+ :param method_name: method name from jira-python-sdk to be execute
+ :type method_name: str
+ :param method_params: parameters for the method method_name
+ :type method_params: dict
+ :param result_processor: function that return boolean and act as a sensor response
+ :type result_processor: function
+ """
+
+ @apply_defaults
+ def __init__(self,
+ jira_conn_id='jira_default',
+ method_name=None,
+ method_params=None,
+ result_processor=None,
+ *args,
+ **kwargs):
+ super(JiraSensor, self).__init__(*args, **kwargs)
+ self.jira_conn_id = jira_conn_id
+ self.result_processor = None
+ if result_processor is not None:
+ self.result_processor = result_processor
+ self.method_name = method_name
+ self.method_params = method_params
+ self.jira_operator = JiraOperator(task_id=self.task_id,
+ jira_conn_id=self.jira_conn_id,
+ jira_method=self.method_name,
+ jira_method_args=self.method_params,
+ result_processor=self.result_processor)
+
+ def poke(self, context):
+ return self.jira_operator.execute(context=context)
+
+
+class JiraTicketSensor(JiraSensor):
+ """
+ Monitors a jira ticket for given change in terms of function.
+
+ :param jira_conn_id: reference to a pre-defined Jira Connection
+ :type jira_conn_id: str
+ :param ticket_id: id of the ticket to be monitored
+ :type ticket_id: str
+ :param field: field of the ticket to be monitored
+ :type field: str
+ :param expected_value: expected value of the field
+ :type expected_value: str
+ :param result_processor: function that return boolean and act as a sensor response
+ :type result_processor: function
+ """
+
+ template_fields = ("ticket_id",)
+
+ @apply_defaults
+ def __init__(self,
+ jira_conn_id='jira_default',
+ ticket_id=None,
+ field=None,
+ expected_value=None,
+ field_checker_func=None,
+ *args, **kwargs):
+
+ self.jira_conn_id = jira_conn_id
+ self.ticket_id = ticket_id
+ self.field = field
+ self.expected_value = expected_value
+ if field_checker_func is None:
+ field_checker_func = self.issue_field_checker
+
+ super(JiraTicketSensor, self).__init__(jira_conn_id=jira_conn_id,
+ result_processor=field_checker_func,
+ *args, **kwargs)
+
+ def poke(self, context):
+ logging.info('Jira Sensor checking for change in ticket : {0}'
+ .format(self.ticket_id))
+
+ self.jira_operator.method_name = "issue"
+ self.jira_operator.jira_method_args = {
+ 'id': self.ticket_id,
+ 'fields': self.field
+ }
+ return JiraSensor.poke(self, context=context)
+
+ def issue_field_checker(self, context, issue):
+ result = None
+ try:
+ if issue is not None \
+ and self.field is not None \
+ and self.expected_value is not None:
+
+ field_value = getattr(issue.fields, self.field)
+ if field_value is not None:
+ if isinstance(field_value, list):
+ result = self.expected_value in field_value
+ elif isinstance(field_value, str):
+ result = self.expected_value.lower() == field_value.lower()
+ elif isinstance(field_value, Resource) \
+ and getattr(field_value, 'name'):
+ result = self.expected_value.lower() == field_value.name.lower()
+ else:
+ logging.warning("not implemented checker for issue field {0} "
+ "which is neither string nor list nor "
+ "jira Resource".format(self.field))
+
+ except JIRAError as jira_error:
+ logging.error("jira error while checking with expected value: {0}"
+ .format(jira_error))
+ except Exception as e:
+ logging.error("error while checking with expected value {0}, error: {1}"
+ .format(self.expected_value, e))
+ if result is True:
+ logging.info("issue field {0} has expected value {1}, returning success"
+ .format(self.field, self.expected_value))
+ else:
+ logging.info("issue field {0} dont have expected value {1} yet."
+ .format(self.field, self.expected_value))
+ return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d878457..8682f35 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -535,6 +535,7 @@ class Connection(Base):
('cloudant', 'IBM Cloudant',),
('mssql', 'Microsoft SQL Server'),
('mesos_framework-id', 'Mesos Framework ID'),
+ ('jira', 'JIRA',),
]
def __init__(
@@ -655,6 +656,9 @@ class Connection(Base):
elif self.conn_type == 'cloudant':
from airflow.contrib.hooks.cloudant_hook import CloudantHook
return CloudantHook(cloudant_conn_id=self.conn_id)
+ elif self.conn_type == 'jira':
+ from airflow.contrib.hooks.jira_hook import JiraHook
+ return JiraHook(jira_conn_id=self.conn_id)
except:
pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 446952b..9e503f9 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -27,6 +27,7 @@ impyla
ipython
jaydebeapi
jinja2<2.9.0
+jira
ldap3
lxml
markdown
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 7acc90f..aad9984 100644
--- a/setup.py
+++ b/setup.py
@@ -127,6 +127,7 @@ gcp_api = [
]
hdfs = ['snakebite>=2.7.8']
webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
+jira = ['JIRA>1.0.7']
hive = [
'hive-thrift-py>=0.0.1',
'pyhive>=0.1.3',
@@ -256,6 +257,7 @@ def do_setup():
'statsd': statsd,
'vertica': vertica,
'webhdfs': webhdfs,
+ 'jira': jira,
},
classifiers=[
'Development Status :: 5 - Production/Stable',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/hooks/test_jira_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_jira_hook.py b/tests/contrib/hooks/test_jira_hook.py
new file mode 100644
index 0000000..1a3d735
--- /dev/null
+++ b/tests/contrib/hooks/test_jira_hook.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from mock import Mock
+from mock import patch
+
+from airflow import configuration
+from airflow.contrib.hooks.jira_hook import JiraHook
+from airflow import models
+from airflow.utils import db
+
+# Stand-in object that the patched JIRA constructor returns in the test below.
+jira_client_mock = Mock(
+ name="jira_client"
+)
+
+
+class TestJiraHook(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ db.merge_conn(
+ models.Connection(
+ conn_id='jira_default', conn_type='jira',
+ host='https://localhost/jira/', port=443,
+ extra='{"verify": "False", "project": "AIRFLOW"}'))
+
+ @patch("airflow.contrib.hooks.jira_hook.JIRA", autospec=True,
+ return_value=jira_client_mock)
+ def test_jira_client_connection(self, jira_mock):
+ jira_hook = JiraHook()
+
+ assert jira_mock.called
+ self.assertIsInstance(jira_hook.client, Mock)
+ self.assertEqual(jira_hook.client.name, jira_mock.return_value.name)
+
+
+# Allow running this test module directly as a script.
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/operators/jira_operator_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/jira_operator_test.py b/tests/contrib/operators/jira_operator_test.py
new file mode 100644
index 0000000..0188c0b
--- /dev/null
+++ b/tests/contrib/operators/jira_operator_test.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import datetime
+from mock import Mock
+from mock import patch
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.jira_operator import JiraOperator
+from airflow import models
+from airflow.utils import db
+
+# Date used as DAG start date and as the run window of the tests below.
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+# Stand-in object that the patched JIRA constructor returns in the tests below.
+jira_client_mock = Mock(
+ name="jira_client_for_test"
+)
+
+# Minimal ticket payload used as the canned return value of mocked Jira calls.
+minimal_test_ticket = {
+ "id": "911539",
+ "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539",
+ "key": "TEST-1226",
+ "fields": {
+ "labels": [
+ "test-label-1",
+ "test-label-2"
+ ],
+ "description": "this is a test description",
+ }
+}
+
+
+class TestJiraOperator(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+ dag = DAG('test_dag_id', default_args=args)
+ self.dag = dag
+ db.merge_conn(
+ models.Connection(
+ conn_id='jira_default', conn_type='jira',
+ host='https://localhost/jira/', port=443,
+ extra='{"verify": "False", "project": "AIRFLOW"}'))
+
+ @patch("airflow.contrib.hooks.jira_hook.JIRA",
+ autospec=True, return_value=jira_client_mock)
+ def test_issue_search(self, jira_mock):
+ jql_str = 'issuekey=TEST-1226'
+ jira_mock.return_value.search_issues.return_value = minimal_test_ticket
+
+ jira_ticket_search_operator = JiraOperator(task_id='search-ticket-test',
+ jira_method="search_issues",
+ jira_method_args={
+ 'jql_str': jql_str,
+ 'maxResults': '1'
+ },
+ dag=self.dag)
+
+ jira_ticket_search_operator.run(start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ assert jira_mock.called
+ assert jira_mock.return_value.search_issues.called
+
+ @patch("airflow.contrib.hooks.jira_hook.JIRA",
+ autospec=True, return_value=jira_client_mock)
+ def test_update_issue(self, jira_mock):
+ jira_mock.return_value.add_comment.return_value = True
+
+ add_comment_operator = JiraOperator(task_id='add_comment_test',
+ jira_method="add_comment",
+ jira_method_args={
+ 'issue': minimal_test_ticket.get("key"),
+ 'body': 'this is test comment'
+ },
+ dag=self.dag)
+
+ add_comment_operator.run(start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ assert jira_mock.called
+ assert jira_mock.return_value.add_comment.called
+
+
+# Allow running this test module directly as a script.
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/sensors/jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py
new file mode 100644
index 0000000..5ca58e4
--- /dev/null
+++ b/tests/contrib/sensors/jira_sensor_test.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import datetime
+from mock import Mock
+from mock import patch
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
+from airflow import models
+from airflow.utils import db
+
+# Date used as DAG start date and as the run window of the test below.
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+# Stand-in object that the patched JIRA constructor returns in the test below.
+jira_client_mock = Mock(
+ name="jira_client_for_test"
+)
+
+# Minimal ticket payload used as the canned return value of the mocked issue call.
+minimal_test_ticket = {
+ "id": "911539",
+ "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539",
+ "key": "TEST-1226",
+ "fields": {
+ "labels": [
+ "test-label-1",
+ "test-label-2"
+ ],
+ "description": "this is a test description",
+ }
+}
+
+
+class TestJiraSensor(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+ dag = DAG('test_dag_id', default_args=args)
+ self.dag = dag
+ db.merge_conn(
+ models.Connection(
+ conn_id='jira_default', conn_type='jira',
+ host='https://localhost/jira/', port=443,
+ extra='{"verify": "False", "project": "AIRFLOW"}'))
+
+ @patch("airflow.contrib.hooks.jira_hook.JIRA",
+ autospec=True, return_value=jira_client_mock)
+ def test_issue_label_set(self, jira_mock):
+ jira_mock.return_value.issue.return_value = minimal_test_ticket
+
+ ticket_label_sensor = JiraTicketSensor(task_id='search-ticket-test',
+ ticket_id='TEST-1226',
+ field_checker_func=
+ TestJiraSensor.field_checker_func,
+ timeout=518400,
+ poke_interval=10,
+ dag=self.dag)
+
+ ticket_label_sensor.run(start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ assert jira_mock.called
+ assert jira_mock.return_value.issue.called
+
+ @staticmethod
+ def field_checker_func(context, issue):
+ return "test-label-1" in issue['fields']['labels']
+
+
+# Allow running this test module directly as a script.
+if __name__ == '__main__':
+ unittest.main()