You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/21 23:32:13 UTC

[impala] 01/02: IMPALA-8682: Add authorized proxy user/group test coverage with Ranger

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cad97be479bc2c7256ad927beb7f24859dafac0e
Author: Fredy Wijaya <fw...@cloudera.com>
AuthorDate: Wed Jun 19 11:23:11 2019 -0700

    IMPALA-8682: Add authorized proxy user/group test coverage with Ranger
    
    This patch adds test coverage for authorized proxy user/group with
    Ranger. It also moves the authorized proxy tests into a separate file,
    test_authorized_proxy.py, and refactors the tests to be more readable
    and reusable.
    
    Testing:
    - Added a new test_authorized_proxy.py
    - Ran all E2E authorization tests
    
    Change-Id: If6f797600720e6432b85cac8f13afe8fa5624596
    Reviewed-on: http://gerrit.cloudera.org:8080/13679
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/authorization/test_authorization.py    | 145 --------------
 tests/authorization/test_authorized_proxy.py | 275 +++++++++++++++++++++++++++
 2 files changed, 275 insertions(+), 145 deletions(-)

diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index f8ff36f..0236699 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -19,16 +19,13 @@
 
 import os
 import pytest
-import shutil
 import tempfile
-import json
 import grp
 import re
 import sys
 import subprocess
 import urllib
 
-from time import sleep, time
 from getpass import getuser
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
@@ -38,16 +35,12 @@ from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.file_utils import assert_file_in_dir_contains,\
     assert_no_files_in_dir_contain
-from tests.hs2.hs2_test_suite import operation_id_to_query_id
-from tests.util.filesystem_utils import WAREHOUSE
 
 SENTRY_CONFIG_DIR = os.getenv('IMPALA_HOME') + '/fe/src/test/resources/'
 SENTRY_CONFIG_FILE = SENTRY_CONFIG_DIR + 'sentry-site.xml'
 
 
 class TestAuthorization(CustomClusterTestSuite):
-  AUDIT_LOG_DIR = tempfile.mkdtemp(dir=os.getenv('LOG_DIR'))
-
   def setup(self):
     host, port = (self.cluster.impalads[0].service.hostname,
                   self.cluster.impalads[0].service.hs2_port)
@@ -60,7 +53,6 @@ class TestAuthorization(CustomClusterTestSuite):
   def teardown(self):
     if self.socket:
       self.socket.close()
