Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/28 12:36:31 UTC

[GitHub] bolkedebruin closed pull request #3764: [AIRFLOW-2916] Arg `verify` for AwsHook() & S3 sensors/operators

bolkedebruin closed pull request #3764: [AIRFLOW-2916] Arg `verify` for AwsHook() & S3 sensors/operators
URL: https://github.com/apache/incubator-airflow/pull/3764
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (forked) pull request once it is merged, the diff is reproduced below for the sake of provenance:
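
As a rough usage sketch of the new `verify` argument introduced by this change (illustrative only, not part of the diff below): `verify=False` skips certificate validation, while a filesystem path points boto3 at a custom CA bundle. The DAG id, bucket name, key and CA-bundle path are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.S3_hook import S3Hook
from airflow.sensors.s3_key_sensor import S3KeySensor

# Skip SSL certificate validation (SSL is still used unless use_ssl is
# disabled on the connection), e.g. for an endpoint with a self-signed cert.
aws_hook = AwsHook(aws_conn_id='aws_default', verify=False)

# Point boto3 at a custom CA bundle instead of the one shipped with botocore.
s3_hook = S3Hook(aws_conn_id='aws_default', verify='/path/to/ca-bundle.pem')

dag = DAG(
    dag_id='s3_verify_example',            # placeholder DAG
    start_date=datetime(2018, 8, 1),
    schedule_interval=None,
)

# Sensors and operators simply forward the argument to the S3Hook they build.
wait_for_key = S3KeySensor(
    task_id='wait_for_key',
    bucket_name='my-bucket',               # placeholder
    bucket_key='data/part-00000.csv',      # placeholder
    aws_conn_id='aws_default',
    verify='/path/to/ca-bundle.pem',
    dag=dag,
)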

diff --git a/airflow/contrib/hooks/aws_hook.py b/airflow/contrib/hooks/aws_hook.py
index 8ca1f3d744..448de63ffe 100644
--- a/airflow/contrib/hooks/aws_hook.py
+++ b/airflow/contrib/hooks/aws_hook.py
@@ -84,8 +84,9 @@ class AwsHook(BaseHook):
     This class is a thin wrapper around the boto3 python library.
     """
 
-    def __init__(self, aws_conn_id='aws_default'):
+    def __init__(self, aws_conn_id='aws_default', verify=None):
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def _get_credentials(self, region_name):
         aws_access_key_id = None
@@ -162,12 +163,14 @@ def _get_credentials(self, region_name):
     def get_client_type(self, client_type, region_name=None):
         session, endpoint_url = self._get_credentials(region_name)
 
-        return session.client(client_type, endpoint_url=endpoint_url)
+        return session.client(client_type, endpoint_url=endpoint_url,
+                              verify=self.verify)
 
     def get_resource_type(self, resource_type, region_name=None):
         session, endpoint_url = self._get_credentials(region_name)
 
-        return session.resource(resource_type, endpoint_url=endpoint_url)
+        return session.resource(resource_type, endpoint_url=endpoint_url,
+                                verify=self.verify)
 
     def get_session(self, region_name=None):
         """Get the underlying boto3.session."""
diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py
index a87aa3af5c..0df6170eab 100644
--- a/airflow/contrib/operators/gcs_to_s3.py
+++ b/airflow/contrib/operators/gcs_to_s3.py
@@ -47,6 +47,16 @@ class GoogleCloudStorageToS3Operator(GoogleCloudStorageListOperator):
     :type dest_aws_conn_id: str
     :param dest_s3_key: The base S3 key to be used to store the files. (templated)
     :type dest_s3_key: str
+    :param dest_verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type dest_verify: bool or str
     """
     template_fields = ('bucket', 'prefix', 'delimiter', 'dest_s3_key')
     ui_color = '#f0eee4'
@@ -60,6 +70,7 @@ def __init__(self,
                  delegate_to=None,
                  dest_aws_conn_id=None,
                  dest_s3_key=None,
+                 dest_verify=None,
                  replace=False,
                  *args,
                  **kwargs):
@@ -75,12 +86,13 @@ def __init__(self,
         )
         self.dest_aws_conn_id = dest_aws_conn_id
         self.dest_s3_key = dest_s3_key
+        self.dest_verify = dest_verify
         self.replace = replace
 
     def execute(self, context):
         # use the super to list all files in an Google Cloud Storage bucket
         files = super(GoogleCloudStorageToS3Operator, self).execute(context)
-        s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+        s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify)
 
         if not self.replace:
             # if we are not replacing -> list all files in the S3 bucket
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
index b85691b005..a9e005eed3 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -38,6 +38,16 @@ class S3ListOperator(BaseOperator):
     :type delimiter: string
     :param aws_conn_id: The connection ID to use when connecting to S3 storage.
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
 
     **Example**:
         The following operator would list all the files
@@ -61,6 +71,7 @@ def __init__(self,
                  prefix='',
                  delimiter='',
                  aws_conn_id='aws_default',
+                 verify=None,
                  *args,
                  **kwargs):
         super(S3ListOperator, self).__init__(*args, **kwargs)
@@ -68,9 +79,10 @@ def __init__(self,
         self.prefix = prefix
         self.delimiter = delimiter
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def execute(self, context):
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
         self.log.info(
             'Getting the list of files from bucket: {0} in prefix: {1} (Delimiter {2})'.
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py
index 64d7dc7cab..81c48a9e15 100644
--- a/airflow/contrib/operators/s3_to_gcs_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -41,6 +41,16 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
     :type delimiter: string
     :param aws_conn_id: The source S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param dest_gcs_conn_id: The destination connection ID to use
         when connecting to Google Cloud Storage.
     :type dest_gcs_conn_id: string
@@ -80,6 +90,7 @@ def __init__(self,
                  prefix='',
                  delimiter='',
                  aws_conn_id='aws_default',
+                 verify=None,
                  dest_gcs_conn_id=None,
                  dest_gcs=None,
                  delegate_to=None,
@@ -98,6 +109,7 @@ def __init__(self,
         self.dest_gcs = dest_gcs
         self.delegate_to = delegate_to
         self.replace = replace
+        self.verify = verify
 
         if dest_gcs and not self._gcs_object_is_directory(self.dest_gcs):
             self.log.info(
@@ -146,7 +158,7 @@ def execute(self, context):
                     'There are no new files to sync. Have a nice day!')
 
         if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id)
+            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
             for file in files:
                 # GCS hook builds its own in-memory file so we have to create
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index 9c1b621dae..e6682c78df 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -39,6 +39,16 @@ class RedshiftToS3Transfer(BaseOperator):
     :type redshift_conn_id: string
     :param aws_conn_id: reference to a specific S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param unload_options: reference to a list of UNLOAD options
     :type unload_options: list
     """
@@ -56,6 +66,7 @@ def __init__(
             s3_key,
             redshift_conn_id='redshift_default',
             aws_conn_id='aws_default',
+            verify=None,
             unload_options=tuple(),
             autocommit=False,
             parameters=None,
@@ -68,6 +79,7 @@ def __init__(
         self.s3_key = s3_key
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.unload_options = unload_options
         self.autocommit = autocommit
         self.parameters = parameters
@@ -79,7 +91,7 @@ def __init__(
 
     def execute(self, context):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         credentials = self.s3.get_credentials()
         unload_options = '\n\t\t\t'.join(self.unload_options)
 
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 84a6eda0c8..2e4028298c 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -47,6 +47,17 @@ class S3FileTransformOperator(BaseOperator):
     :type source_s3_key: str
     :param source_aws_conn_id: source s3 connection
     :type source_aws_conn_id: str
+    :param source_verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+        This is also applicable to ``dest_verify``.
+    :type source_verify: bool or str
     :param dest_s3_key: The key to be written from S3. (templated)
     :type dest_s3_key: str
     :param dest_aws_conn_id: destination s3 connection
@@ -71,14 +82,18 @@ def __init__(
             transform_script=None,
             select_expression=None,
             source_aws_conn_id='aws_default',
+            source_verify=None,
             dest_aws_conn_id='aws_default',
+            dest_verify=None,
             replace=False,
             *args, **kwargs):
         super(S3FileTransformOperator, self).__init__(*args, **kwargs)
         self.source_s3_key = source_s3_key
         self.source_aws_conn_id = source_aws_conn_id
+        self.source_verify = source_verify
         self.dest_s3_key = dest_s3_key
         self.dest_aws_conn_id = dest_aws_conn_id
+        self.dest_verify = dest_verify
         self.replace = replace
         self.transform_script = transform_script
         self.select_expression = select_expression
@@ -88,8 +103,10 @@ def execute(self, context):
             raise AirflowException(
                 "Either transform_script or select_expression must be specified")
 
-        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id)
-        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id,
+                           verify=self.source_verify)
+        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id,
+                         verify=self.dest_verify)
 
         self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index b82ebce6fa..85f05325f6 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -78,6 +78,16 @@ class S3ToHiveTransfer(BaseOperator):
     :type delimiter: str
     :param aws_conn_id: source s3 connection
     :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param hive_cli_conn_id: destination hive connection
     :type hive_cli_conn_id: str
     :param input_compressed: Boolean to determine if file decompression is
@@ -107,6 +117,7 @@ def __init__(
             check_headers=False,
             wildcard_match=False,
             aws_conn_id='aws_default',
+            verify=None,
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
             tblproperties=None,
@@ -125,6 +136,7 @@ def __init__(
         self.wildcard_match = wildcard_match
         self.hive_cli_conn_id = hive_cli_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.input_compressed = input_compressed
         self.tblproperties = tblproperties
         self.select_expression = select_expression
@@ -136,7 +148,7 @@ def __init__(
 
     def execute(self, context):
         # Downloading file from S3
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         self.log.info("Downloading S3 file")
 
diff --git a/airflow/operators/s3_to_redshift_operator.py b/airflow/operators/s3_to_redshift_operator.py
index 0d7921e9ed..8c83f44372 100644
--- a/airflow/operators/s3_to_redshift_operator.py
+++ b/airflow/operators/s3_to_redshift_operator.py
@@ -39,6 +39,16 @@ class S3ToRedshiftTransfer(BaseOperator):
     :type redshift_conn_id: string
     :param aws_conn_id: reference to a specific S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param copy_options: reference to a list of COPY options
     :type copy_options: list
     """
@@ -56,6 +66,7 @@ def __init__(
             s3_key,
             redshift_conn_id='redshift_default',
             aws_conn_id='aws_default',
+            verify=None,
             copy_options=tuple(),
             autocommit=False,
             parameters=None,
@@ -67,13 +78,14 @@ def __init__(
         self.s3_key = s3_key
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.copy_options = copy_options
         self.autocommit = autocommit
         self.parameters = parameters
 
     def execute(self, context):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         credentials = self.s3.get_credentials()
         copy_options = '\n\t\t\t'.join(self.copy_options)
 
diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py
index 246b4c3e73..403adb0c80 100644
--- a/airflow/sensors/s3_key_sensor.py
+++ b/airflow/sensors/s3_key_sensor.py
@@ -43,6 +43,16 @@ class S3KeySensor(BaseSensorOperator):
     :type wildcard_match: bool
     :param aws_conn_id: a reference to the s3 connection
     :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     """
     template_fields = ('bucket_key', 'bucket_name')
 
@@ -52,6 +62,7 @@ def __init__(self,
                  bucket_name=None,
                  wildcard_match=False,
                  aws_conn_id='aws_default',
+                 verify=None,
                  *args,
                  **kwargs):
         super(S3KeySensor, self).__init__(*args, **kwargs)
@@ -76,10 +87,11 @@ def __init__(self,
         self.bucket_key = bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def poke(self, context):
         from airflow.hooks.S3_hook import S3Hook
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
         self.log.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py
index 917dd46e26..559e5a1348 100644
--- a/airflow/sensors/s3_prefix_sensor.py
+++ b/airflow/sensors/s3_prefix_sensor.py
@@ -38,6 +38,18 @@ class S3PrefixSensor(BaseSensorOperator):
     :param delimiter: The delimiter intended to show hierarchy.
         Defaults to '/'.
     :type delimiter: str
+    :param aws_conn_id: a reference to the s3 connection
+    :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+                 (unless use_ssl is False), but SSL certificates will not be
+                 verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     """
     template_fields = ('prefix', 'bucket_name')
 
@@ -47,6 +59,7 @@ def __init__(self,
                  prefix,
                  delimiter='/',
                  aws_conn_id='aws_default',
+                 verify=None,
                  *args,
                  **kwargs):
         super(S3PrefixSensor, self).__init__(*args, **kwargs)
@@ -56,12 +69,13 @@ def __init__(self,
         self.delimiter = delimiter
         self.full_url = "s3://" + bucket_name + '/' + prefix
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def poke(self, context):
         self.log.info('Poking for prefix : {self.prefix}\n'
                       'in bucket s3://{self.bucket_name}'.format(**locals()))
         from airflow.hooks.S3_hook import S3Hook
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         return hook.check_for_prefix(
             prefix=self.prefix,
             delimiter=self.delimiter,
diff --git a/tests/contrib/operators/test_s3_to_gcs_operator.py b/tests/contrib/operators/test_s3_to_gcs_operator.py
index 807882c324..97d6eae916 100644
--- a/tests/contrib/operators/test_s3_to_gcs_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_operator.py
@@ -88,8 +88,8 @@ def _assert_upload(bucket, object, tmp_filename):
 
         uploaded_files = operator.execute(None)
 
-        s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
-        s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
+        s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID, verify=None)
+        s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID, verify=None)
         gcs_mock_hook.assert_called_once_with(
             google_cloud_storage_conn_id=GCS_CONN_ID, delegate_to=None)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services