You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/12/20 18:24:38 UTC

[impala] branch master updated (ed5e7da -> 1b4ca58)

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from ed5e7da  IMPALA-9240: add HTTP code handling to THttpClient.
     new 8a4fece  IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails
     new 1b4ca58  IMPALA-9149: part 1: Re-enable Ranger-related FE tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/coordinator.cc                      |  73 +++++
 be/src/runtime/coordinator.h                       |  17 ++
 be/src/runtime/fragment-instance-state.cc          |   4 +
 be/src/runtime/krpc-data-stream-sender.cc          |   4 +
 be/src/runtime/runtime-state.cc                    |  19 ++
 be/src/runtime/runtime-state.h                     |  27 ++
 be/src/util/network-util.cc                        |  13 +
 be/src/util/network-util.h                         |   7 +
 common/protobuf/common.proto                       |   6 +
 common/protobuf/control_service.proto              |  23 ++
 .../ranger/RangerAuthorizationChecker.java         |   9 +-
 .../authorization/AuthorizationStmtTest.java       | 322 +++++++++------------
 .../authorization/AuthorizationTestBase.java       |  69 ++++-
 .../authorization/ranger/RangerAuditLogTest.java   |   8 -
 .../org/apache/impala/common/FrontendFixture.java  |  12 +
 .../org/apache/impala/common/FrontendTestBase.java |   7 +
 testdata/bin/create-load-data.sh                   |  43 ++-
 ...p.json.template => impala_group_non_owner.json} |   2 +-
 ...n.template => impala_group_owner.json.template} |   0
 ...emplate => impala_user_non_owner.json.template} |   4 +-
 ...on.template => impala_user_owner.json.template} |   2 +-
 .../cluster/ranger/setup/policy_4_revised.json     | 117 ++++++++
 tests/custom_cluster/test_blacklist.py             |  37 +++
 23 files changed, 610 insertions(+), 215 deletions(-)
 copy testdata/cluster/ranger/setup/{impala_group.json.template => impala_group_non_owner.json} (51%)
 rename testdata/cluster/ranger/setup/{impala_group.json.template => impala_group_owner.json.template} (100%)
 copy testdata/cluster/ranger/setup/{impala_user.json.template => impala_user_non_owner.json.template} (50%)
 rename testdata/cluster/ranger/setup/{impala_user.json.template => impala_user_owner.json.template} (68%)
 create mode 100644 testdata/cluster/ranger/setup/policy_4_revised.json


[impala] 01/02: IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8a4fececcf8e9599978cc1a532386b8e924838ed
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Wed Nov 27 11:46:04 2019 -0800

    IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails
    
    Introduces a new optional field to FragmentInstanceExecStatusPB:
    AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated
    with a failed fragment instance. Currently, AuxErrorInfoPB only contains
    one field: RPCErrorInfoPB, which is only set if the fragment failed
    because an RPC to another impalad failed. The RPCErrorInfoPB contains
    the destination node of the failed RPC and the posix error code of the
    failed RPC.
    
    Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...)
    uses the information in RPCErrorInfoPB (if one is set) to blacklist
    the target node. While RPCErrorInfoPB::dest_node can be set to the address
    of the Coordinator, the Coordinator will not blacklist itself. The
    Coordinator only blacklists the node if the RPC failed with a specific
    error code (currently one of ENOTCONN, ECONNREFUSED, or ESHUTDOWN).
    
    Testing:
    * Ran core tests
    * Added new test to test_blacklist.py
    
    Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c
    Reviewed-on: http://gerrit.cloudera.org:8080/14677
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc             | 73 +++++++++++++++++++++++++++++++
 be/src/runtime/coordinator.h              | 17 +++++++
 be/src/runtime/fragment-instance-state.cc |  4 ++
 be/src/runtime/krpc-data-stream-sender.cc |  4 ++
 be/src/runtime/runtime-state.cc           | 19 ++++++++
 be/src/runtime/runtime-state.h            | 27 ++++++++++++
 be/src/util/network-util.cc               | 13 ++++++
 be/src/util/network-util.h                |  7 +++
 common/protobuf/common.proto              |  6 +++
 common/protobuf/control_service.proto     | 23 ++++++++++
 tests/custom_cluster/test_blacklist.py    | 37 ++++++++++++++++
 11 files changed, 230 insertions(+)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1051695..7bebdd0 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -17,6 +17,7 @@
 
 #include "runtime/coordinator.h"
 
+#include <cerrno>
 #include <unordered_set>
 
 #include <thrift/protocol/TDebugProtocol.h>
@@ -236,6 +237,15 @@ void Coordinator::InitBackendStates() {
         schedule_, query_ctx(), backend_idx, filter_mode_, entry.second));
     backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
+    // was_inserted is true if the pair was successfully inserted into the map, false
+    // otherwise.
+    bool was_inserted = addr_to_backend_state_
+                            .emplace(backend_state->krpc_impalad_address(), backend_state)
+                            .second;
+    if (UNLIKELY(!was_inserted)) {
+      DCHECK(false) << "Network address " << backend_state->krpc_impalad_address()
+                    << " associated with multiple BackendStates";
+    }
   }
   backend_resource_state_ =
       obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
@@ -827,6 +837,11 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
       // We may start receiving status reports before all exec rpcs are complete.
       // Can't apply state transition until no more exec rpcs will be sent.
       exec_rpcs_complete_barrier_.Wait();
+
+      // Iterate through all instance exec statuses, and use each fragment's AuxErrorInfo
+      // to possibly blacklist any "faulty" nodes.
+      UpdateBlacklistWithAuxErrorInfo(request);
+
       // Transition the status if we're not already in a terminal state. This won't block
       // because either this transitions to an ERROR state or the query is already in
       // a terminal state.
@@ -855,6 +870,64 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
   return IsExecuting() ? Status::OK() : Status::CANCELLED;
 }
 