-    shutil.rmtree(self.AUDIT_LOG_DIR, ignore_errors=True)
 
   def __execute_hs2_stmt(self, statement, verify=True):
     """
@@ -200,143 +192,6 @@ class TestAuthorization(CustomClusterTestSuite):
       self.__execute_hs2_stmt("drop database if exists {0}".format(unique_db))
       self.__execute_hs2_stmt("drop role {0}".format(unique_role))
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args="--server_name=server1 "
-                   "--authorized_proxy_user_config=foo=bar;hue={0} "
-                   "--abort_on_failed_audit_event=false "
-                   "--audit_event_log_dir={1}"
-                   .format(getuser(), AUDIT_LOG_DIR),
-      catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
-      sentry_config=SENTRY_CONFIG_FILE)
-  def test_user_impersonation(self, unique_role):
-    """End-to-end user impersonation + authorization test"""
-    self.__test_impersonation(unique_role)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args="--server_name=server1 "
-                   "--authorized_proxy_user_config=hue=bar "
-                   "--authorized_proxy_group_config=foo=bar;hue={0} "
-                   "--abort_on_failed_audit_event=false "
-                   "--audit_event_log_dir={1}"
-                   .format(grp.getgrgid(os.getgid()).gr_name, AUDIT_LOG_DIR),
-      catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
-      sentry_config=SENTRY_CONFIG_FILE)
-  def test_group_impersonation(self, unique_role):
-    """End-to-end group impersonation + authorization test"""
-    self.__test_impersonation(unique_role)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--server_name=server1\
-        --authorized_proxy_user_config=foo=bar\
-        --authorized_proxy_group_config=foo=bar\
-        --abort_on_failed_audit_event=false\
-        --audit_event_log_dir=%s" % (AUDIT_LOG_DIR))
-  def test_no_matching_user_and_group_impersonation(self):
-    open_session_req = TCLIService.TOpenSessionReq()
-    open_session_req.username = 'hue'
-    open_session_req.configuration = dict()
-    open_session_req.configuration['impala.doas.user'] = 'abc'
-    resp = self.hs2_client.OpenSession(open_session_req)
-    assert 'User \'hue\' is not authorized to delegate to \'abc\'' in str(resp)
-
-  def __test_impersonation(self, role):
-    """End-to-end impersonation + authorization test. Expects authorization to be
-    configured before running this test"""
-    # TODO: To reuse the HS2 utility code from the TestHS2 test suite we need to import
-    # the module within this test function, rather than as a top-level import. This way
-    # the tests in that module will not get pulled when executing this test suite. The fix
-    # is to split the utility code out of the TestHS2 class and support HS2 as a first
-    # class citizen in our test framework.
-    from tests.hs2.test_hs2 import TestHS2
-
-    try:
-      self.session_handle = self.__open_hs2(getuser(), dict()).sessionHandle
-      self.__execute_hs2_stmt("create role {0}".format(role))
-      self.__execute_hs2_stmt("grant all on table tpch.lineitem to role {0}"
-                              .format(role))
-      self.__execute_hs2_stmt("grant role {0} to group {1}"
-                              .format(role, grp.getgrnam(getuser()).gr_name))
-      self.__execute_hs2_stmt("grant role {0} to group {1}"
-                              .format(role, grp.getgrgid(os.getgid()).gr_name))
-
-      # Try to query a table we are not authorized to access
-      self.session_handle = self.__open_hs2('hue',
-                                            {'impala.doas.user': getuser()}).sessionHandle
-      bad_resp = self.__execute_hs2_stmt("describe tpch_seq.lineitem", False)
-      assert 'User \'%s\' does not have privileges to access' % getuser() in\
-          str(bad_resp)
-
-      assert self.__wait_for_audit_record(user=getuser(), impersonator='hue'),\
-          'No matching audit event recorded in time window'
-
-      # Now try the same operation on a table we are authorized to access.
-      good_resp = self.__execute_hs2_stmt("describe tpch.lineitem")
-      TestHS2.check_response(good_resp)
-
-      # Verify the correct user information is in the runtime profile
-      query_id = operation_id_to_query_id(
-          good_resp.operationHandle.operationId)
-      profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
-      self.__verify_profile_user_fields(profile_page, effective_user=getuser(),
-          delegated_user=getuser(), connected_user='hue')
-
-      # Try to user we are not authorized to delegate to.
-      resp = self.__open_hs2('hue', {'impala.doas.user': 'some_user'}, False)
-      assert 'User \'hue\' is not authorized to delegate to \'some_user\'' in str(resp)
-
-      # Create a new session which does not have a do_as_user and run a simple query.
-      self.session_handle = self.__open_hs2('hue', dict()).sessionHandle
-      resp = self.__execute_hs2_stmt("select 1")
-
-      # Verify the correct user information is in the runtime profile. Since there is
-      # no do_as_user the Delegated User field should be empty.
-      query_id = operation_id_to_query_id(resp.operationHandle.operationId)
-
-      profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
-      self.__verify_profile_user_fields(profile_page, effective_user='hue',
-          delegated_user='', connected_user='hue')
-    finally:
-      self.session_handle = self.__open_hs2(getuser(), dict()).sessionHandle
-      self.__execute_hs2_stmt("grant all on server to role {0}".format(role))
-      self.__execute_hs2_stmt("grant role {0} to group {1}"
-                              .format(role, grp.getgrnam(getuser()).gr_name))
-      self.__execute_hs2_stmt("drop role {0}".format(role))
-
-  def __verify_profile_user_fields(self, profile_str, effective_user, connected_user,
-      delegated_user):
-    """Verifies the given runtime profile string contains the specified values for
-    User, Connected User, and Delegated User"""
-    assert '\n    User: %s\n' % effective_user in profile_str
-    assert '\n    Connected User: %s\n' % connected_user in profile_str
-    assert '\n    Delegated User: %s\n' % delegated_user in profile_str
-
-  def __wait_for_audit_record(self, user, impersonator, timeout_secs=30):
-    """Waits until an audit log record is found that contains the given user and
-    impersonator, or until the timeout is reached.
-    """
-    # The audit event might not show up immediately (the audit logs are flushed to disk
-    # on regular intervals), so poll the audit event logs until a matching record is
-    # found.
-    start_time = time()
-    while time() - start_time < timeout_secs:
-      for audit_file_name in os.listdir(self.AUDIT_LOG_DIR):
-        if self.__find_matching_audit_record(audit_file_name, user, impersonator):
-          return True
-      sleep(1)
-    return False
-
-  def __find_matching_audit_record(self, audit_file_name, user, impersonator):
-    with open(os.path.join(self.AUDIT_LOG_DIR, audit_file_name)) as audit_log_file:
-      for line in audit_log_file.readlines():
-          json_dict = json.loads(line)
-          if len(json_dict) == 0: continue
-          if json_dict[min(json_dict)]['user'] == user and\
-              json_dict[min(json_dict)]['impersonator'] == impersonator:
-            return True
-    return False
-
   def __run_stmt_and_verify_profile_access(self, stmt, has_access, close_operation):
     """Runs 'stmt' and retrieves the runtime profile and exec summary. If
       'has_access' is true, it verifies that no runtime profile or exec summary are
