You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/04 16:10:30 UTC
[airflow] branch master updated: Add type annotation to
providers/jenkins (#9947)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 73a08ed Add type annotation to providers/jenkins (#9947)
73a08ed is described below
commit 73a08ed757bf9f2af27cfca913200b61528a2d80
Author: Cooper Gillan <co...@gmail.com>
AuthorDate: Tue Aug 4 11:09:52 2020 -0500
Add type annotation to providers/jenkins (#9947)
Part of #9708
---
airflow/providers/jenkins/hooks/jenkins.py | 4 +-
.../jenkins/operators/jenkins_job_trigger.py | 64 +++++++++++++---------
2 files changed, 40 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/jenkins/hooks/jenkins.py b/airflow/providers/jenkins/hooks/jenkins.py
index e547390..a3910d1 100644
--- a/airflow/providers/jenkins/hooks/jenkins.py
+++ b/airflow/providers/jenkins/hooks/jenkins.py
@@ -29,7 +29,7 @@ class JenkinsHook(BaseHook):
Hook to manage connection to jenkins server
"""
- def __init__(self, conn_id='jenkins_default'):
+ def __init__(self, conn_id: str = 'jenkins_default') -> None:
super().__init__()
connection = self.get_connection(conn_id)
self.connection = connection
@@ -45,7 +45,7 @@ class JenkinsHook(BaseHook):
self.log.info('Trying to connect to %s', url)
self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password)
- def get_jenkins_server(self):
+ def get_jenkins_server(self) -> jenkins.Jenkins:
"""
Get jenkins server
"""
diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
index 6db7688..77859bf 100644
--- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -16,13 +16,15 @@
# specific language governing permissions and limitations
# under the License.
+import ast
import json
import socket
import time
+from typing import Any, Dict, List, Mapping, Optional, Union
from urllib.error import HTTPError, URLError
import jenkins
-from jenkins import JenkinsException
+from jenkins import Jenkins, JenkinsException
from requests import Request
from airflow.exceptions import AirflowException
@@ -30,8 +32,11 @@ from airflow.models import BaseOperator
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.utils.decorators import apply_defaults
+JenkinsRequest = Mapping[str, Any]
+ParamType = Optional[Union[str, Dict, List]]
-def jenkins_request_with_headers(jenkins_server, req):
+
+def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optional[JenkinsRequest]:
"""
We need to get the headers in addition to the body answer
to get the location from them
@@ -56,7 +61,7 @@ def jenkins_request_with_headers(jenkins_server, req):
# Jenkins's funky authentication means its nigh impossible to distinguish errors.
if e.code in [401, 403, 500]:
raise JenkinsException(
- 'Error in request. Possibly authentication failed [%s]: %s' % (e.code, e.msg)
+ 'Error in request. Possibly authentication failed [%s]: %s' % (e.code, e.reason)
)
elif e.code == 404:
raise jenkins.NotFoundException('Requested item could not be found')
@@ -66,6 +71,7 @@ def jenkins_request_with_headers(jenkins_server, req):
raise jenkins.TimeoutException('Error in request: %s' % e)
except URLError as e:
raise JenkinsException('Error in request: %s' % e.reason)
+ return None
class JenkinsJobTriggerOperator(BaseOperator):
@@ -79,8 +85,9 @@ class JenkinsJobTriggerOperator(BaseOperator):
:type jenkins_connection_id: str
:param job_name: The name of the job to trigger
:type job_name: str
- :param parameters: The parameters block to provide to jenkins. (templated)
- :type parameters: str
+ :param parameters: The parameters block provided to jenkins for use in
+ the API call when triggering a build. (templated)
+ :type parameters: str, Dict, or List
:param sleep_time: How long will the operator sleep between each status
request for the job (min 1, default 10)
:type sleep_time: int
@@ -94,11 +101,12 @@ class JenkinsJobTriggerOperator(BaseOperator):
@apply_defaults
def __init__(self,
- jenkins_connection_id,
- job_name,
- parameters="",
- sleep_time=10,
- max_try_before_job_appears=10,
+ jenkins_connection_id: str,
+ job_name: str,
+ parameters: ParamType = "",
+ sleep_time: int = 10,
+ max_try_before_job_appears: int = 10,
+ *args,
**kwargs):
super().__init__(**kwargs)
self.job_name = job_name
@@ -109,7 +117,9 @@ class JenkinsJobTriggerOperator(BaseOperator):
self.jenkins_connection_id = jenkins_connection_id
self.max_try_before_job_appears = max_try_before_job_appears
- def build_job(self, jenkins_server):
+ def build_job(self,
+ jenkins_server: Jenkins,
+ params: ParamType = "") -> Optional[JenkinsRequest]:
"""
This function makes an API call to Jenkins to trigger a build for 'job_name'
It returned a dict with 2 keys : body and headers.
@@ -117,25 +127,25 @@ class JenkinsJobTriggerOperator(BaseOperator):
the location to poll in the queue.
:param jenkins_server: The jenkins server where the job should be triggered
+ :param params: The parameters block to provide to jenkins API call.
:return: Dict containing the response body (key body)
and the headers coming along (headers)
"""
- # Warning if the parameter is too long, the URL can be longer than
- # the maximum allowed size
- if self.parameters and isinstance(self.parameters, str):
- import ast
- self.parameters = ast.literal_eval(self.parameters)
+ # Since params can be either JSON string, dictionary, or list,
+ # check type and pass to build_job_url
+ if params and isinstance(params, str):
+ params = ast.literal_eval(params)
- if not self.parameters:
- # We need a None to call the non parametrized jenkins api end point
- self.parameters = None
+ # We need a None to call the non-parametrized jenkins api end point
+ if not params:
+ params = None
request = Request(
method='POST',
- url=jenkins_server.build_job_url(self.job_name, self.parameters, None))
+ url=jenkins_server.build_job_url(self.job_name, params, None))
return jenkins_request_with_headers(jenkins_server, request)
- def poll_job_in_queue(self, location, jenkins_server):
+ def poll_job_in_queue(self, location: str, jenkins_server: Jenkins) -> int:
"""
This method poll the jenkins queue until the job is executed.
When we trigger a job through an API call,
@@ -170,13 +180,13 @@ class JenkinsJobTriggerOperator(BaseOperator):
raise AirflowException("The job hasn't been executed after polling "
f"the queue {self.max_try_before_job_appears} times")
- def get_hook(self):
+ def get_hook(self) -> JenkinsHook:
"""
Instantiate jenkins hook
"""
return JenkinsHook(self.jenkins_connection_id)
- def execute(self, context):
+ def execute(self, context: Mapping[Any, Any]) -> Optional[str]:
if not self.jenkins_connection_id:
self.log.error(
'Please specify the jenkins connection id to use.'
@@ -194,9 +204,10 @@ class JenkinsJobTriggerOperator(BaseOperator):
'Triggering the job %s on the jenkins : %s with the parameters : %s',
self.job_name, self.jenkins_connection_id, self.parameters)
jenkins_server = self.get_hook().get_jenkins_server()
- jenkins_response = self.build_job(jenkins_server)
- build_number = self.poll_job_in_queue(
- jenkins_response['headers']['Location'], jenkins_server)
+ jenkins_response = self.build_job(jenkins_server, self.parameters)
+ if jenkins_response:
+ build_number = self.poll_job_in_queue(
+ jenkins_response['headers']['Location'], jenkins_server)
time.sleep(self.sleep_time)
keep_polling_job = True
@@ -234,3 +245,4 @@ class JenkinsJobTriggerOperator(BaseOperator):
# If we can we return the url of the job
# for later use (like retrieving an artifact)
return build_info['url']
+ return None