+void Coordinator::UpdateBlacklistWithAuxErrorInfo(
+    const ReportExecStatusRequestPB& request) {
+  // If the Backend failed due to a RPC failure, blacklist the destination node of
+  // the failed RPC. Only blacklist one node per ReportExecStatusRequestPB to avoid
+  // blacklisting nodes too aggressively. Currently, only blacklist the first node
+  // that contains a valid RPCErrorInfoPB object.
+  for (auto instance_exec_status : request.instance_exec_status()) {
+    if (instance_exec_status.has_aux_error_info()
+        && instance_exec_status.aux_error_info().has_rpc_error_info()) {
+      RPCErrorInfoPB rpc_error_info =
+          instance_exec_status.aux_error_info().rpc_error_info();
+      DCHECK(rpc_error_info.has_dest_node());
+      DCHECK(rpc_error_info.has_posix_error_code());
+      const NetworkAddressPB& dest_node = rpc_error_info.dest_node();
+
+      auto dest_node_and_be_state =
+          addr_to_backend_state_.find(FromNetworkAddressPB(dest_node));
+
+      // If the target address of the RPC is not known to the Coordinator, it cannot
+      // be blacklisted.
+      if (dest_node_and_be_state == addr_to_backend_state_.end()) {
+        string err_msg = "Query failed due to a failed RPC to an unknown target address "
+            + NetworkAddressPBToString(dest_node);
+        DCHECK(false) << err_msg;
+        LOG(ERROR) << err_msg;
+        continue;
+      }
+
+      // The execution parameters of the destination node for the failed RPC.
+      const BackendExecParams* dest_node_exec_params =
+          dest_node_and_be_state->second->exec_params();
+
+      // The Coordinator for the query should never be blacklisted.
+      if (dest_node_exec_params->is_coord_backend) {
+        VLOG_QUERY << "Query failed due to a failed RPC to the Coordinator";
+        continue;
+      }
+
+      // A set of RPC related posix error codes that should cause the target node
+      // of the failed RPC to be blacklisted.
+      static const set<int32_t> blacklistable_rpc_error_codes = {
+          ENOTCONN, // 107: Transport endpoint is not connected
+          ESHUTDOWN, // 108: Cannot send after transport endpoint shutdown
+          ECONNREFUSED  // 111: Connection refused
+      };
+
+      if (blacklistable_rpc_error_codes.find(rpc_error_info.posix_error_code())
+          != blacklistable_rpc_error_codes.end()) {
+        LOG(INFO) << "Blacklisting " << NetworkAddressPBToString(dest_node)
+                  << " because a RPC to it failed.";
+        ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+            dest_node_exec_params->be_desc);
+        break;
+      }
+    }
+  }
+}
+
 int64_t Coordinator::GetMaxBackendStateLagMs(TNetworkAddress* address) {
   if (exec_rpcs_complete_barrier_.pending() > 0) {
     // Exec() hadn't completed for all the backends, so we can't rely on
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0185f99..816018a 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -248,6 +248,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
   std::vector<BackendState*> backend_states_;
 
+  /// A map from the TNetworkAddress of a backend to the BackendState running on the
+  /// TNetworkAddress. All values are non-nullptr and owned by obj_pool(). The address
+  /// is the kRPC address (Coordinator::BackendState::krpc_impalad_address) of the
+  /// Backend. This map is distinct from QuerySchedule::per_backend_exec_params(),
+  /// which uses the Thrift address as the key rather than the kRPC address.
+  boost::unordered_map<TNetworkAddress, BackendState*> addr_to_backend_state_;
+
   /// Protects the population of backend_states_ vector (not the BackendState objects).
   /// Used when accessing backend_states_ if it's not guaranteed that
   /// InitBackendStates() has completed.
@@ -541,6 +548,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Checks the exec_state_ of the query and returns true if the query is executing.
   bool IsExecuting();
 
+  /// Helper function for UpdateBackendExecStatus that iterates through the
+  /// FragmentInstanceExecStatusPB for each fragment and uses AuxErrorInfoPB to check if
+  /// any nodes should be blacklisted. AuxErrorInfoPB contains additional error
+  /// information about why the fragment failed, beyond what is available in the
+  /// ReportExecStatusRequestPB::overall_status field. This method uses information in
+  /// AuxErrorInfoPB to classify specific nodes as "faulty" and then blacklists them. A
+  /// node might be considered "faulty" if, for example, a RPC to that node failed, or a
+  /// fragment on that node failed due to a disk IO error.
+  void UpdateBlacklistWithAuxErrorInfo(const ReportExecStatusRequestPB& request);
+
   /// BackendState and BackendResourceState are private to the Coordinator class, so mark
   /// all tests in CoordinatorBackendStateTest as friends.
   friend class CoordinatorBackendStateTest;
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index dde5b79..02bc9cf 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -290,6 +290,10 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
     stateful_report->set_report_seq_no(report_seq_no_);
     runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
   }
+  // If set in the RuntimeState, set the AuxErrorInfoPB field.
+  if (runtime_state()->HasAuxErrorInfo()) {
+    runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
+  }
 }
 
 void FragmentInstanceState::ReportSuccessful(
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 972925c..f1a9951 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -443,6 +443,10 @@ void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
         MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms));
     return;
   }
+  // If the RPC failed due to a network error, set the RPC error info in RuntimeState.
+  if (controller_status.IsNetworkError()) {
+    parent_->state_->SetRPCErrorInfo(address_, controller_status.posix_code());
+  }
   MarkDone(FromKuduStatus(controller_status, prepend));
 }
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6d1ab84..a7ff96d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -311,6 +311,25 @@ void RuntimeState::ReleaseResources() {
   released_resources_ = true;
 }
 
+void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) {
+  boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+  if (aux_error_info_ == nullptr) {
+    aux_error_info_.reset(new AuxErrorInfoPB());
+    RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
+    NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
+    network_addr->set_hostname(dest_node.hostname);
+    network_addr->set_port(dest_node.port);
+    rpc_error_info->set_posix_error_code(posix_error_code);
+  }
+}
+
+void RuntimeState::GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
+  boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+  if (aux_error_info_ != nullptr) {
+    aux_error_info->CopyFrom(*aux_error_info_);
+  }
+}
+
 const std::string& RuntimeState::GetEffectiveUser() const {
   return impala::GetEffectiveUser(query_ctx().session);
 }
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 16c6df4..884fae6 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -302,6 +302,26 @@ class RuntimeState {
   /// Release resources and prepare this object for destruction. Can only be called once.
   void ReleaseResources();
 
+  /// If the fragment instance associated with this RuntimeState failed due to a RPC
+  /// failure, use this method to set the network address of the RPC's target node and
+  /// the posix error code of the failed RPC. The target node address and posix error code
+  /// will be included in the AuxErrorInfo returned by GetAuxErrorInfo. This method is
+  /// idempotent.
+  void SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code);
+
+  /// Returns true if this RuntimeState has any auxiliary error information, false
+  /// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
+  bool HasAuxErrorInfo() {
+    boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+    return aux_error_info_ != nullptr;
+  }
+
+  /// Sets the given AuxErrorInfoPB with all relevant aux error info from the fragment
+  /// instance associated with this RuntimeState. If no aux error info for this
+  /// RuntimeState has been set, this method does nothing. Currently, only
+  /// SetRPCErrorInfo() sets aux error info.
+  void GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info);
+
   static const char* LLVM_CLASS_NAME;
 
  private:
@@ -414,6 +434,13 @@ class RuntimeState {
   /// nodes that share this runtime state.
   boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
 
+  /// Lock protecting aux_error_info_.
+  SpinLock aux_error_info_lock_;
+
+  /// Auxiliary error information, only set if the fragment instance failed (e.g.
+  /// query_status_ != Status::OK()). Owned by this RuntimeState.
+  std::unique_ptr<AuxErrorInfoPB> aux_error_info_;
+
   /// prohibit copies
   RuntimeState(const RuntimeState&);
 
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index d29e3e9..d31076b 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -167,6 +167,19 @@ string TNetworkAddressToString(const TNetworkAddress& address) {
   return ss.str();
 }
 
+string NetworkAddressPBToString(const NetworkAddressPB& address) {
+  stringstream ss;
+  ss << address.hostname() << ":" << dec << address.port();
+  return ss.str();
+}
+
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
+  TNetworkAddress t_address;
+  t_address.__set_hostname(address.hostname());
+  t_address.__set_port(address.port());
+  return t_address;
+}
+
 /// Pick a random port in the range of ephemeral ports
 /// https://tools.ietf.org/html/rfc6335
 int FindUnusedEphemeralPort() {
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 947ea03..b6629af 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "common/status.h"
+#include "gen-cpp/common.pb.h"
 #include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/Types_types.h"
 #include <vector>
@@ -64,6 +65,12 @@ bool IsWildcardAddress(const std::string& ipaddress);
 /// Utility method to print address as address:port
 std::string TNetworkAddressToString(const TNetworkAddress& address);
 
+/// Utility method to print a NetworkAddressPB as address:port.
+std::string NetworkAddressPBToString(const NetworkAddressPB& address);
+
+/// Utility method to convert a NetworkAddressPB to a TNetworkAddress.
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address);
+
 /// Utility method to convert TNetworkAddress to Kudu sock addr.
 /// Note that 'address' has to contain a resolved IP address.
 Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 6c265a3..0bf2a97 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -21,6 +21,12 @@ syntax="proto2";
 
 package impala;
 