diff --git a/tests/authorization/test_authorized_proxy.py b/tests/authorization/test_authorized_proxy.py
new file mode 100644
index 0000000..5529954
--- /dev/null
+++ b/tests/authorization/test_authorized_proxy.py
@@ -0,0 +1,275 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import os
+import grp
+import time
+import json
+import tempfile
+import shutil
+
+from getpass import getuser
+from ImpalaService import ImpalaHiveServer2Service
+from TCLIService import TCLIService
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport
+from thrift.protocol import TBinaryProtocol
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.hs2.hs2_test_suite import operation_id_to_query_id
+
+AUDIT_LOG_DIR = tempfile.mkdtemp(dir=os.getenv("LOG_DIR"))  # Shared by every test.
+# Daemon flags for the tests that use Sentry as the authorization provider.
+SENTRY_CONFIG_FILE = "{0}/fe/src/test/resources/sentry-site.xml" \
+                     .format(os.getenv("IMPALA_HOME"))
+SENTRY_IMPALAD_ARGS = "--server_name=server1 " \
+                      "--abort_on_failed_audit_event=false " \
+                      "--audit_event_log_dir={0}".format(AUDIT_LOG_DIR)
+SENTRY_CATALOGD_ARGS = "--sentry_config={0}".format(SENTRY_CONFIG_FILE)
+# Daemon flags for the tests that use Ranger as the authorization provider.
+RANGER_IMPALAD_ARGS = "--server_name=server1 " \
+                      "--ranger_service_type=hive " \
+                      "--ranger_app_id=impala " \
+                      "--authorization_provider=ranger " \
+                      "--abort_on_failed_audit_event=false " \
+                      "--audit_event_log_dir={0}".format(AUDIT_LOG_DIR)
+RANGER_CATALOGD_ARGS = "--server_name=server1 " \
+                       "--ranger_service_type=hive " \
+                       "--ranger_app_id=impala " \
+                       "--authorization_provider=ranger"
+RANGER_ADMIN_USER = "admin"
+
+
+class TestAuthorizedProxy(CustomClusterTestSuite):
+  """Tests --authorized_proxy_{user,group}_config over HS2 with Sentry and Ranger."""
+  def setup(self):
+    host, port = (self.cluster.impalads[0].service.hostname,
+                  self.cluster.impalads[0].service.hs2_port)
+    self.socket = TSocket(host, port)
+    self.transport = TBufferedTransport(self.socket)
+    self.transport.open()
+    self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+    self.hs2_client = ImpalaHiveServer2Service.Client(self.protocol)
+
+  def teardown(self):
+    if self.socket:
+      self.socket.close()
+    shutil.rmtree(AUDIT_LOG_DIR, ignore_errors=True)
+
+  def _execute_hs2_stmt(self, statement, verify=True):
+    """Executes 'statement' in the current HS2 session (self.session_handle).
+
+    :param statement: the statement to execute
+    :param verify: if True, throw an exception on a failed hs2 execution
+    :return: the result of execution
+    """
+    # TODO: TestHS2 is imported here rather than at the top level so that the tests in
+    # that module are not pulled in when running this suite. The fix is to split the
+    # HS2 utility code out of TestHS2 and make HS2 a first-class citizen in our tests.
+    from tests.hs2.test_hs2 import TestHS2
+    execute_statement_req = TCLIService.TExecuteStatementReq()
+    execute_statement_req.sessionHandle = self.session_handle
+    execute_statement_req.statement = statement
+    result = self.hs2_client.ExecuteStatement(execute_statement_req)
+    if verify:
+      TestHS2.check_response(result)
+    return result
+
+  def _open_hs2(self, user, configuration, verify=True):
+    """Opens an HS2 session as 'user' with the given session 'configuration'.
+
+    :param user: the user to open the session as
+    :param configuration: dict of session options, e.g. {"impala.doas.user": ...}
+    :param verify: if True, throw an exception on a failed session open
+    :return: the result of opening the session
+    """
+    from tests.hs2.test_hs2 import TestHS2  # Local import; see _execute_hs2_stmt().
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_req.username = user
+    open_session_req.configuration = configuration
+    resp = self.hs2_client.OpenSession(open_session_req)
+    if verify:
+      TestHS2.check_response(resp)
+    return resp
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=foo=bar;hue={1} "
+                 .format(SENTRY_IMPALAD_ARGS, getuser()),
+    catalogd_args=SENTRY_CATALOGD_ARGS)
+  def test_authorized_proxy_user_with_sentry(self, unique_role):
+    """Tests authorized proxy user with Sentry using HS2."""
+    self._test_authorized_proxy_with_sentry(unique_role, self._test_authorized_proxy)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=foo=bar;hue={1} "
+                 .format(RANGER_IMPALAD_ARGS, getuser()),
+    catalogd_args=RANGER_CATALOGD_ARGS)
+  def test_authorized_proxy_user_with_ranger(self):
+    """Tests authorized proxy user with Ranger using HS2."""
+    self._test_authorized_proxy_with_ranger(self._test_authorized_proxy)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=hue=bar "
+                 "--authorized_proxy_group_config=foo=bar;hue={1}"
+                 .format(SENTRY_IMPALAD_ARGS, grp.getgrgid(os.getgid()).gr_name),
+    catalogd_args=SENTRY_CATALOGD_ARGS)
+  def test_authorized_proxy_group_with_sentry(self, unique_role):
+    """Tests authorized proxy group with Sentry using HS2."""
+    self._test_authorized_proxy_with_sentry(unique_role, self._test_authorized_proxy)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=hue=bar "
+                 "--authorized_proxy_group_config=foo=bar;hue={1}"
+                 .format(RANGER_IMPALAD_ARGS, grp.getgrgid(os.getgid()).gr_name),
+    catalogd_args=RANGER_CATALOGD_ARGS)
+  def test_authorized_proxy_group_with_ranger(self):
+    """Tests authorized proxy group with Ranger using HS2."""
+    self._test_authorized_proxy_with_ranger(self._test_authorized_proxy)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=foo=bar "
+                 "--authorized_proxy_group_config=foo=bar".format(SENTRY_IMPALAD_ARGS),
+    catalogd_args=SENTRY_CATALOGD_ARGS)
+  def test_no_matching_user_and_group_authorized_proxy_with_sentry(self):
+    """Tests that 'hue' cannot delegate when no proxy config matches, with Sentry."""
+    self._test_no_matching_user_and_group_authorized_proxy()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} --authorized_proxy_user_config=foo=bar "
+                 "--authorized_proxy_group_config=foo=bar".format(RANGER_IMPALAD_ARGS),
+    catalogd_args=RANGER_CATALOGD_ARGS)
+  def test_no_matching_user_and_group_authorized_proxy_with_ranger(self):
+    """Tests that 'hue' cannot delegate when no proxy config matches, with Ranger."""
+    self._test_no_matching_user_and_group_authorized_proxy()
+
+  def _test_no_matching_user_and_group_authorized_proxy(self):
+    """Verifies 'hue' is refused when it tries to delegate to 'abc'."""
+    resp = self._open_hs2("hue", {"impala.doas.user": "abc"}, verify=False)
+    assert "User 'hue' is not authorized to delegate to 'abc'" in str(resp)
+
+  def _test_authorized_proxy_with_sentry(self, role, test_func):
+    """Runs test_func() with 'role' (all on tpch.lineitem) granted to our groups."""
+    try:
+      self.session_handle = self._open_hs2(getuser(), dict()).sessionHandle
+      self._execute_hs2_stmt("create role {0}".format(role))
+      self._execute_hs2_stmt("grant all on table tpch.lineitem to role {0}"
+                             .format(role))
+      self._execute_hs2_stmt("grant role {0} to group {1}"
+                             .format(role, grp.getgrnam(getuser()).gr_name))
+      self._execute_hs2_stmt("grant role {0} to group {1}"
+                             .format(role, grp.getgrgid(os.getgid()).gr_name))
+      test_func()
+    finally:
+      self.session_handle = self._open_hs2(getuser(), dict()).sessionHandle
+      self._execute_hs2_stmt("grant all on server to role {0}".format(role))
+      self._execute_hs2_stmt("grant role {0} to group {1}"
+                             .format(role, grp.getgrnam(getuser()).gr_name))
+      self._execute_hs2_stmt("drop role {0}".format(role))
+
+  def _test_authorized_proxy_with_ranger(self, test_func):
+    """Runs test_func() with the current user granted all on tpch.lineitem."""
+    try:
+      self.session_handle = self._open_hs2(RANGER_ADMIN_USER, dict()).sessionHandle
+      self._execute_hs2_stmt("grant all on table tpch.lineitem to user {0}"
+                             .format(getuser()))
+      test_func()
+    finally:
+      self.session_handle = self._open_hs2(RANGER_ADMIN_USER, dict()).sessionHandle
+      self._execute_hs2_stmt("revoke all on table tpch.lineitem from user {0}"
+                             .format(getuser()))
+
+  def _test_authorized_proxy(self):
+    """End-to-end impersonation + authorization test. Expects authorization to be
+       configured before running this test (see _test_authorized_proxy_with_*)."""
+    # Try to query a table we are not authorized to access.
+    self.session_handle = self._open_hs2("hue",
+                                         {"impala.doas.user": getuser()}).sessionHandle
+    bad_resp = self._execute_hs2_stmt("describe tpch_seq.lineitem", False)
+    assert "User '%s' does not have privileges to access" % getuser() in \
+           str(bad_resp)
+
+    assert self._wait_for_audit_record(user=getuser(), impersonator="hue"), \
+           "No matching audit event recorded in time window"
+
+    # Now try the same operation on a table we are authorized to access.
+    good_resp = self._execute_hs2_stmt("describe tpch.lineitem")
+
+    # Verify the correct user information is in the runtime profile.
+    query_id = operation_id_to_query_id(good_resp.operationHandle.operationId)
+    profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
+    self._verify_profile_user_fields(profile_page, effective_user=getuser(),
+                                     delegated_user=getuser(), connected_user="hue")
+
+    # Try to delegate to a user we are not authorized to delegate to.
+    resp = self._open_hs2("hue", {"impala.doas.user": "some_user"}, False)
+    assert "User 'hue' is not authorized to delegate to 'some_user'" in str(resp)
+
+    # Create a new session which does not have a do_as_user and run a simple query.
+    self.session_handle = self._open_hs2("hue", dict()).sessionHandle
+    resp = self._execute_hs2_stmt("select 1")
+
+    # Verify the correct user information is in the runtime profile. Since there is
+    # no do_as_user the Delegated User field should be empty.
+    query_id = operation_id_to_query_id(resp.operationHandle.operationId)
+    profile_page = self.cluster.impalads[0].service.read_query_profile_page(query_id)
+    self._verify_profile_user_fields(profile_page, effective_user="hue",
+                                     delegated_user="", connected_user="hue")
+
+  def _verify_profile_user_fields(self, profile_str, effective_user, connected_user,
+                                  delegated_user):
+    """Verifies the given runtime profile string contains the specified values for
+       User, Connected User, and Delegated User"""
+    assert "\n    User: {0}\n".format(effective_user) in profile_str
+    assert "\n    Connected User: {0}\n".format(connected_user) in profile_str
+    assert "\n    Delegated User: {0}\n".format(delegated_user) in profile_str
+
+  def _wait_for_audit_record(self, user, impersonator, timeout_secs=30):
+    """Polls AUDIT_LOG_DIR until a record for 'user' impersonated by 'impersonator' is
+       found (returns True) or 'timeout_secs' elapse (returns False)."""
+    # The audit event might not show up immediately (the audit logs are flushed to disk
+    # on regular intervals), so poll the audit event logs until a matching record is
+    # found.
+    start_time = time.time()
+    while time.time() - start_time < timeout_secs:
+      for audit_file_name in os.listdir(AUDIT_LOG_DIR):
+        if self._find_matching_audit_record(audit_file_name, user, impersonator):
+          return True
+      time.sleep(1)
+    return False
+
+  def _find_matching_audit_record(self, audit_file_name, user, impersonator):
+    """Returns True if the audit log file 'audit_file_name' in AUDIT_LOG_DIR has a
+       record for 'user' impersonated by 'impersonator'."""
+    with open(os.path.join(AUDIT_LOG_DIR, audit_file_name)) as audit_log_file:
+      for line in audit_log_file:
+        try:
+          json_dict = json.loads(line)
+        except ValueError:
+          # A partially flushed last line may not parse yet; skip it and poll again.
+          continue
+        if not json_dict: continue
+        # Use the entry with the smallest key (presumably one entry per record).
+        event = json_dict[min(json_dict)]
+        if event["user"] == user and event["impersonator"] == impersonator:
+          return True
+    return False