You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/04 16:10:30 UTC

[airflow] branch master updated: Add type annotation to providers/jenkins (#9947)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a08ed  Add type annotation to providers/jenkins (#9947)
73a08ed is described below

commit 73a08ed757bf9f2af27cfca913200b61528a2d80
Author: Cooper Gillan <co...@gmail.com>
AuthorDate: Tue Aug 4 11:09:52 2020 -0500

    Add type annotation to providers/jenkins (#9947)
    
    Part of #9708
---
 airflow/providers/jenkins/hooks/jenkins.py         |  4 +-
 .../jenkins/operators/jenkins_job_trigger.py       | 64 +++++++++++++---------
 2 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/jenkins/hooks/jenkins.py b/airflow/providers/jenkins/hooks/jenkins.py
index e547390..a3910d1 100644
--- a/airflow/providers/jenkins/hooks/jenkins.py
+++ b/airflow/providers/jenkins/hooks/jenkins.py
@@ -29,7 +29,7 @@ class JenkinsHook(BaseHook):
     Hook to manage connection to jenkins server
     """
 
-    def __init__(self, conn_id='jenkins_default'):
+    def __init__(self, conn_id: str = 'jenkins_default') -> None:
         super().__init__()
         connection = self.get_connection(conn_id)
         self.connection = connection
@@ -45,7 +45,7 @@ class JenkinsHook(BaseHook):
         self.log.info('Trying to connect to %s', url)
         self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password)
 
-    def get_jenkins_server(self):
+    def get_jenkins_server(self) -> jenkins.Jenkins:
         """
         Get jenkins server
         """
diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
index 6db7688..77859bf 100644
--- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -16,13 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import ast
 import json
 import socket
 import time
+from typing import Any, Dict, List, Mapping, Optional, Union
 from urllib.error import HTTPError, URLError
 
 import jenkins
-from jenkins import JenkinsException
+from jenkins import Jenkins, JenkinsException
 from requests import Request
 
 from airflow.exceptions import AirflowException
@@ -30,8 +32,11 @@ from airflow.models import BaseOperator
 from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
 from airflow.utils.decorators import apply_defaults
 
+JenkinsRequest = Mapping[str, Any]
+ParamType = Optional[Union[str, Dict, List]]
 
-def jenkins_request_with_headers(jenkins_server, req):
+
+def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optional[JenkinsRequest]:
     """
     We need to get the headers in addition to the body answer
     to get the location from them
@@ -56,7 +61,7 @@ def jenkins_request_with_headers(jenkins_server, req):
         # Jenkins's funky authentication means its nigh impossible to distinguish errors.
         if e.code in [401, 403, 500]:
             raise JenkinsException(
-                'Error in request. Possibly authentication failed [%s]: %s' % (e.code, e.msg)
+                'Error in request. Possibly authentication failed [%s]: %s' % (e.code, e.reason)
             )
         elif e.code == 404:
             raise jenkins.NotFoundException('Requested item could not be found')
@@ -66,6 +71,7 @@ def jenkins_request_with_headers(jenkins_server, req):
         raise jenkins.TimeoutException('Error in request: %s' % e)
     except URLError as e:
         raise JenkinsException('Error in request: %s' % e.reason)
+    return None
 
 
 class JenkinsJobTriggerOperator(BaseOperator):
@@ -79,8 +85,9 @@ class JenkinsJobTriggerOperator(BaseOperator):
     :type jenkins_connection_id: str
     :param job_name: The name of the job to trigger
     :type job_name: str
-    :param parameters: The parameters block to provide to jenkins. (templated)
-    :type parameters: str
+    :param parameters: The parameters block provided to jenkins for use in
+        the API call when triggering a build. (templated)
+    :type parameters: str, Dict, or List
     :param sleep_time: How long will the operator sleep between each status
         request for the job (min 1, default 10)
     :type sleep_time: int
@@ -94,11 +101,12 @@ class JenkinsJobTriggerOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,
-                 jenkins_connection_id,
-                 job_name,
-                 parameters="",
-                 sleep_time=10,
-                 max_try_before_job_appears=10,
+                 jenkins_connection_id: str,
+                 job_name: str,
+                 parameters: ParamType = "",
+                 sleep_time: int = 10,
+                 max_try_before_job_appears: int = 10,
+                 *args,
                  **kwargs):
         super().__init__(**kwargs)
         self.job_name = job_name
@@ -109,7 +117,9 @@ class JenkinsJobTriggerOperator(BaseOperator):
         self.jenkins_connection_id = jenkins_connection_id
         self.max_try_before_job_appears = max_try_before_job_appears
 
-    def build_job(self, jenkins_server):
+    def build_job(self,
+                  jenkins_server: Jenkins,
+                  params: ParamType = "") -> Optional[JenkinsRequest]:
         """
         This function makes an API call to Jenkins to trigger a build for 'job_name'
         It returned a dict with 2 keys : body and headers.
@@ -117,25 +127,25 @@ class JenkinsJobTriggerOperator(BaseOperator):
         the location to poll in the queue.
 
         :param jenkins_server: The jenkins server where the job should be triggered
+        :param params: The parameters block to provide to the jenkins API call.
         :return: Dict containing the response body (key body)
             and the headers coming along (headers)
         """
-        # Warning if the parameter is too long, the URL can be longer than
-        # the maximum allowed size
-        if self.parameters and isinstance(self.parameters, str):
-            import ast
-            self.parameters = ast.literal_eval(self.parameters)
+        # Since params can be either a string holding a Python literal
+        # (parsed with ast.literal_eval, not JSON), a dictionary, or a list,
+        # check type and pass to build_job_url
+        if params and isinstance(params, str):
+            params = ast.literal_eval(params)
 
-        if not self.parameters:
-            # We need a None to call the non parametrized jenkins api end point
-            self.parameters = None
+        # We need a None to call the non-parametrized jenkins api end point
+        if not params:
+            params = None
 
         request = Request(
             method='POST',
-            url=jenkins_server.build_job_url(self.job_name, self.parameters, None))
+            url=jenkins_server.build_job_url(self.job_name, params, None))
         return jenkins_request_with_headers(jenkins_server, request)
 
