You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/18 06:02:01 UTC

[jira] [Commented] (AIRFLOW-2759) Simplify proxy server based access to external platforms like Google cloud

    [ https://issues.apache.org/jira/browse/AIRFLOW-2759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723725#comment-16723725 ] 

ASF GitHub Bot commented on AIRFLOW-2759:
-----------------------------------------

stale[bot] closed pull request #3722: [AIRFLOW-2759] Add changes to extract proxy details at the base hook …
URL: https://github.com/apache/incubator-airflow/pull/3722
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 4f1f0df383..328c40108e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -173,6 +173,9 @@ killed_task_cleanup_time = 60
 # `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
 dag_run_conf_overrides_params = False
 
+# Route Google Cloud hook HTTP traffic through the proxy defined in [proxy]
+use_proxy = False
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
@@ -635,3 +638,9 @@ in_cluster = True
 #
 # Additionally you may override worker airflow settings with the AIRFLOW__<SECTION>__<KEY>
 # formatting as supported by airflow normally.
+
+[proxy]
+# Proxy used when [core] use_proxy = True. proxy_type: SOCKS4, SOCKS5, HTTP, HTTP_NO_TUNNEL
+proxy_type =
+proxy_host =
+proxy_port =
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 01696c6906..60e574f822 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,7 @@ enable_xcom_pickling = False
 killed_task_cleanup_time = 5
 secure_mode = False
 hostname_callable = socket:getfqdn
+use_proxy = False
 
 [cli]
 api_client = airflow.api.client.local_client
@@ -123,3 +124,8 @@ hide_sensitive_variable_fields = True
 elasticsearch_host =
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
+
+[proxy]
+proxy_type =
+proxy_host =
+proxy_port =
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 053494743f..342af2b255 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -27,7 +27,7 @@
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.log.logging_mixin import LoggingMixin
-
+from airflow.configuration import conf
 
 _DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
 
@@ -129,7 +129,8 @@ def _authorize(self):
         service hook connection.
         """
         credentials = self._get_credentials()
-        http = httplib2.Http()
+        proxy_obj = self._get_proxy_obj()
+        http = httplib2.Http(proxy_info=proxy_obj)
         authed_http = google_auth_httplib2.AuthorizedHttp(
             credentials, http=http)
         return authed_http
@@ -147,6 +148,48 @@ def _get_field(self, f, default=None):
         else:
             return default
 
+    def _get_proxy_obj(self):
+        """
+        Build an httplib2.ProxyInfo from the [proxy] section; None if use_proxy is off
+        """
+        proxy_obj = None
+        if self._get_useproxy():
+            proxy = self.get_proxyconfig() or {}
+            proxy_host = proxy.get('proxy_host')
+            proxy_type = self._get_proxy_type(proxy)
+            try:  # read the port from the same section dict as host and type
+                proxy_port = int(proxy.get('proxy_port'))
+            except (TypeError, ValueError):  # proxy_port missing or empty
+                proxy_port = None
+            proxy_obj = httplib2.ProxyInfo(proxy_type, proxy_host, proxy_port)
+        return proxy_obj
+
+    def _get_proxy_type(self, proxy):
+        """
+        :param proxy: Proxy details fetched from configuration file
+        :return: httplib2.socks proxy type constant, or None if proxy_type is unknown
+        """
+        proxy_type_dictionary = {
+            "SOCKS4": httplib2.socks.PROXY_TYPE_SOCKS4,
+            "SOCKS5": httplib2.socks.PROXY_TYPE_SOCKS5,
+            "HTTP": httplib2.socks.PROXY_TYPE_HTTP,
+            "HTTP_NO_TUNNEL": httplib2.socks.PROXY_TYPE_HTTP_NO_TUNNEL
+        }
+
+        raw_type = proxy.get('proxy_type')
+        proxy_type = proxy_type_dictionary.get(str(raw_type or '').strip().upper())
+
+        if proxy_type is None:  # proxy requested but type unusable: make it visible
+            self.log.warning("Unknown proxy_type %r; proxy type set to None", raw_type)
+        return proxy_type
+
+    def _get_useproxy(self):
+        """Return [core] use_proxy as a bool, or False if it cannot be read."""
+        try:
+            return conf.getboolean('core', 'use_proxy')
+        except AirflowException:  # covers AirflowConfigException (missing option)
+            return False
+
     @property
     def project_id(self):
         return self._get_field('project')
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index 103fa6260b..e614332da8 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -29,6 +29,7 @@
 from airflow.exceptions import AirflowException
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow import configuration
 
 CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
 
@@ -88,6 +89,10 @@ def get_hook(cls, conn_id):
         connection = cls.get_connection(conn_id)
         return connection.get_hook()
 
+    def get_proxyconfig(self):
+        """Return the [proxy] section of the Airflow configuration."""
+        return configuration.getsection('proxy')
+
     def get_conn(self):
         raise NotImplementedError()
 
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 45d0217e23..db58e650d8 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -127,7 +127,7 @@ def chunks(items, chunk_size):
     """
     Yield successive chunks of a given size from a list of items
     """
-    if chunk_size <= 0:
+    if (chunk_size <= 0):
         raise ValueError('Chunk size must be a positive integer')
     for i in range(0, len(items), chunk_size):
         yield items[i:i + chunk_size]
diff --git a/tests/contrib/hooks/test_gcp_api_base_hook_proxy.py b/tests/contrib/hooks/test_gcp_api_base_hook_proxy.py
new file mode 100644
index 0000000000..ea861fbb9b
--- /dev/null
+++ b/tests/contrib/hooks/test_gcp_api_base_hook_proxy.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import airflow
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+path_cred = 'airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook._get_credentials'
+path_pobj = 'airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook._get_proxy_obj'
+path_usepxy = 'airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook._get_useproxy'
+path_proxy_config = 'airflow.hooks.base_hook.BaseHook.get_proxyconfig'
+path_getboolean = 'airflow.configuration.conf.getboolean'
+
+
+class TestGcpApiBaseHook(unittest.TestCase):
+
+    def setUp(self):
+        airflow.configuration.load_test_config()
+        self.hook = GoogleCloudBaseHook()
+
+    def side_effect_get_proxyconfig(self):
+        """
+        Stand-in for BaseHook.get_proxyconfig returning fixed proxy details
+        """
+        mock_proxy_details = {
+            'proxy_host': 'abc.com',
+            'proxy_port': 8080,
+            'proxy_type': 'HTTP_NO_TUNNEL'}
+        return mock_proxy_details
+
+    @mock.patch(path_cred)
+    @mock.patch(path_pobj)
+    def test_authorize_no_proxy_object(self, mock_get_proxy_obj, mock_get_credentials):
+        """
+        _authorize returns an authorized http object when _get_proxy_obj returns None
+        """
+        mock_get_proxy_obj.return_value = None
+        mock_get_credentials.return_value = None
+        self.assertIsNotNone(self.hook._authorize())
+
+    @mock.patch(path_cred)
+    @mock.patch(path_usepxy)
+    @mock.patch(path_proxy_config)
+    def test_authorize_with_proxy_object(self, mock_pobj, mock_usep, mock_cred):
+        """
+        _authorize returns an authorized http object when use_proxy is True
+        """
+        mock_usep.return_value = True
+        mock_pobj.side_effect = self.side_effect_get_proxyconfig
+        mock_cred.return_value = None
+        self.assertIsNotNone(self.hook._authorize())
+
+    @mock.patch(path_usepxy)
+    @mock.patch(path_proxy_config)
+    def test_get_proxy_obj_useproxy_true(self, mock_get_proxy_config, mock_useproxy_true):
+        """
+        _get_proxy_obj returns a proxy object when use_proxy is True
+        """
+        mock_useproxy_true.return_value = True
+        mock_get_proxy_config.side_effect = self.side_effect_get_proxyconfig
+        proxy_obj = self.hook._get_proxy_obj()
+        self.assertIsNotNone(proxy_obj)
+
+    @mock.patch(path_usepxy)
+    @mock.patch(path_proxy_config)
+    def test_get_proxy_obj_useproxy_false(self, mock_pconfig, mock_usep):
+        """
+        _get_proxy_obj returns None when use_proxy is False
+        """
+        mock_usep.return_value = False
+        mock_pconfig.side_effect = self.side_effect_get_proxyconfig
+        proxy_obj = self.hook._get_proxy_obj()
+        self.assertIsNone(proxy_obj)
+
+    @mock.patch(path_getboolean)
+    def test_get_useproxy(self, mock_getboolean_true):
+        """
+        _get_useproxy returns True when conf.getboolean returns True
+        """
+        mock_getboolean_true.return_value = True
+        self.assertEqual(self.hook._get_useproxy(), True)
+
+    @mock.patch(path_getboolean)
+    def test_get_useproxy_exception(self, mock_getboolean):
+        """
+        _get_useproxy returns False when conf.getboolean raises AirflowConfigException
+        """
+        mock_getboolean.side_effect = airflow.exceptions.AirflowConfigException
+        self.assertEqual(self.hook._get_useproxy(), False)
+
+    def test_get_proxy_type(self):
+        """
+        _get_proxy_type maps a known proxy_type to an httplib2 constant
+        """
+        proxy_obj = {'proxy_type': "HTTP"}
+        self.assertIsNotNone(self.hook._get_proxy_type(proxy_obj))
+
+    def test_get_proxy_type_invalid(self):
+        """
+        _get_proxy_type returns None for an unknown proxy_type
+        """
+        proxy_obj = {'proxy_type': "Invalid_type"}
+        self.assertIsNone(self.hook._get_proxy_type(proxy_obj))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Simplify proxy server based access to external platforms like Google cloud 
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2759
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2759
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks
>            Reporter: Aishwarya Mohan
>            Assignee: Aishwarya Mohan
>            Priority: Major
>              Labels: hooks, proxy
>
> Several companies route traffic through a proxy server to add a layer of security when communicating with external platforms and to establish the legitimacy of caller and callee. One use case is writing logs from Airflow to a cloud storage platform such as Google Cloud via an intermediary proxy server.
> Currently the proxy details must be hardcoded and passed to the HTTP client library (httplib2) in the GoogleCloudBaseHook class (gcp_api_base_hook.py). It would be more convenient if the proxy details (for example, host and port) could be read from the Airflow configuration file instead of being hardcoded at the hook level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)