+// Refer to Types.thrift for documentation.
+message NetworkAddressPB {
+  required string hostname = 1;
+  required int32 port = 2;
+}
+
 // Proto-serialized version of Impala's Status object.
 message StatusPB {
   optional int32 status_code = 1;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 50cad00..085be7a 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -124,6 +124,25 @@ message StatefulStatusPB {
   map<int32, ErrorLogEntryPB> error_log = 2;
 }
 
+// RPC error metadata that can be associated with a AuxErrorInfoPB object. Created if a
+// RPC to another node failed.
+message RPCErrorInfoPB {
+  // The address of the RPC's target node.
+  required NetworkAddressPB dest_node = 1;
+
+  // The posix error code of the failed RPC.
+  required int32 posix_error_code = 2;
+}
+
+// Error metadata that can be associated with a failed fragment instance. Used to store
+// extra info about errors encountered during fragment execution. This information is
+// used by the Coordinator to blacklist potentially unhealthy nodes.
+message AuxErrorInfoPB {
+  // Set if the fragment instance failed because a RPC to another node failed. Only set
+  // if the RPC failed due to a network error.
+  optional RPCErrorInfoPB rpc_error_info = 1;
+}
+
 message FragmentInstanceExecStatusPB {
   // Sequence number prevents out-of-order or duplicated updates from being applied.
   optional int64 report_seq_no = 1;
@@ -144,6 +163,10 @@ message FragmentInstanceExecStatusPB {
   // The non-idempotent parts of the report, and any prior reports that are not known to
   // have been received by the coordinator.
   repeated StatefulStatusPB stateful_report = 6;
+
+  // Metadata associated with a failed fragment instance. Only set for failed fragment
+  // instances.
+  optional AuxErrorInfoPB aux_error_info = 7;
 }
 
 message ReportExecStatusRequestPB {
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 4b47039..b278b8b 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -20,6 +20,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 import pytest
 import re
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.skip import SkipIfNotHdfsMinicluster
 from time import sleep
 
@@ -113,3 +114,39 @@ class TestBlacklist(CustomClusterTestSuite):
     assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
         result.runtime_profile
     assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+  def test_kill_impalad_with_running_queries(self, cursor):
+    """Verifies that when an Impala executor is killed while running a query, that the
+    Coordinator blacklists the killed executor."""
+
+    # Run a query asynchronously. Normally, this query should take a few seconds to
+    # complete.
+    query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
+        where t1.l_orderkey = t2.l_orderkey"
+    handle = self.execute_query_async(query)
+
+    # Wait for the query to start running
+    self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)
+
+    # Kill one of the Impala executors
+    killed_impalad = self.cluster.impalads[2]
+    killed_impalad.kill()
+
+    # Try to fetch results from the query. Fetch requests should fail because one of the
+    # impalads running the query was killed. When the query fails, the Coordinator should
+    # add the killed Impala executor to the blacklist (since a RPC to that node failed).
+    try:
+      self.client.fetch(query, handle)
+      assert False, "Query was expected to fail"
+    except Exception as e:
+      # The query should fail due to an RPC error.
+      assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)
+
+    # Run another query which should succeed and verify the impalad was blacklisted.
+    result = self.execute_query("select count(*) from tpch.lineitem")
+    match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
+    assert match is not None and match.group(1) == "%s:%s" % \
+      (killed_impalad.hostname, killed_impalad.service.be_port), \
+      result.runtime_profile


[impala] 02/02: IMPALA-9149: part 1: Re-enable Ranger-related FE tests

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1b4ca58a98a1509e6129132c9645fe059c9079d9
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Mon Nov 11 16:08:44 2019 -0800

    IMPALA-9149: part 1: Re-enable Ranger-related FE tests
    
    In IMPALA-9047, we disabled some Ranger-related FE and BE tests due to
    changes in Ranger's behavior after upgrading Ranger from 1.2 to 2.0.
    This patch aims to re-enable those disabled FE tests in
    AuthorizationStmtTest.java and RangerAuditLogTest.java to increase
    Impala's test coverage of authorization via Ranger.
    
    There are at least two major changes in Ranger's behavior in the newer
    versions.
    
    1. The first is that the owner of the requested resource no longer has
    to be explicitly granted privileges in order to access the resource.
    
    2. The second is that a user not explicitly granted the privilege of
    creating a database is able to do so.
    
    Due to these changes, some of the previous Ranger authorization requests
    that were expected to be rejected are now granted after the upgrade.
    
    To re-enable the tests affected by the first change described above, we
    modify AuthorizationTestBase.java to allow our FE Ranger authorization
    tests to specify the requesting user in an authorization test. Those
    tests failed after the upgrade because the default requesting user in
    Impala's AuthorizationTestBase.java happens to be the owner of the
    resources involved in our FE authorization tests. After this patch, a
    requesting user can be either a non-owner user or an owner user in a
    Ranger authorization test and the requesting user would correspond to a
    non-owner user if it is not explicitly specified. Note that in a Sentry
    authorization test, we do not use the non-owner user as the requesting
    user by default as we do in the Ranger authorization tests. Instead, we
    set the name of the requesting user to a name that is the same as the
    owner user in Ranger authorization tests to avoid the need for providing
    a customized group mapping service when instantiating a Sentry
    ResourceAuthorizationProvider as we do in AuthorizationTest.java, our
    FE tests specifically for testing authorization via Sentry.
    
    On the other hand, to re-enable the tests affected by the second change,
    we remove from the Ranger policy for all databases the allowed
    condition that grants any user the privilege of creating a database,
    which is not by default granted by Sentry. After the removal of the
    allowed condition, those tests in AuthorizationStmtTest.java and
    RangerAuditLogTest.java affected by the second change now result in the
    same authorization errors as before the upgrade of Ranger.
    
    Testing:
    - Passed AuthorizationStmtTest.java in a local dev environment
    - Passed RangerAuditLogTest.java in a local dev environment
    
    Change-Id: I228533aae34b9ac03bdbbcd51a380770ff17c7f2
    Reviewed-on: http://gerrit.cloudera.org:8080/14798
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../ranger/RangerAuthorizationChecker.java         |   9 +-
 .../authorization/AuthorizationStmtTest.java       | 322 +++++++++------------
 .../authorization/AuthorizationTestBase.java       |  69 ++++-
 .../authorization/ranger/RangerAuditLogTest.java   |   8 -
 .../org/apache/impala/common/FrontendFixture.java  |  12 +
 .../org/apache/impala/common/FrontendTestBase.java |   7 +
 testdata/bin/create-load-data.sh                   |  43 ++-
 ...p.json.template => impala_group_non_owner.json} |   2 +-
 ...n.template => impala_group_owner.json.template} |   0
 ...emplate => impala_user_non_owner.json.template} |   4 +-
 ...on.template => impala_user_owner.json.template} |   2 +-
 .../cluster/ranger/setup/policy_4_revised.json     | 117 ++++++++
 12 files changed, 380 insertions(+), 215 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 5cffa02..7d06e29 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -34,6 +34,7 @@ import org.apache.impala.authorization.PrivilegeRequest;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TSessionState;
 import org.apache.impala.util.EventSequence;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