-    def poll_job_in_queue(self, location, jenkins_server):
+    def poll_job_in_queue(self, location: str, jenkins_server: Jenkins) -> int:
         """
         This method poll the jenkins queue until the job is executed.
         When we trigger a job through an API call,
@@ -170,13 +180,13 @@ class JenkinsJobTriggerOperator(BaseOperator):
         raise AirflowException("The job hasn't been executed after polling "
                                f"the queue {self.max_try_before_job_appears} times")
 
-    def get_hook(self):
+    def get_hook(self) -> JenkinsHook:
         """
         Instantiate jenkins hook
         """
         return JenkinsHook(self.jenkins_connection_id)
 
-    def execute(self, context):
+    def execute(self, context: Mapping[Any, Any]) -> Optional[str]:
         if not self.jenkins_connection_id:
             self.log.error(
                 'Please specify the jenkins connection id to use.'
@@ -194,9 +204,10 @@ class JenkinsJobTriggerOperator(BaseOperator):
             'Triggering the job %s on the jenkins : %s with the parameters : %s',
             self.job_name, self.jenkins_connection_id, self.parameters)
         jenkins_server = self.get_hook().get_jenkins_server()
-        jenkins_response = self.build_job(jenkins_server)
-        build_number = self.poll_job_in_queue(
-            jenkins_response['headers']['Location'], jenkins_server)
+        jenkins_response = self.build_job(jenkins_server, self.parameters)
+        if jenkins_response:
+            build_number = self.poll_job_in_queue(
+                jenkins_response['headers']['Location'], jenkins_server)
 
         time.sleep(self.sleep_time)
         keep_polling_job = True
@@ -234,3 +245,4 @@ class JenkinsJobTriggerOperator(BaseOperator):
             # If we can we return the url of the job
             # for later use (like retrieving an artifact)
             return build_info['url']
+        return None