You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/21 23:32:13 UTC
[impala] 01/02: IMPALA-8682: Add authorized proxy user/group test
coverage with Ranger
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit cad97be479bc2c7256ad927beb7f24859dafac0e
Author: Fredy Wijaya <fw...@cloudera.com>
AuthorDate: Wed Jun 19 11:23:11 2019 -0700
IMPALA-8682: Add authorized proxy user/group test coverage with Ranger
This patch adds test coverage for authorized proxy user/group with
Ranger. It also moves the authorized proxy tests into a separate
file, test_authorized_proxy.py, and refactors the tests to be more
readable and reusable.
Testing:
- Added a new test_authorized_proxy.py
- Ran all E2E authorization tests
Change-Id: If6f797600720e6432b85cac8f13afe8fa5624596
Reviewed-on: http://gerrit.cloudera.org:8080/13679
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
tests/authorization/test_authorization.py | 145 --------------
tests/authorization/test_authorized_proxy.py | 275 +++++++++++++++++++++++++++
2 files changed, 275 insertions(+), 145 deletions(-)
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index f8ff36f..0236699 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -19,16 +19,13 @@
import os
import pytest
-import shutil
import tempfile
-import json
import grp
import re
import sys
import subprocess
import urllib
-from time import sleep, time
from getpass import getuser
from ImpalaService import ImpalaHiveServer2Service
from TCLIService import TCLIService
@@ -38,16 +35,12 @@ from thrift.protocol import TBinaryProtocol
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import assert_file_in_dir_contains,\
assert_no_files_in_dir_contain
-from tests.hs2.hs2_test_suite import operation_id_to_query_id
-from tests.util.filesystem_utils import WAREHOUSE
SENTRY_CONFIG_DIR = os.getenv('IMPALA_HOME') + '/fe/src/test/resources/'
SENTRY_CONFIG_FILE = SENTRY_CONFIG_DIR + 'sentry-site.xml'
class TestAuthorization(CustomClusterTestSuite):
- AUDIT_LOG_DIR = tempfile.mkdtemp(dir=os.getenv('LOG_DIR'))
-
def setup(self):
host, port = (self.cluster.impalads[0].service.hostname,
self.cluster.impalads[0].service.hs2_port)
@@ -60,7 +53,6 @@ class TestAuthorization(CustomClusterTestSuite):
def teardown(self):
if self.socket:
self.socket.close()
- shutil.rmtree(self.AUDIT_LOG_DIR, ignore_errors=True)
def __execute_hs2_stmt(self, statement, verify=True):
"""
@@ -200,143 +192,6 @@ class TestAuthorization(CustomClusterTestSuite):
self.__execute_hs2_stmt("drop database if exists {0}".format(unique_db))
self.__execute_hs2_stmt("drop role {0}".format(unique_role))
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args(
- impalad_args="--server_name=server1 "
- "--authorized_proxy_user_config=foo=bar;hue={0} "
- "--abort_on_failed_audit_event=false "
- "--audit_event_log_dir={1}"
- .format(getuser(), AUDIT_LOG_DIR),
- catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
- sentry_config=SENTRY_CONFIG_FILE)
- def test_user_impersonation(self, unique_role):
- """End-to-end user impersonation + authorization test"""
- self.__test_impersonation(unique_role)
-
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args(
- impalad_args="--server_name=server1 "
- "--authorized_proxy_user_config=hue=bar "
- "--authorized_proxy_group_config=foo=bar;hue={0} "
- "--abort_on_failed_audit_event=false "
- "--audit_event_log_dir={1}"
- .format(grp.getgrgid(os.getgid()).gr_name, AUDIT_LOG_DIR),
- catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
- sentry_config=SENTRY_CONFIG_FILE)
- def test_group_impersonation(self, unique_role):
- """End-to-end group impersonation + authorization test"""
- self.__test_impersonation(unique_role)
-
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args("--server_name=server1\
- --authorized_proxy_user_config=foo=bar\
- --authorized_proxy_group_config=foo=bar\
- --abort_on_failed_audit_event=false\
- --audit_event_log_dir=%s" % (AUDIT_LOG_DIR))
- def test_no_matching_user_and_group_impersonation(self):
- open_session_req = TCLIService.TOpenSessionReq()
- open_session_req.username = 'hue'
- open_session_req.configuration = dict()
- open_session_req.configuration['impala.doas.user'] = 'abc'
- resp = self.hs2_client.OpenSession(open_session_req)
- assert 'User \'hue\' is not authorized to delegate to \'abc\'' in str(resp)
-
- def __test_impersonation(self, role):
- """End-to-end impersonation + authorization test. Expects authorization to be
- configured before running this test"""
- # TODO: To reuse the HS2 utility code from the TestHS2 test suite we need to import
- # the module within this test function, rather than as a top-level import. This way
- # the tests in that module will not get pulled when executing this test suite. The fix
- # is to split the utility code out of the TestHS2 class and support HS2 as a first
- # class citizen in our test framework.
- from tests.hs2.test_hs2 import TestHS2
-
- try:
- self.session_handle = self.__open_hs2(getuser(), dict()).sessionHandle
- self.__execute_hs2_stmt("create role {0}".format(role))
- self.__execute_hs2_stmt("grant all on table tpch.lineitem to role {0}"
- .format(role))
- self.__execute_hs2_stmt("grant role {0} to group {1}"
- .format(role, grp.getgrnam(getuser()).gr_name))
- self.__execute_hs2_stmt("grant role {0} to group {1}"
- .format(role, grp.getgrgid(os.getgid()).gr_name))
-
- # Try to query a table we are not authorized to access
- self.session_handle = self.__open_hs2('hue',
- {'impala.doas.user': getuser()}).sessionHandle
- bad_resp = self.__execute_hs2_stmt("describe tpch_seq.lineitem", False)
- assert 'User \'%s\' does not have privileges to access' % getuser() in\
- str(bad_resp)
-
- assert self.__wait_for_audit_record(user=getuser(), impersonator='hue'),\
- 'No matching audit event recorded in time window'
-
- # Now try the same operation on a table we are authorized to access.
- good_resp = self.__execute_hs2_stmt("describe tpch.lineitem")
- TestHS2.check_response(good_resp)
-
- # Verify the correct user information is in the runtime profile
- query_id = operation_id_to_query_id(
- good_resp.operationHandle.operationId)
- profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
- self.__verify_profile_user_fields(profile_page, effective_user=getuser(),
- delegated_user=getuser(), connected_user='hue')
-
- # Try to user we are not authorized to delegate to.
- resp = self.__open_hs2('hue', {'impala.doas.user': 'some_user'}, False)
- assert 'User \'hue\' is not authorized to delegate to \'some_user\'' in str(resp)
-
- # Create a new session which does not have a do_as_user and run a simple query.
- self.session_handle = self.__open_hs2('hue', dict()).sessionHandle
- resp = self.__execute_hs2_stmt("select 1")
-
- # Verify the correct user information is in the runtime profile. Since there is
- # no do_as_user the Delegated User field should be empty.
- query_id = operation_id_to_query_id(resp.operationHandle.operationId)
-
- profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
- self.__verify_profile_user_fields(profile_page, effective_user='hue',
- delegated_user='', connected_user='hue')
- finally:
- self.session_handle = self.__open_hs2(getuser(), dict()).sessionHandle
- self.__execute_hs2_stmt("grant all on server to role {0}".format(role))
- self.__execute_hs2_stmt("grant role {0} to group {1}"
- .format(role, grp.getgrnam(getuser()).gr_name))
- self.__execute_hs2_stmt("drop role {0}".format(role))
-
- def __verify_profile_user_fields(self, profile_str, effective_user, connected_user,
- delegated_user):
- """Verifies the given runtime profile string contains the specified values for
- User, Connected User, and Delegated User"""
- assert '\n User: %s\n' % effective_user in profile_str
- assert '\n Connected User: %s\n' % connected_user in profile_str
- assert '\n Delegated User: %s\n' % delegated_user in profile_str
-
- def __wait_for_audit_record(self, user, impersonator, timeout_secs=30):
- """Waits until an audit log record is found that contains the given user and
- impersonator, or until the timeout is reached.
- """
- # The audit event might not show up immediately (the audit logs are flushed to disk
- # on regular intervals), so poll the audit event logs until a matching record is
- # found.
- start_time = time()
- while time() - start_time < timeout_secs:
- for audit_file_name in os.listdir(self.AUDIT_LOG_DIR):
- if self.__find_matching_audit_record(audit_file_name, user, impersonator):
- return True
- sleep(1)
- return False
-
- def __find_matching_audit_record(self, audit_file_name, user, impersonator):
- with open(os.path.join(self.AUDIT_LOG_DIR, audit_file_name)) as audit_log_file:
- for line in audit_log_file.readlines():
- json_dict = json.loads(line)
- if len(json_dict) == 0: continue
- if json_dict[min(json_dict)]['user'] == user and\
- json_dict[min(json_dict)]['impersonator'] == impersonator:
- return True
- return False
-
def __run_stmt_and_verify_profile_access(self, stmt, has_access, close_operation):
"""Runs 'stmt' and retrieves the runtime profile and exec summary. If
'has_access' is true, it verifies that no runtime profile or exec summary are
diff --git a/tests/authorization/test_authorized_proxy.py b/tests/authorization/test_authorized_proxy.py
new file mode 100644
index 0000000..5529954
--- /dev/null
+++ b/tests/authorization/test_authorized_proxy.py
@@ -0,0 +1,275 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import os
+import grp
+import time
+import json
+import tempfile
+import shutil
+
+from getpass import getuser
+from ImpalaService import ImpalaHiveServer2Service
+from TCLIService import TCLIService
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport
+from thrift.protocol import TBinaryProtocol
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.hs2.hs2_test_suite import operation_id_to_query_id
+
+AUDIT_LOG_DIR = tempfile.mkdtemp(dir=os.getenv("LOG_DIR"))
+
+SENTRY_CONFIG_FILE = "{0}/fe/src/test/resources/sentry-site.xml" \
+ .format(os.getenv("IMPALA_HOME"))
+SENTRY_IMPALAD_ARGS = "--server-name=server1 " \
+ "--abort_on_failed_audit_event=false " \
+ "--audit_event_log_dir={0}".format(AUDIT_LOG_DIR)
+SENTRY_CATALOGD_ARGS = "--sentry_config={0}".format(SENTRY_CONFIG_FILE)
+
+RANGER_IMPALAD_ARGS = "--server-name=server1 " \
+ "--ranger_service_type=hive " \
+ "--ranger_app_id=impala " \
+ "--authorization_provider=ranger " \
+ "--abort_on_failed_audit_event=false " \
+ "--audit_event_log_dir={0}".format(AUDIT_LOG_DIR)
+RANGER_CATALOGD_ARGS = "--server-name=server1 " \
+ "--ranger_service_type=hive " \
+ "--ranger_app_id=impala " \
+ "--authorization_provider=ranger"
+RANGER_ADMIN_USER = "admin"
+
+
+class TestAuthorizedProxy(CustomClusterTestSuite):
+ def setup(self):
+ host, port = (self.cluster.impalads[0].service.hostname,
+ self.cluster.impalads[0].service.hs2_port)
+ self.socket = TSocket(host, port)
+ self.transport = TBufferedTransport(self.socket)
+ self.transport.open()
+ self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+ self.hs2_client = ImpalaHiveServer2Service.Client(self.protocol)
+
+ def teardown(self):
+ if self.socket:
+ self.socket.close()
+ shutil.rmtree(AUDIT_LOG_DIR, ignore_errors=True)
+
+ def _execute_hs2_stmt(self, statement, verify=True):
+ """
+ Executes an hs2 statement
+
+ :param statement: the statement to execute
+ :param verify: If set to true, will throw an exception on a failed hs2 execution
+ :return: the result of execution
+ """
+ from tests.hs2.test_hs2 import TestHS2
+ execute_statement_req = TCLIService.TExecuteStatementReq()
+ execute_statement_req.sessionHandle = self.session_handle
+ execute_statement_req.statement = statement
+ result = self.hs2_client.ExecuteStatement(execute_statement_req)
+ if verify:
+ TestHS2.check_response(result)
+ return result
+
+ def _open_hs2(self, user, configuration, verify=True):
+ """
+ Open a session with hs2
+
+ :param user: the user to open the session
+ :param configuration: the configuration for the session
+ :param verify: If set to true, will throw an exception on failed session open
+ :return: the result of opening the session
+ """
+ from tests.hs2.test_hs2 import TestHS2
+ open_session_req = TCLIService.TOpenSessionReq()
+ open_session_req.username = user
+ open_session_req.configuration = configuration
+ resp = self.hs2_client.OpenSession(open_session_req)
+ if verify:
+ TestHS2.check_response(resp)
+ return resp
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=foo=bar;hue={1} "
+ .format(SENTRY_IMPALAD_ARGS, getuser()),
+ catalogd_args=SENTRY_CATALOGD_ARGS)
+ def test_authorized_proxy_user_with_sentry(self, unique_role):
+ """Tests authorized proxy user with Sentry using HS2."""
+ self._test_authorized_proxy_with_sentry(unique_role, self._test_authorized_proxy)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=foo=bar;hue={1} "
+ .format(RANGER_IMPALAD_ARGS, getuser()),
+ catalogd_args=RANGER_CATALOGD_ARGS)
+ def test_authorized_proxy_user_with_ranger(self):
+ """Tests authorized proxy user with Ranger using HS2."""
+ self._test_authorized_proxy_with_ranger(self._test_authorized_proxy)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=hue=bar "
+ "--authorized_proxy_group_config=foo=bar;hue={1}"
+ .format(SENTRY_IMPALAD_ARGS, grp.getgrgid(os.getgid()).gr_name),
+ catalogd_args=SENTRY_CATALOGD_ARGS)
+ def test_authorized_proxy_group_with_sentry(self, unique_role):
+ """Tests authorized proxy group with Sentry using HS2."""
+ self._test_authorized_proxy_with_sentry(unique_role, self._test_authorized_proxy)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=hue=bar "
+ "--authorized_proxy_group_config=foo=bar;hue={1}"
+ .format(RANGER_IMPALAD_ARGS, grp.getgrgid(os.getgid()).gr_name),
+ catalogd_args=RANGER_CATALOGD_ARGS)
+ def test_authorized_proxy_group_with_ranger(self):
+ """Tests authorized proxy group with Ranger using HS2."""
+ self._test_authorized_proxy_with_ranger(self._test_authorized_proxy)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=foo=bar "
+ "--authorized_proxy_group_config=foo=bar".format(SENTRY_IMPALAD_ARGS),
+ catalogd_args=SENTRY_CATALOGD_ARGS)
+ def test_no_matching_user_and_group_authorized_proxy_with_sentry(self):
+ self._test_no_matching_user_and_group_authorized_proxy()
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="{0} --authorized_proxy_user_config=foo=bar "
+ "--authorized_proxy_group_config=foo=bar".format(RANGER_IMPALAD_ARGS),
+ catalogd_args=RANGER_CATALOGD_ARGS)
+ def test_no_matching_user_and_group_authorized_proxy_with_ranger(self):
+ self._test_no_matching_user_and_group_authorized_proxy()
+
+ def _test_no_matching_user_and_group_authorized_proxy(self):
+ open_session_req = TCLIService.TOpenSessionReq()
+ open_session_req.username = "hue"
+ open_session_req.configuration = dict()
+ open_session_req.configuration["impala.doas.user"] = "abc"
+ resp = self.hs2_client.OpenSession(open_session_req)
+ assert "User 'hue' is not authorized to delegate to 'abc'" in str(resp)
+
+ def _test_authorized_proxy_with_sentry(self, role, test_func):
+ try:
+ self.session_handle = self._open_hs2(getuser(), dict()).sessionHandle
+ self._execute_hs2_stmt("create role {0}".format(role))
+ self._execute_hs2_stmt("grant all on table tpch.lineitem to role {0}"
+ .format(role))
+ self._execute_hs2_stmt("grant role {0} to group {1}"
+ .format(role, grp.getgrnam(getuser()).gr_name))
+ self._execute_hs2_stmt("grant role {0} to group {1}"
+ .format(role, grp.getgrgid(os.getgid()).gr_name))
+ test_func()
+ finally:
+ self.session_handle = self._open_hs2(getuser(), dict()).sessionHandle
+ self._execute_hs2_stmt("grant all on server to role {0}".format(role))
+ self._execute_hs2_stmt("grant role {0} to group {1}"
+ .format(role, grp.getgrnam(getuser()).gr_name))
+ self._execute_hs2_stmt("drop role {0}".format(role))
+
+ def _test_authorized_proxy_with_ranger(self, test_func):
+ try:
+ self.session_handle = self._open_hs2(RANGER_ADMIN_USER, dict()).sessionHandle
+ self._execute_hs2_stmt("grant all on table tpch.lineitem to user {0}"
+ .format(getuser()))
+ test_func()
+ finally:
+ self.session_handle = self._open_hs2(RANGER_ADMIN_USER, dict()).sessionHandle
+ self._execute_hs2_stmt("revoke all on table tpch.lineitem from user {0}"
+ .format(getuser()))
+
+ def _test_authorized_proxy(self):
+ """End-to-end impersonation + authorization test. Expects authorization to be
+ configured before running this test"""
+ # TODO: To reuse the HS2 utility code from the TestHS2 test suite we need to import
+ # the module within this test function, rather than as a top-level import. This way
+ # the tests in that module will not get pulled when executing this test suite. The fix
+ # is to split the utility code out of the TestHS2 class and support HS2 as a first
+ # class citizen in our test framework.
+ from tests.hs2.test_hs2 import TestHS2
+
+ # Try to query a table we are not authorized to access.
+ self.session_handle = self._open_hs2("hue",
+ {"impala.doas.user": getuser()}).sessionHandle
+ bad_resp = self._execute_hs2_stmt("describe tpch_seq.lineitem", False)
+ assert "User '%s' does not have privileges to access" % getuser() in \
+ str(bad_resp)
+
+ assert self._wait_for_audit_record(user=getuser(), impersonator="hue"), \
+ "No matching audit event recorded in time window"
+
+ # Now try the same operation on a table we are authorized to access.
+ good_resp = self._execute_hs2_stmt("describe tpch.lineitem")
+ TestHS2.check_response(good_resp)
+
+ # Verify the correct user information is in the runtime profile.
+ query_id = operation_id_to_query_id(good_resp.operationHandle.operationId)
+ profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
+ self._verify_profile_user_fields(profile_page, effective_user=getuser(),
+ delegated_user=getuser(), connected_user="hue")
+
+ # Try to delegate to a user we are not authorized to delegate to.
+ resp = self._open_hs2("hue", {"impala.doas.user": "some_user"}, False)
+ assert "User 'hue' is not authorized to delegate to 'some_user'" in str(resp)
+
+ # Create a new session which does not have a do_as_user and run a simple query.
+ self.session_handle = self._open_hs2("hue", dict()).sessionHandle
+ resp = self._execute_hs2_stmt("select 1")
+
+ # Verify the correct user information is in the runtime profile. Since there is
+ # no do_as_user the Delegated User field should be empty.
+ query_id = operation_id_to_query_id(resp.operationHandle.operationId)
+
+ profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
+ self._verify_profile_user_fields(profile_page, effective_user="hue",
+ delegated_user="", connected_user="hue")
+
+ def _verify_profile_user_fields(self, profile_str, effective_user, connected_user,
+ delegated_user):
+ """Verifies the given runtime profile string contains the specified values for
+ User, Connected User, and Delegated User"""
+ assert "\n User: {0}\n".format(effective_user) in profile_str
+ assert "\n Connected User: {0}\n".format(connected_user) in profile_str
+ assert "\n Delegated User: {0}\n".format(delegated_user) in profile_str
+
+ def _wait_for_audit_record(self, user, impersonator, timeout_secs=30):
+ """Waits until an audit log record is found that contains the given user and
+ impersonator, or until the timeout is reached.
+ """
+ # The audit event might not show up immediately (the audit logs are flushed to disk
+ # on regular intervals), so poll the audit event logs until a matching record is
+ # found.
+ start_time = time.time()
+ while time.time() - start_time < timeout_secs:
+ for audit_file_name in os.listdir(AUDIT_LOG_DIR):
+ if self._find_matching_audit_record(audit_file_name, user, impersonator):
+ return True
+ time.sleep(1)
+ return False
+
+ def _find_matching_audit_record(self, audit_file_name, user, impersonator):
+ with open(os.path.join(AUDIT_LOG_DIR, audit_file_name)) as audit_log_file:
+ for line in audit_log_file.readlines():
+ json_dict = json.loads(line)
+ if len(json_dict) == 0: continue
+ if json_dict[min(json_dict)]["user"] == user and \
+ json_dict[min(json_dict)]["impersonator"] == impersonator:
+ return True
+ return False