@@ -309,7 +310,13 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
 
   @Override
   public Set<String> getUserGroups(User user) throws InternalException {
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user.getShortName());
+    UserGroupInformation ugi;
+    if (RuntimeEnv.INSTANCE.isTestEnv()) {
+      ugi = UserGroupInformation.createUserForTesting(user.getShortName(),
+          new String[]{user.getShortName()});
+    } else {
+      ugi = UserGroupInformation.createRemoteUser(user.getShortName());
+    }
     return new HashSet<>(ugi.getGroups());
   }
 
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index cc4d4ec..401a333 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.impala.authorization;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -56,7 +56,6 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * This class contains authorization tests for SQL statements.
@@ -82,8 +81,12 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
   @Before
   public void before() throws ImpalaException {
     if (authzProvider_ == AuthorizationProvider.SENTRY) {
-      // Remove existing roles in order to not interfere with these tests.
-      for (TSentryRole role : sentryService_.listAllRoles(USER)) {
+      // Remove existing roles in order to not interfere with these tests. To be able to
+      // list existing roles, we have to invoke listAllRoles() as the user corresponding
+      // to User(System.getProperty("user.name")), which has the privilege to execute
+      // LIST_ROLES.
+      User user = new User(System.getProperty("user.name"));
+      for (TSentryRole role : sentryService_.listAllRoles(user)) {
         authzCatalog_.removeRole(role.getRoleName());
       }
     }
@@ -336,9 +339,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testCopyTestCasePrivileges() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Used for select *, with, and union
     Set<String> expectedAuthorizables = Sets.newHashSet(
         "functional", // For including the DB related metadata in the testcase file.
@@ -432,9 +432,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testSelect() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest authzTest: new AuthzTest[]{
         // Select a specific column on a table.
         authorize("select id from functional.alltypes"),
@@ -782,9 +779,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testInsert() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Basic insert into a table.
     for (AuthzTest test: new AuthzTest[]{
         authorize("insert into functional.zipcode_incomes(id) values('123')"),
@@ -941,9 +935,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testUseDb() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     AuthzTest test = authorize("use functional");
     for (TPrivilegeLevel privilege: TPrivilegeLevel.values()) {
       test.ok(onServer(privilege))
@@ -965,10 +956,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testTruncate() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
-    // Truncate a table.
+    // Truncate a table
     authorize("truncate table functional.alltypes")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
@@ -1003,9 +991,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testLoad() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Load into a table.
     authorize("load data inpath 'hdfs://localhost:20500/test-warehouse/tpch.lineitem' " +
         "into table functional.alltypes partition(month=10, year=2009)")
@@ -1082,9 +1067,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testResetMetadata() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Invalidate metadata/refresh authorization on server.
     for (AuthzTest test: new AuthzTest[]{
         authorize("invalidate metadata"),
@@ -1137,9 +1119,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testShow() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Show databases should always be allowed.
     authorize("show databases").ok();
 
@@ -1211,7 +1190,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
     authorize("show roles").ok();
 
     // Show role grant group should always be allowed.
-    authorize(String.format("show role grant group `%s`", USER.getName())).ok();
+    authorize(String.format("show role grant group `%s`", user_.getName())).ok();
 
     // Show grant role/user should always be allowed.
     try {
@@ -1311,9 +1290,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
    */
   @Test
   public void testDescribe() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Describe database.
     AuthzTest authzTest = authorize("describe database functional");
     for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
@@ -1450,7 +1426,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
         TPrivilegeLevel.SELECT))
         .error(accessError("functional.allcomplextypes"));
 
-    for (AuthzTest test: new AuthzTest[]{
+    for (AuthzTest test : new AuthzTest[]{
         // User has access to a different column.
         authorize("describe functional.allcomplextypes.int_struct_col"),
         // Insufficient privileges on complex type column, accessing member
@@ -1464,9 +1440,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testStats() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Compute stats.
     authorize("compute stats functional.alltypes")
         .ok(onServer(TPrivilegeLevel.ALL))
@@ -1549,9 +1522,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testCreateDatabase() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest test: new AuthzTest[]{
         authorize("create database newdb"),
         authorize("create database if not exists newdb")}) {
@@ -1564,23 +1534,21 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
     }
 
     // Create a database with a specific location.
-    authorize("create database newdb location " +
-        "'hdfs://localhost:20500/test-warehouse/new_location'")
+    String uri = "hdfs://localhost:20500/test-warehouse/new_location";
+    String stmt = "create database newdb location " + "'" + uri + "'";
+    authorize(stmt)
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
-        .ok(onServer(TPrivilegeLevel.CREATE), onUri(
-            "hdfs://localhost:20500/test-warehouse/new_location", TPrivilegeLevel.ALL))
-        .ok(onServer(TPrivilegeLevel.CREATE), onUri(
-            "hdfs://localhost:20500/test-warehouse/new_location", TPrivilegeLevel.OWNER))
+        .ok(onServer(TPrivilegeLevel.CREATE), onUri(uri, TPrivilegeLevel.ALL))
+        .ok(onServer(TPrivilegeLevel.CREATE), onUri(uri, TPrivilegeLevel.OWNER))
         .error(createError("newdb"))
         .error(createError("newdb"), onServer(allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)), onUri(
-            "hdfs://localhost:20500/test-warehouse/new_location", TPrivilegeLevel.ALL))
+            TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)),
+            onUri(uri, TPrivilegeLevel.ALL))
         .error(createError("newdb"), onServer(allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)), onUri(
-            "hdfs://localhost:20500/test-warehouse/new_location", TPrivilegeLevel.OWNER))
-        .error(accessError("hdfs://localhost:20500/test-warehouse/new_location"),
-            onServer(TPrivilegeLevel.CREATE));
+            TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)),
+            onUri(uri, TPrivilegeLevel.OWNER))
+        .error(accessError(uri), onServer(TPrivilegeLevel.CREATE));
 
     // Database already exists.
     for (AuthzTest test: new AuthzTest[]{
@@ -1598,9 +1566,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testCreateTable() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest test: new AuthzTest[]{
         authorize("create table functional.new_table(i int)"),
         authorize("create external table functional.new_table(i int)")}) {
@@ -1834,9 +1799,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testCreateView() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest test: new AuthzTest[]{
         authorize("create view functional.new_view as " +
             "select int_col from functional.alltypes"),
@@ -1916,9 +1878,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testDropDatabase() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest test: new AuthzTest[]{
         authorize("drop database functional"),
         authorize("drop database functional cascade"),
@@ -1972,9 +1931,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testDropTable() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     authorize("drop table functional.alltypes")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
@@ -2038,9 +1994,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testDropView() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     authorize("drop view functional.alltypes_view")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
@@ -2105,9 +2058,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testAlterTable() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
     for (AuthzTest test: new AuthzTest[]{
         authorize("alter table functional.alltypes add column c1 int"),
@@ -2177,6 +2127,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                     TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
         // TODO: Checking if a request is allowed by checking if grant option flag is set
         // is to Sentry.
+        // The following do not result in Ranger authorization errors.
         if (authzProvider_ == AuthorizationProvider.SENTRY) {
           test.error(accessError(true, "functional.alltypes"), onServer(
               TPrivilegeLevel.values()))
@@ -2184,6 +2135,10 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                   TPrivilegeLevel.values()))
               .error(accessError(true, "functional.alltypes"), onTable("functional",
                   "alltypes", TPrivilegeLevel.values()));
+        } else {
+          test.ok(onServer(TPrivilegeLevel.values()));
+          test.ok(onDatabase("functional", TPrivilegeLevel.values()));
+          test.ok(onTable("functional", "alltypes", TPrivilegeLevel.values()));
         }
       }
     } finally {
@@ -2321,9 +2276,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testAlterView() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     for (AuthzTest test: new AuthzTest[] {
         authorize("alter view functional.alltypes_view as " +
             "select int_col from functional.alltypes"),
@@ -2428,6 +2380,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                     TPrivilegeLevel.OWNER)));
         // TODO: Checking if a request is allowed by checking if grant option flag is set
         // is to Sentry.
+        // The following do not result in Ranger authorization errors.
         if (authzProvider_ == AuthorizationProvider.SENTRY) {
           test.error(accessError(true, "functional.alltypes_view"), onServer(
               TPrivilegeLevel.values()))
@@ -2435,6 +2388,10 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                   "functional", TPrivilegeLevel.values()))
               .error(accessError(true, "functional.alltypes_view"), onTable("functional",
                   "alltypes_view", TPrivilegeLevel.values()));
+        } else {
+          test.ok(onServer(TPrivilegeLevel.values()));
+          test.ok(onDatabase("functional", TPrivilegeLevel.values()));
+          test.ok(onTable("functional", "alltypes_view", TPrivilegeLevel.values()));
         }
       }
     } finally {
@@ -2458,9 +2415,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testAlterDatabase() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     try {
       // We cannot set an owner to a role that doesn't exist
       authzCatalog_.addRole("foo");
@@ -2480,12 +2434,18 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                 allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
         // TODO: Checking if a request is allowed by checking if grant option flag is set
         // is to Sentry.
+        // The following do not result in Ranger authorization errors.
         if (authzProvider_ == AuthorizationProvider.SENTRY) {
           authorize(String.format("alter database functional set owner %s foo",
               ownerType))
               .error(accessError(true, "functional"), onServer(TPrivilegeLevel.values()))
               .error(accessError(true, "functional"), onDatabase("functional",
                   TPrivilegeLevel.values()));
+        } else {
+          authorize(String.format("alter database functional set owner %s foo",
+              ownerType))
+              .ok(onServer(TPrivilegeLevel.values()))
+              .ok(onDatabase("functional", TPrivilegeLevel.values()));
         }
 
         // Database does not exist.
@@ -2510,9 +2470,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testUpdate() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Update is only supported on Kudu tables.
     for (AuthzTest test: new AuthzTest[]{
         authorize("update functional_kudu.alltypes set int_col = 1"),
@@ -2552,9 +2509,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testUpsert() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Upsert is only supported on Kudu tables.
     for (AuthzTest test: new AuthzTest[]{
         authorize("upsert into table functional_kudu.testtbl(id, name) values(1, 'a')"),
@@ -2623,9 +2577,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testDelete() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Delete is only supported on Kudu tables.
     for (AuthzTest test: new AuthzTest[]{
         authorize("delete from functional_kudu.alltypes"),
@@ -2664,9 +2615,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testCommentOn() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Comment on database.
     authorize("comment on database functional is 'comment'")
         .ok(onServer(TPrivilegeLevel.ALL))
@@ -2764,9 +2712,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testFunction() throws ImpalaException {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     // Create function.
     authorize("create function functional.f() returns int location " +
         "'/test-warehouse/libTestUdfs.so' symbol='NoArgs'")
@@ -2876,9 +2821,15 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
       options.setEnable_expr_rewrites(true);
       for (AuthzTest test: new AuthzTest[] {
           authorize("select functional.to_lower('ABCDEF')"),
-          // Also test with expression rewrite enabled.
-          authorize(createAnalysisCtx(options, authzFactory_),
-              "select functional.to_lower('ABCDEF')")}) {
+          // Also test with expression rewrite enabled. Notice that when creating an
+          // analysis context, we have to explicitly specify the requesting user
+          // corresponding to 'user_' defined in AuthorizationTestBase.java. Otherwise,
+          // an analysis context will be created with a user corresponding to
+          // User(System.getProperty("user.name")), resulting in a Sentry authorization
+          // error.
+          authorize(createAnalysisCtx(options, authzFactory_, user_.getName()),
+              "select functional.to_lower('ABCDEF')")
+      }) {
         test.ok(onServer(TPrivilegeLevel.SELECT))
             .ok(onDatabase("functional", TPrivilegeLevel.ALL))
             .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
@@ -2946,7 +2897,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
           "    }\n" +
           "  ]\n" +
           "}", policyName, RANGER_SERVICE_TYPE, RANGER_SERVICE_NAME, tableName,
-          USER.getShortName());
+          user_.getShortName());
 
       try {
         // Clear existing row filter policies, otherwise they will cause different
@@ -3064,7 +3015,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
           "    }\n" +
           "  ]\n" +
           "}", policyName, RANGER_SERVICE_TYPE, RANGER_SERVICE_NAME, tableName,
-          USER.getShortName());
+          user_.getShortName());
 
       try {
         createRangerPolicy(policyName, json);
@@ -3137,99 +3088,114 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
   }
 
   /**
-   * Validates Ranger's object ownership privileges.
+   * Validates Ranger's object ownership privileges. Note that we no longer have to add a
+   * policy to the Ranger server to explicitly grant a user the access privileges of the
+   * resources if the user is the owner of the resources.
    */
   @Test
   public void testRangerObjectOwnership() throws Exception {
-    // TODO: Fix this unit test in a follow up commit.
-    assumeTrue(authzProvider_ == AuthorizationProvider.SENTRY);
-
     if (authzProvider_ == AuthorizationProvider.SENTRY) return;
-    // Out of the box there are no privileges for the owner on functional db.
-    // So the following set of queries should fail with authz failures.
-    // Maps from a query to the corresponding authz error.
-    ImmutableMap<AuthzTest, String> testQueries = ImmutableMap
-        .<AuthzTest, String>builder()
-        .put(authorize("select count(*) from functional.alltypes"),
-            selectError("functional.alltypes"))
-        .put(authorize("select id from functional.alltypes"),
-            selectError("functional.alltypes"))
-        .put(authorize("select id from functional.alltypes_view"),
-            selectError("functional.alltypes_view"))
-        .put(authorize("show create table functional.alltypes"),
-            accessError("functional.alltypes"))
-        .put(authorize("describe functional.alltypes"),
-            accessError("functional.alltypes"))
-        .put(authorize("show create table functional.alltypes_view"),
-            accessError("functional.alltypes_view"))
-        .put(authorize("describe functional.alltypes_view"),
-            accessError("functional.alltypes_view"))
-        .put(authorize("describe functional.allcomplextypes.int_struct_col"),
-            accessError("functional.allcomplextypes"))
-        .put(authorize("refresh functional.alltypes"),
-            refreshError("functional.alltypes"))
-        .put(authorize("invalidate metadata functional.alltypes"),
-            refreshError("functional.alltypes"))
-        .put(authorize("compute stats functional.alltypes"),
-            alterError("functional.alltypes"))
-        .put(authorize("drop stats functional.alltypes"),
-            alterError("functional.alltypes"))
-        .put(authorize("create table functional.test_tbl(a int)"),
-            createError("functional"))
-        .put(authorize("create table functional.test_tbl like functional.alltypes"),
-            accessError("functional.alltypes"))
-        .put(authorize("create table functional.test_tbl as select 1"),
-            createError("functional"))
-        .put(authorize("create view functional.test_view as select 1"),
-            createError("functional"))
-        .put(authorize("alter table functional.alltypes add column c1 int"),
-            alterError("functional"))
-        .put(authorize("drop table functional.alltypes"),
-            dropError("functional"))
-        .put(authorize("drop view functional.alltypes_view"),
-            dropError("functional"))
-        .put(authorize("alter view functional.alltypes_view as select 1"),
-            alterError("functional.alltypes_view"))
-        .put(authorize("alter database functional set owner user foo"),
-            accessError(true, "functional"))
+
+    // 'as_owner_' is by default set to false for AuthorizationTestBase. But since this
+    // test is meant for testing Ranger's behavior when the requesting user is the owner
+    // of the resources, we set 'as_owner_' to true.
+    as_owner_ = true;
+    TQueryOptions options = new TQueryOptions();
+
+    ImmutableSet<AuthzTest> testQueries = ImmutableSet
+        .<AuthzTest>builder()
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "select count(*) from functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "select id from functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "select id from functional.alltypes_view"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "show create table functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "describe functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "show create table functional.alltypes_view"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "describe functional.alltypes_view"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "describe functional.allcomplextypes.int_struct_col"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "refresh functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "invalidate metadata functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "compute stats functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "drop stats functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "create table functional.test_tbl(a int)"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "create table functional.test_tbl like functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "create table functional.test_tbl as select 1"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "create view functional.test_view as select 1"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "alter table functional.alltypes add column c1 int"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "drop table functional.alltypes"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "drop view functional.alltypes_view"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "alter view functional.alltypes_view as select 1"))
+        .add(authorize(createAnalysisCtx(options, authzFactory_,
+            OWNER_USER.getName()),
+            "alter database functional set owner user foo"))
         .build();
     // Run the queries.
-    for (AuthzTest authz: testQueries.keySet()) authz.error(testQueries.get(authz));
-    // Grant ALL privileges on functional db to it's owner. All the above queries
-    // should be authorized now, since we are running as the owner of the db and
-    // ownership should be translated to the tables underneath.
-    String policyName = "functional_owner_" + TestUtils.getRandomString(5);
-    createOwnerPolicy(policyName, "ALL", "functional", "*", "*");
-    try {
-      rangerImpalaPlugin_.refreshPoliciesAndTags();
-      for (AuthzTest authz: testQueries.keySet()) authz.ok();
-    } finally {
-      deleteRangerPolicy(policyName);
-    }
-    rangerImpalaPlugin_.refreshPoliciesAndTags();
+    for (AuthzTest authz: testQueries) authz.ok();
     // Tests for more fine grained {OWNER} privileges.
     //
     // SELECT privilege.
-    // With default privileges, select on both alltypes/alltypes_view should fail.
-    authorize("select count(*) from functional.alltypes")
-        .error(selectError("functional.alltypes"));
-    authorize("select count(*) from functional.alltypes")
-        .error(selectError("functional.alltypes"));
-    policyName = "functional_owner_alltypes" + TestUtils.getRandomString(5);
-    createOwnerPolicy(policyName, "SELECT", "functional", "alltypes", "*");
-    rangerImpalaPlugin_.refreshPoliciesAndTags();
-    // With the new privileges, only the first query should pass. Also,
-    // any other non-SELECT on functional.alltypes should fail.
+    authorize(createAnalysisCtx(options, authzFactory_,
+        OWNER_USER.getName()),
+        "select count(*) from functional.alltypes").ok();
+    authorize(createAnalysisCtx(options, authzFactory_,
+        OWNER_USER.getName()),
+        "select count(*) from functional.alltypes_view").ok();
+
+    // By default, the owner is granted every privilege needed by the queries below.
     try {
-      authorize("select count(*) from functional.alltypes").ok();
-      authorize("alter table functional.alltypes add column c1 int")
-          .error(alterError("functional"));
-      authorize("drop table functional.alltypes")
-          .error(dropError("functional"));
-      authorize("select count(*) from functional.alltypes_view")
-          .error(selectError("functional.alltypes_view"));
+      authorize(createAnalysisCtx(options, authzFactory_,
+          OWNER_USER.getName()),
+          "select count(*) from functional.alltypes").ok();
+      authorize(createAnalysisCtx(options, authzFactory_,
+          OWNER_USER.getName()),
+          "alter table functional.alltypes add column c1 int").ok();
+      authorize(createAnalysisCtx(options, authzFactory_,
+          OWNER_USER.getName()),
+          "drop table functional.alltypes").ok();
+      authorize(createAnalysisCtx(options, authzFactory_,
+          OWNER_USER.getName()),
+          "select count(*) from functional.alltypes_view").ok();
     } finally {
-      deleteRangerPolicy(policyName);
+      as_owner_ = false;
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
index 35aca0a..47482ba 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
@@ -77,12 +77,34 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
   protected static final String RANGER_USER = "admin";
   protected static final String RANGER_PASSWORD = "admin";
   protected static final String SERVER_NAME = "server1";
-  protected static final User USER = new User(System.getProperty("user.name"));
+
+  // For the Ranger tests, 'OWNER_USER' is used to denote a requesting user that is
+  // the owner of the resource.
+  protected static final User OWNER_USER = new User(System.getProperty("user.name"));
+  // For the Ranger tests, 'GROUPS' holds the name of the group that a non-owner is
+  // associated with.
+  protected static final List<String> GROUPS = Collections.singletonList("non_owner");
+  // For the Ranger tests, 'OWNER_GROUPS' holds the name of the group that an owner is
+  // associated with.
+  protected static final List<String> OWNER_GROUPS =
+      Collections.singletonList(System.getProperty("user.name"));
+
   protected static final String RANGER_SERVICE_TYPE = "hive";
   protected static final String RANGER_SERVICE_NAME = "test_impala";
   protected static final String RANGER_APP_ID = "impala";
   protected static final User RANGER_ADMIN = new User("admin");
 
+  // For the Ranger tests, 'user_' is used to denote a requesting user that is not the
+  // owner of the resource. Note that we defer the setup of 'user_' to the constructor
+  // and assign a different name for each authorization provider to avoid the need for
+  // providing a customized group mapping service when instantiating a Sentry
+  // ResourceAuthorizationProvider as we do in TestShortUsernameWithAuthToLocal() of
+  // AuthorizationTest.java.
+  protected static User user_ = null;
+  // For the Ranger tests, 'as_owner_' is used to indicate whether or not the requesting
+  // user in a test query is the owner of the resource.
+  protected static boolean as_owner_ = false;
+
   protected final AuthorizationConfig authzConfig_;
   protected final AuthorizationFactory authzFactory_;
   protected final AuthorizationProvider authzProvider_;
@@ -98,11 +120,12 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
     authzProvider_ = authzProvider;
     switch (authzProvider) {
       case SENTRY:
+        user_ = new User(System.getProperty("user.name"));
         authzConfig_ = SentryAuthorizationConfig.createHadoopGroupAuthConfig(
             "server1",
             System.getenv("IMPALA_HOME") + "/fe/src/test/resources/sentry-site.xml");
         authzFactory_ = createAuthorizationFactory(authzProvider);
-        authzCtx_ = createAnalysisCtx(authzFactory_, USER.getName());
+        authzCtx_ = createAnalysisCtx(authzFactory_, user_.getName());
         authzCatalog_ = new ImpaladTestCatalog(authzFactory_);
         authzFrontend_ = new Frontend(authzFactory_, authzCatalog_);
         sentryService_ = new SentryPolicyService(
@@ -111,10 +134,11 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
         rangerRestClient_ = null;
         break;
       case RANGER:
+        user_ = new User("non_owner");
         authzConfig_ = new RangerAuthorizationConfig(RANGER_SERVICE_TYPE, RANGER_APP_ID,
             SERVER_NAME);
         authzFactory_ = createAuthorizationFactory(authzProvider);
-        authzCtx_ = createAnalysisCtx(authzFactory_, USER.getName());
+        authzCtx_ = createAnalysisCtx(authzFactory_, user_.getName());
         authzCatalog_ = new ImpaladTestCatalog(authzFactory_);
         authzFrontend_ = new Frontend(authzFactory_, authzCatalog_);
         rangerImpalaPlugin_ =
@@ -146,11 +170,11 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
 
   protected abstract class WithSentryPrincipal implements WithPrincipal {
     protected final String role_ = "authz_test_role";
-    protected final String user_ = USER.getName();
+    protected final String sentry_user_ = user_.getName();
 
     protected void createRole(TPrivilege[]... privileges) throws ImpalaException {
       Role role = authzCatalog_.addRole(role_);
-      authzCatalog_.addRoleGrantGroup(role_, USER.getName());
+      authzCatalog_.addRoleGrantGroup(role_, sentry_user_);
       for (TPrivilege[] privs: privileges) {
         for (TPrivilege privilege: privs) {
           privilege.setPrincipal_id(role.getId());
@@ -161,12 +185,12 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
     }
 
     protected void createUser(TPrivilege[]... privileges) throws ImpalaException {
-      org.apache.impala.catalog.User user = authzCatalog_.addUser(user_);
+      org.apache.impala.catalog.User user = authzCatalog_.addUser(sentry_user_);
       for (TPrivilege[] privs: privileges) {
         for (TPrivilege privilege: privs) {
           privilege.setPrincipal_id(user.getId());
           privilege.setPrincipal_type(TPrincipalType.USER);
-          authzCatalog_.addUserPrivilege(user_, privilege);
+          authzCatalog_.addUserPrivilege(sentry_user_, privilege);
         }
       }
     }
@@ -176,7 +200,7 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
     }
 
     protected void dropUser() throws ImpalaException {
-      authzCatalog_.removeUser(user_);
+      authzCatalog_.removeUser(sentry_user_);
     }
   }
 
@@ -190,7 +214,7 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
     public void cleanUp() throws ImpalaException { dropUser(); }
 
     @Override
-    public String getName() { return user_; }
+    public String getName() { return sentry_user_; }
   }
 
   public class WithSentryRole extends WithSentryPrincipal {
@@ -239,15 +263,27 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
       authzManager.revokePrivilege(grants, "", "127.0.0.1");
     }
 
+    /**
+     * Depending on whether or not the principal is the owner of the resource, we return
+     * either 'OWNER_USER' or 'user_'. This function is used in authzOk() and
+     * authzError().
+     */
     @Override
-    public String getName() { return USER.getName(); }
+    public String getName() {
+      return (as_owner_ ? OWNER_USER.getName() : user_.getName());
+    }
   }
 
   public class WithRangerUser extends WithRanger {
     @Override
     protected List<GrantRevokeRequest> buildRequest(List<TPrivilege> privileges) {
       return RangerCatalogdAuthorizationManager.createGrantRevokeRequests(
-          RANGER_ADMIN.getName(), true, USER.getName(), Collections.emptyList(),
+          RANGER_ADMIN.getName(), true,
+          // We provide the name of the grantee, which is a user in this case, according
+          // to whether or not we test the query with the requesting user that is the
+          // owner of the resource.
+          getName(),
+          Collections.emptyList(),
           rangerImpalaPlugin_.getClusterName(), "127.0.0.1", privileges);
     }
   }
@@ -255,10 +291,13 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
   public class WithRangerGroup extends WithRanger {
     @Override
     protected List<GrantRevokeRequest> buildRequest(List<TPrivilege> privileges) {
-      List<String> groups = Collections.singletonList(System.getProperty("user.name"));
-
       return RangerCatalogdAuthorizationManager.createGrantRevokeRequests(
-          RANGER_ADMIN.getName(), true, null, groups,
+          RANGER_ADMIN.getName(), true, null,
+          // We provide the name of the grantee, which is a group in this case, according
+          // to whether or not we test the query with the requesting user that is the
+          // owner of the resource.
+          (as_owner_ ? OWNER_GROUPS : GROUPS),
+          // groups,
           rangerImpalaPlugin_.getClusterName(), "127.0.0.1", privileges);
     }
   }
@@ -312,7 +351,7 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
               excludedStrings_.length != 0,
           "One or both of included or excluded strings must be defined.");
       List<String> result = resultToStringList(authzFrontend_.describeTable(table,
-          outputStyle_, USER));
+          outputStyle_, user_));
       for (String str: includedStrings_) {
         assertTrue(String.format("\"%s\" is not in the describe output.\n" +
                 "Expected : %s\n Actual   : %s", str, Arrays.toString(includedStrings_),
diff --git a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
index 16cb0d2..df90829 100644
--- a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
@@ -59,10 +59,6 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
     super(AuthorizationProvider.RANGER);
   }
 
-  /**
-   * TODO: Fix this unit test in a follow up commit.
-   */
-  @Ignore("IMPALA-9047")
   @Test
   public void testAuditLogSuccess() throws ImpalaException {
     authzOk(events -> {
@@ -150,10 +146,6 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
         onTable("functional", "alltypes", TPrivilegeLevel.SELECT));
   }
 
-  /**
-   * TODO: Fix this unit test in a follow up commit.
-   */
-  @Ignore("IMPALA-9047")
   @Test
   public void testAuditLogFailure() throws ImpalaException {
     authzError(events -> {
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index de1e9d8..064eb6b 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -319,6 +319,18 @@ public class FrontendFixture {
     return analysisCtx;
   }
 
+  // This function is only called by createAnalysisCtx() in FrontendTestBase.java and
+  // allows us to specify the requesting user when creating an analysis context
+  // associated with an authorization request.
+  public AnalysisContext createAnalysisCtx(TQueryOptions queryOptions,
+      AuthorizationFactory authzFactory, String user) {
+    TQueryCtx queryCtx = TestUtils.createQueryContext(Catalog.DEFAULT_DB, user);
+    queryCtx.client_request.query_options = queryOptions;
+    EventSequence timeline = new EventSequence("Frontend Test Timeline");
+    AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory, timeline);
+    return analysisCtx;
+  }
+
   public AnalysisContext createAnalysisCtx(AuthorizationFactory authzFactory) {
     return createAnalysisCtx(authzFactory, System.getProperty("user.name"));
   }
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 71285eb..64d7807 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -214,6 +214,13 @@ public class FrontendTestBase extends AbstractFrontendTest {
     return feFixture_.createAnalysisCtx(queryOptions, authzFactory);
   }
 
+  // This function allows us to specify the requesting user when creating an
+  // analysis context associated with an authorization request.
+  protected AnalysisContext createAnalysisCtx(TQueryOptions queryOptions,
+      AuthorizationFactory authzFactory, String user) {
+    return feFixture_.createAnalysisCtx(queryOptions, authzFactory, user);
+  }
+
   protected AnalysisContext createAnalysisCtx(AuthorizationFactory authzFactory) {
     return feFixture_.createAnalysisCtx(authzFactory);
   }
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 6488851..65d18e3 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -322,27 +322,48 @@ function setup-ranger {
   RANGER_SETUP_DIR="${IMPALA_HOME}/testdata/cluster/ranger/setup"
 
   perl -wpl -e 's/\$\{([^}]+)\}/defined $ENV{$1} ? $ENV{$1} : $&/eg' \
-    "${RANGER_SETUP_DIR}/impala_group.json.template" > \
-    "${RANGER_SETUP_DIR}/impala_group.json"
+    "${RANGER_SETUP_DIR}/impala_group_owner.json.template" > \
+    "${RANGER_SETUP_DIR}/impala_group_owner.json"
 
-  export GROUP_ID=$(wget -qO - --auth-no-challenge --user=admin --password=admin \
-    --post-file="${RANGER_SETUP_DIR}/impala_group.json" \
+  export GROUP_ID_OWNER=$(wget -qO - --auth-no-challenge --user=admin --password=admin \
+    --post-file="${RANGER_SETUP_DIR}/impala_group_owner.json" \
     --header="accept:application/json" \
     --header="Content-Type:application/json" \
     http://localhost:6080/service/xusers/secure/groups |
     python -c "import sys, json; print json.load(sys.stdin)['id']")
 
+  export GROUP_ID_NON_OWNER=$(wget -qO - --auth-no-challenge --user=admin \
+    --password=admin --post-file="${RANGER_SETUP_DIR}/impala_group_non_owner.json" \
+    --header="accept:application/json" \
+    --header="Content-Type:application/json" \
+    http://localhost:6080/service/xusers/secure/groups |
+    python -c "import sys, json; print json.load(sys.stdin)['id']")
+
+  perl -wpl -e 's/\$\{([^}]+)\}/defined $ENV{$1} ? $ENV{$1} : $&/eg' \
+    "${RANGER_SETUP_DIR}/impala_user_owner.json.template" > \
+    "${RANGER_SETUP_DIR}/impala_user_owner.json"
+
   perl -wpl -e 's/\$\{([^}]+)\}/defined $ENV{$1} ? $ENV{$1} : $&/eg' \
-    "${RANGER_SETUP_DIR}/impala_user.json.template" > \
-    "${RANGER_SETUP_DIR}/impala_user.json"
+    "${RANGER_SETUP_DIR}/impala_user_non_owner.json.template" > \
+    "${RANGER_SETUP_DIR}/impala_user_non_owner.json"
 
-  if grep "\${[A-Z_]*}" "${RANGER_SETUP_DIR}/impala_user.json"; then
-    echo "Found undefined variables in ${RANGER_SETUP_DIR}/impala_user.json."
+  if grep "\${[A-Z_]*}" "${RANGER_SETUP_DIR}/impala_user_owner.json"; then
+    echo "Found undefined variables in ${RANGER_SETUP_DIR}/impala_user_owner.json."
+    exit 1
+  fi
+
+  if grep "\${[A-Z_]*}" "${RANGER_SETUP_DIR}/impala_user_non_owner.json"; then
+    echo "Found undefined variables in ${RANGER_SETUP_DIR}/impala_user_non_owner.json."
     exit 1
   fi
 
   wget -O /dev/null --auth-no-challenge --user=admin --password=admin \
-    --post-file="${RANGER_SETUP_DIR}/impala_user.json" \
+    --post-file="${RANGER_SETUP_DIR}/impala_user_owner.json" \
+    --header="Content-Type:application/json" \
+    http://localhost:6080/service/xusers/secure/users
+
+  wget -O /dev/null --auth-no-challenge --user=admin --password=admin \
+    --post-file="${RANGER_SETUP_DIR}/impala_user_non_owner.json" \
     --header="Content-Type:application/json" \
     http://localhost:6080/service/xusers/secure/users
 
@@ -350,6 +371,10 @@ function setup-ranger {
     --post-file="${RANGER_SETUP_DIR}/impala_service.json" \
     --header="Content-Type:application/json" \
     http://localhost:6080/service/public/v2/api/service
+
+  curl -u admin:admin -H "Accept: application/json" -H "Content-Type: application/json" \
+    -X PUT http://localhost:6080/service/public/v2/api/policy/4 \
+    -d @"${RANGER_SETUP_DIR}/policy_4_revised.json"
 }
 
 function copy-and-load-dependent-tables {
diff --git a/testdata/cluster/ranger/setup/impala_group.json.template b/testdata/cluster/ranger/setup/impala_group_non_owner.json
similarity index 51%
copy from testdata/cluster/ranger/setup/impala_group.json.template
copy to testdata/cluster/ranger/setup/impala_group_non_owner.json
index f1083cf..f1d786c 100644
--- a/testdata/cluster/ranger/setup/impala_group.json.template
+++ b/testdata/cluster/ranger/setup/impala_group_non_owner.json
@@ -1,4 +1,4 @@
 {
-  "name" : "${USER}",
+  "name" : "non_owner",
   "description" : ""
 }
diff --git a/testdata/cluster/ranger/setup/impala_group.json.template b/testdata/cluster/ranger/setup/impala_group_owner.json.template
similarity index 100%
rename from testdata/cluster/ranger/setup/impala_group.json.template
rename to testdata/cluster/ranger/setup/impala_group_owner.json.template
diff --git a/testdata/cluster/ranger/setup/impala_user.json.template b/testdata/cluster/ranger/setup/impala_user_non_owner.json.template
similarity index 50%
copy from testdata/cluster/ranger/setup/impala_user.json.template
copy to testdata/cluster/ranger/setup/impala_user_non_owner.json.template
index 02bad30..76f54b0 100644
--- a/testdata/cluster/ranger/setup/impala_user.json.template
+++ b/testdata/cluster/ranger/setup/impala_user_non_owner.json.template
@@ -1,6 +1,6 @@
 {
-  "name" : "${USER}",
+  "name" : "non_owner",
   "password" : "password123",
   "userRoleList" : [ "ROLE_USER" ],
-  "groupIdList" : [ "${GROUP_ID}" ]
+  "groupIdList" : [ "${GROUP_ID_NON_OWNER}" ]
 }
diff --git a/testdata/cluster/ranger/setup/impala_user.json.template b/testdata/cluster/ranger/setup/impala_user_owner.json.template
similarity index 68%
rename from testdata/cluster/ranger/setup/impala_user.json.template
rename to testdata/cluster/ranger/setup/impala_user_owner.json.template
index 02bad30..36f2d3e 100644
--- a/testdata/cluster/ranger/setup/impala_user.json.template
+++ b/testdata/cluster/ranger/setup/impala_user_owner.json.template
@@ -2,5 +2,5 @@
   "name" : "${USER}",
   "password" : "password123",
   "userRoleList" : [ "ROLE_USER" ],
-  "groupIdList" : [ "${GROUP_ID}" ]
+  "groupIdList" : [ "${GROUP_ID_OWNER}" ]
 }
diff --git a/testdata/cluster/ranger/setup/policy_4_revised.json b/testdata/cluster/ranger/setup/policy_4_revised.json
new file mode 100644
index 0000000..5ef889e
--- /dev/null
+++ b/testdata/cluster/ranger/setup/policy_4_revised.json
@@ -0,0 +1,117 @@
+{
+    "allowExceptions": [],
+    "createdBy": "Admin",
+    "dataMaskPolicyItems": [],
+    "denyExceptions": [],
+    "denyPolicyItems": [],
+    "description": "Policy for all - database",
+    "id": 4,
+    "isAuditEnabled": true,
+    "isDenyAllElse": false,
+    "isEnabled": true,
+    "name": "all - database",
+    "options": {},
+    "policyItems": [
+        {
+            "accesses": [
+                {
+                    "isAllowed": true,
+                    "type": "select"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "update"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "create"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "drop"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "alter"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "index"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "lock"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "all"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "read"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "write"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "repladmin"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "serviceadmin"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "tempudfadmin"
+                },
+                {
+                    "isAllowed": true,
+                    "type": "refresh"
+                }
+            ],
+            "conditions": [],
+            "delegateAdmin": true,
+            "groups": [],
+            "roles": [],
+            "users": [
+                "admin"
+            ]
+        },
+        {
+            "accesses": [
+                {
+                    "isAllowed": true,
+                    "type": "all"
+                }
+            ],
+            "conditions": [],
+            "delegateAdmin": true,
+            "groups": [],
+            "roles": [],
+            "users": [
+                "{OWNER}"
+            ]
+        }
+    ],
+    "policyLabels": [],
+    "policyPriority": 0,
+    "policyType": 0,
+    "resources": {
+        "database": {
+            "isExcludes": false,
+            "isRecursive": false,
+            "values": [
+                "*"
+            ]
+        }
+    },
+    "rowFilterPolicyItems": [],
+    "service": "test_impala",
+    "serviceType": "hive",
+    "updatedBy": "Admin",
+    "validitySchedules": [],
+    "version": 1,
+    "zoneName": ""
+}