You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/05 03:35:06 UTC
[kudu] branch master updated: KUDU-2972 Add Ranger client
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e13fd4a KUDU-2972 Add Ranger client
e13fd4a is described below
commit e13fd4a489bb252ae80f1a2d8dd39991cfdc2a88
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Thu Mar 5 00:38:13 2020 +0100
KUDU-2972 Add Ranger client
Adds a Ranger client that communicates with the Java subprocess that
wraps the Ranger plugin. It can be used by an authorization provider to
translate between Kudu and Ranger.
Removes default_database out parameter from ParseRangerTableIdentifier
in table_util as we decided to take a slightly different path and this
information is not needed anymore.
Current test coverage consists of only unit tests with a mock subprocess
server. Real integration tests will be added in a follow-up patch once a
working MiniRanger is added to facilitate this testing.
Change-Id: Ie2e1ec19ed3aeb4d82ad38fe1fb655f57021c1a4
Reviewed-on: http://gerrit.cloudera.org:8080/15206
Reviewed-by: Hao Hao <ha...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/common/table_util-test.cc | 56 ++----
src/kudu/common/table_util.cc | 4 +-
src/kudu/common/table_util.h | 17 +-
src/kudu/ranger/CMakeLists.txt | 23 +++
src/kudu/ranger/ranger.proto | 10 ++
src/kudu/ranger/ranger_client-test.cc | 305 ++++++++++++++++++++++++++++++++
src/kudu/ranger/ranger_client.cc | 314 +++++++++++++++++++++++++++++++++
src/kudu/ranger/ranger_client.h | 108 ++++++++++++
src/kudu/subprocess/server.cc | 23 ++-
src/kudu/subprocess/server.h | 8 +-
src/kudu/subprocess/subprocess_proxy.h | 13 +-
src/kudu/util/subprocess.cc | 9 +
src/kudu/util/subprocess.h | 2 +
13 files changed, 832 insertions(+), 60 deletions(-)
diff --git a/src/kudu/common/table_util-test.cc b/src/kudu/common/table_util-test.cc
index f443205..a7b52e6 100644
--- a/src/kudu/common/table_util-test.cc
+++ b/src/kudu/common/table_util-test.cc
@@ -66,80 +66,62 @@ TEST(TestTableUtil, TestRangerTableIdentifier) {
string db;
Slice tbl;
string table;
- bool default_database;
table = "foo.bar";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("foo", db);
EXPECT_EQ("bar", tbl);
- EXPECT_FALSE(default_database);
table = "99bottles.my_awesome/table/22";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("99bottles", db);
EXPECT_EQ("my_awesome/table/22", tbl);
- EXPECT_FALSE(default_database);
table = "99/bottles.my_awesome/table/22";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("99/bottles", db);
EXPECT_EQ("my_awesome/table/22", tbl);
- EXPECT_FALSE(default_database);
table = "_leading_underscore.trailing_underscore_";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("_leading_underscore", db);
EXPECT_EQ("trailing_underscore_", tbl);
- EXPECT_FALSE(default_database);
table = "foo";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("default", db);
EXPECT_EQ("foo", tbl);
- EXPECT_TRUE(default_database);
table = "default.foo";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("default", db);
EXPECT_EQ("foo", tbl);
- EXPECT_FALSE(default_database);
table = "lots.of.tables";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("lots", db);
EXPECT_EQ("of.tables", tbl);
- EXPECT_FALSE(default_database);
table = "db_name..table_name";
- EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl, &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier(table, &db, &tbl));
EXPECT_EQ("db_name", db);
EXPECT_EQ(".table_name", tbl);
- EXPECT_FALSE(default_database);
- EXPECT_TRUE(ParseRangerTableIdentifier("", &db, &tbl, &default_database)
+ EXPECT_TRUE(ParseRangerTableIdentifier("", &db, &tbl)
.IsInvalidArgument());
- EXPECT_TRUE(ParseRangerTableIdentifier(".", &db, &tbl, &default_database)
+ EXPECT_TRUE(ParseRangerTableIdentifier(".", &db, &tbl)
.IsInvalidArgument());
- EXPECT_OK(ParseRangerTableIdentifier("no_table", &db, &tbl,
- &default_database));
- EXPECT_OK(ParseRangerTableIdentifier("lots.of.tables", &db, &tbl,
- &default_database));
- EXPECT_TRUE(ParseRangerTableIdentifier("no_table.", &db, &tbl,
- &default_database)
+ EXPECT_OK(ParseRangerTableIdentifier("no_table", &db, &tbl));
+ EXPECT_OK(ParseRangerTableIdentifier("lots.of.tables", &db, &tbl));
+ EXPECT_TRUE(ParseRangerTableIdentifier("no_table.", &db, &tbl)
.IsInvalidArgument());
- EXPECT_TRUE(ParseRangerTableIdentifier(".no_database", &db, &tbl,
- &default_database)
+ EXPECT_TRUE(ParseRangerTableIdentifier(".no_database", &db, &tbl)
.IsInvalidArgument());
- EXPECT_OK(ParseRangerTableIdentifier("punctuation?.yes", &db, &tbl,
- &default_database));
- EXPECT_OK(ParseRangerTableIdentifier("white space.yes", &db, &tbl,
- &default_database));
- EXPECT_OK(ParseRangerTableIdentifier("unicode☃tables.yes", &db, &tbl,
- &default_database));
- EXPECT_OK(ParseRangerTableIdentifier("unicode.☃tables.yes", &db, &tbl,
- &default_database));
- EXPECT_OK(ParseRangerTableIdentifier(string("\0.\0", 3), &db, &tbl,
- &default_database));
+ EXPECT_OK(ParseRangerTableIdentifier("punctuation?.yes", &db, &tbl));
+ EXPECT_OK(ParseRangerTableIdentifier("white space.yes", &db, &tbl));
+ EXPECT_OK(ParseRangerTableIdentifier("unicode☃tables.yes", &db, &tbl));
+ EXPECT_OK(ParseRangerTableIdentifier("unicode.☃tables.yes", &db, &tbl));
+ EXPECT_OK(ParseRangerTableIdentifier(string("\0.\0", 3), &db, &tbl));
}
} // namespace kudu
diff --git a/src/kudu/common/table_util.cc b/src/kudu/common/table_util.cc
index 56db851..0ae0821 100644
--- a/src/kudu/common/table_util.cc
+++ b/src/kudu/common/table_util.cc
@@ -80,8 +80,7 @@ Status ParseHiveTableIdentifier(const string& table_name,
Status ParseRangerTableIdentifier(const string& table_name,
string* ranger_database,
- Slice* ranger_table,
- bool* default_database) {
+ Slice* ranger_table) {
auto separator_idx = boost::make_optional<int>(false, 0);
for (int idx = 0; idx < table_name.size(); ++idx) {
char c = table_name[idx];
@@ -102,7 +101,6 @@ Status ParseRangerTableIdentifier(const string& table_name,
*ranger_database = FLAGS_ranger_default_database;
*ranger_table = Slice(table_name.data());
}
- *default_database = !separator_idx;
if (ranger_table->empty()) {
return Status::InvalidArgument(kInvalidRangerTableError, table_name);
diff --git a/src/kudu/common/table_util.h b/src/kudu/common/table_util.h
index a9d2e8a..e0dee2a 100644
--- a/src/kudu/common/table_util.h
+++ b/src/kudu/common/table_util.h
@@ -37,17 +37,18 @@ Status ParseHiveTableIdentifier(const std::string& table_name,
Slice* hms_database,
Slice* hms_table) WARN_UNUSED_RESULT;
-// Parses a Kudu table name of the form '<database>.<table>' into a
-// Ranger database and table name. If the table name doesn't contain a period it
+// Parses a Kudu table name of the form '<database>.<table>' into a Ranger
+// database and table name. If the table name doesn't contain a period it
// defaults to a configurable default database name. If there are multiple
// periods in the table name the first one will separate the database name from
-// the table name. The returned 'default_database' bool indicates if the default
-// database name was used (if a database name is provided in the table name but
-// it is the same as the default database it will be false). The returned
-// 'ranger_table' slice must not outlive 'table_name'.
+// the table name. The returned 'ranger_table' slice must not outlive
+// 'table_name'.
+//
+// Returns InvalidArgument if the table name doesn't conform the rules, i.e.
+// begins with a period, the only period is at the end of the string, or is an
+// empty string.
Status ParseRangerTableIdentifier(const std::string& table_name,
std::string* ranger_database,
- Slice* ranger_table,
- bool* default_database) WARN_UNUSED_RESULT;
+ Slice* ranger_table) WARN_UNUSED_RESULT;
} // namespace kudu
diff --git a/src/kudu/ranger/CMakeLists.txt b/src/kudu/ranger/CMakeLists.txt
index 6a21976..d735f33 100644
--- a/src/kudu/ranger/CMakeLists.txt
+++ b/src/kudu/ranger/CMakeLists.txt
@@ -31,3 +31,26 @@ add_library(ranger_proto
target_link_libraries(ranger_proto
protobuf
)
+
+##############################
+# kudu_ranger
+##############################
+
+set(RANGER_SRCS
+ ranger_client.cc)
+set(RANGER_DEPS
+ gflags
+ kudu_subprocess
+ ranger_proto)
+
+add_library(kudu_ranger ${RANGER_SRCS})
+target_link_libraries(kudu_ranger ${RANGER_DEPS})
+
+#######################################
+# Unit tests
+#######################################
+
+SET_KUDU_TEST_LINK_LIBS(
+ kudu_ranger)
+
+ADD_KUDU_TEST(ranger_client-test)
diff --git a/src/kudu/ranger/ranger.proto b/src/kudu/ranger/ranger.proto
index 4848ea3..939f3b0 100644
--- a/src/kudu/ranger/ranger.proto
+++ b/src/kudu/ranger/ranger.proto
@@ -19,6 +19,16 @@ package kudu.ranger;
option java_package = "org.apache.kudu.ranger";
// Describes the type of action that can be performed in Ranger.
+//
+// SQL-like action types used by Ranger. ALL implies all other privilege types
+// and all privilege types imply METADATA. METADATA is used for discovery
+// (listing tables).
+//
+// The action type mapping is similar to the one in Sentry which was implemented
+// before Ranger and the same privileges have to be enforced with both
+// authorization providers.
+//
+// For more information on fine grained authz check out docs/security.adoc
enum ActionPB {
SELECT = 0;
INSERT = 1;
diff --git a/src/kudu/ranger/ranger_client-test.cc b/src/kudu/ranger/ranger_client-test.cc
new file mode 100644
index 0000000..4213593
--- /dev/null
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/ranger/ranger_client.h"
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <utility>
+
+#include <boost/functional/hash/hash.hpp>
+#include <glog/logging.h>
+#include <google/protobuf/any.pb.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/ranger/ranger.pb.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace ranger {
+
+using boost::hash_combine;
+using kudu::subprocess::SubprocessRequestPB;
+using kudu::subprocess::SubprocessResponsePB;
+using kudu::subprocess::SubprocessServer;
+using std::move;
+using std::string;
+using std::unordered_set;
+using strings::Substitute;
+
+struct AuthorizedAction {
+ string user_name;
+ ActionPB action;
+ string database_name;
+ string table_name;
+ string column_name;
+
+ bool operator==(const AuthorizedAction& rhs) const {
+ return user_name == rhs.user_name &&
+ action == rhs.action &&
+ database_name == rhs.database_name &&
+ table_name == rhs.table_name &&
+ column_name == rhs.column_name;
+ }
+};
+
+struct AuthorizedActionHash {
+ public:
+ size_t operator()(const AuthorizedAction& action) const {
+ size_t seed = 0;
+ hash_combine(seed, action.action);
+ hash_combine(seed, action.column_name);
+ hash_combine(seed, action.database_name);
+ hash_combine(seed, action.table_name);
+ hash_combine(seed, action.user_name);
+
+ return seed;
+ }
+};
+
+class MockSubprocessServer : public SubprocessServer {
+ public:
+ unordered_set<AuthorizedAction, AuthorizedActionHash> next_response_;
+
+ Status Init() override {
+ // don't want to start anything
+ return Status::OK();
+ }
+
+ ~MockSubprocessServer() override {}
+
+ MockSubprocessServer() :
+ SubprocessServer({"mock"}) {}
+
+ Status Execute(SubprocessRequestPB* req,
+ SubprocessResponsePB* resp) override {
+ RangerRequestListPB req_list;
+ CHECK(req->request().UnpackTo(&req_list));
+
+ RangerResponseListPB resp_list;
+
+ for (const RangerRequestPB& req : req_list.requests()) {
+ AuthorizedAction action;
+ action.user_name = req_list.user();
+ action.action = req.action();
+ action.table_name = req.table();
+ action.database_name = req.database();
+ action.column_name = req.has_column() ? req.column() : "";
+
+ auto r = resp_list.add_responses();
+ r->set_allowed(ContainsKey(next_response_, action));
+ }
+
+ resp->mutable_response()->PackFrom(resp_list);
+
+ return Status::OK();
+ }
+};
+
+class RangerClientTest : public KuduTest {
+ public:
+ RangerClientTest() :
+ client_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "ranger_client-test")) {}
+
+ void SetUp() override {
+ std::unique_ptr<MockSubprocessServer> server(new MockSubprocessServer());
+ next_response_ = &server->next_response_;
+ client_.ReplaceServerForTests(std::move(server));
+ }
+
+ void Allow(string user_name, ActionPB action, string database_name,
+ string table_name, string column_name = "") {
+ AuthorizedAction authorized_action;
+ authorized_action.user_name = move(user_name);
+ authorized_action.action = action;
+ authorized_action.database_name = move(database_name);
+ authorized_action.table_name = move(table_name);
+ authorized_action.column_name = move(column_name);
+
+ next_response_->emplace(authorized_action);
+ }
+
+ protected:
+ MetricRegistry metric_registry_;
+ unordered_set<AuthorizedAction, AuthorizedActionHash>* next_response_;
+ RangerClient client_;
+};
+
+TEST_F(RangerClientTest, TestAuthorizeCreateTableUnauthorized) {
+ auto s = client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar.baz");
+ ASSERT_TRUE(s.IsNotAuthorized());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeCreateTableAuthorized) {
+ Allow("jdoe", ActionPB::CREATE, "foo", "bar");
+ ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo.bar"));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeListNoTables) {
+ unordered_set<string> tables;
+ tables.emplace("foo.bar");
+ tables.emplace("foo.baz");
+ auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
+ ASSERT_TRUE(s.IsNotAuthorized());
+ ASSERT_EQ(2, tables.size());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeMetadataSubsetOfTablesAuthorized) {
+ Allow("jdoe", ActionPB::METADATA, "default", "foobar");
+ unordered_set<string> tables;
+ tables.emplace("default.foobar");
+ tables.emplace("barbaz");
+ ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+ ASSERT_EQ(1, tables.size());
+ ASSERT_EQ("default.foobar", *tables.begin());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorized) {
+ Allow("jdoe", ActionPB::METADATA, "default", "foobar");
+ Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
+ unordered_set<string> tables;
+ tables.emplace("default.foobar");
+ tables.emplace("barbaz");
+ ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+ ASSERT_EQ(2, tables.size());
+ ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
+ ASSERT_TRUE(ContainsKey(tables, "barbaz"));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeMetadataAllNonRanger) {
+ unordered_set<string> tables;
+ tables.emplace("foo.");
+ tables.emplace(".bar");
+ auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
+ ASSERT_TRUE(s.IsNotAuthorized());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) {
+ unordered_set<string> tables;
+ tables.emplace("foo.");
+ tables.emplace(".bar");
+ tables.emplace("foo.bar");
+ tables.emplace("foo.baz");
+ auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
+ ASSERT_TRUE(s.IsNotAuthorized());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorizedContainsNonRanger) {
+ Allow("jdoe", ActionPB::METADATA, "default", "foobar");
+ Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
+ unordered_set<string> tables;
+ tables.emplace("default.foobar");
+ tables.emplace("barbaz");
+ tables.emplace("foo.");
+ ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+ ASSERT_EQ(2, tables.size());
+ ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
+ ASSERT_TRUE(ContainsKey(tables, "barbaz"));
+ ASSERT_FALSE(ContainsKey(tables, "foo."));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeScanSubsetAuthorized) {
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col1");
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col3");
+ unordered_set<string> columns;
+ columns.emplace("col1");
+ columns.emplace("col2");
+ columns.emplace("col3");
+ columns.emplace("col4");
+ ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
+ "default.foobar", &columns));
+ ASSERT_EQ(2, columns.size());
+ ASSERT_TRUE(ContainsKey(columns, "col1"));
+ ASSERT_TRUE(ContainsKey(columns, "col3"));
+ ASSERT_FALSE(ContainsKey(columns, "col2"));
+ ASSERT_FALSE(ContainsKey(columns, "col4"));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeScanAllColumnsAuthorized) {
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col1");
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col2");
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col3");
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col4");
+ unordered_set<string> columns;
+ columns.emplace("col1");
+ columns.emplace("col2");
+ columns.emplace("col3");
+ columns.emplace("col4");
+ ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
+ "default.foobar", &columns));
+ ASSERT_EQ(4, columns.size());
+ ASSERT_TRUE(ContainsKey(columns, "col1"));
+ ASSERT_TRUE(ContainsKey(columns, "col2"));
+ ASSERT_TRUE(ContainsKey(columns, "col3"));
+ ASSERT_TRUE(ContainsKey(columns, "col4"));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeScanNoColumnsAuthorized) {
+ unordered_set<string> columns;
+ for (int i = 0; i < 4; ++i) {
+ columns.emplace(Substitute("col$0", i));
+ }
+ auto s = client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
+ "default.foobar", &columns);
+ ASSERT_TRUE(s.IsNotAuthorized());
+ ASSERT_EQ(4, columns.size());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeActionsNoneAuthorized) {
+ unordered_set<ActionPB, ActionHash> actions;
+ actions.emplace(ActionPB::DROP);
+ actions.emplace(ActionPB::SELECT);
+ actions.emplace(ActionPB::INSERT);
+ auto s = client_.AuthorizeActions("jdoe", "default.foobar", &actions);
+ ASSERT_TRUE(s.IsNotAuthorized());
+ ASSERT_EQ(3, actions.size());
+}
+
+TEST_F(RangerClientTest, TestAuthorizeActionsSomeAuthorized) {
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar");
+ unordered_set<ActionPB, ActionHash> actions;
+ actions.emplace(ActionPB::DROP);
+ actions.emplace(ActionPB::SELECT);
+ actions.emplace(ActionPB::INSERT);
+ ASSERT_OK(client_.AuthorizeActions("jdoe", "default.foobar", &actions));
+ ASSERT_EQ(1, actions.size());
+ ASSERT_TRUE(ContainsKey(actions, ActionPB::SELECT));
+}
+
+TEST_F(RangerClientTest, TestAuthorizeActionsAllAuthorized) {
+ Allow("jdoe", ActionPB::DROP, "default", "foobar");
+ Allow("jdoe", ActionPB::SELECT, "default", "foobar");
+ Allow("jdoe", ActionPB::INSERT, "default", "foobar");
+ unordered_set<ActionPB, ActionHash> actions;
+ actions.emplace(ActionPB::DROP);
+ actions.emplace(ActionPB::SELECT);
+ actions.emplace(ActionPB::INSERT);
+ ASSERT_OK(client_.AuthorizeActions("jdoe", "default.foobar", &actions));
+ ASSERT_EQ(3, actions.size());
+}
+
+} // namespace ranger
+} // namespace kudu
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
new file mode 100644
index 0000000..36141d9
--- /dev/null
+++ b/src/kudu/ranger/ranger_client.cc
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/ranger/ranger_client.h"
+
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/table_util.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/ranger/ranger.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+DEFINE_string(ranger_config_path, "",
+ "Path to directory containing Ranger client configuration. "
+ "Enables Ranger authorization provider. "
+ "sentry_service_rpc_addresses must not be set if this is "
+ "enabled.");
+TAG_FLAG(ranger_config_path, experimental);
+
+DEFINE_string(ranger_jar_path, "",
+ "Path to the JAR file containing the Ranger subprocess.");
+TAG_FLAG(ranger_jar_path, experimental);
+
+METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_length,
+ "Ranger subprocess inbound queue length",
+ kudu::MetricUnit::kMessages,
+ "Number of request messages in the Ranger subprocess' inbound request queue",
+ kudu::MetricLevel::kInfo,
+ 1000, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length,
+ "Ranger subprocess outbound queue length",
+ kudu::MetricUnit::kMessages,
+ "Number of request messages in the Ranger subprocess' outbound response queue",
+ kudu::MetricLevel::kInfo,
+ 1000, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_time_ms,
+ "Ranger subprocess inbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Ranger subprocess' inbound request queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_time_ms,
+ "Ranger subprocess outbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Ranger subprocess' outbound response queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
+ "Ranger subprocess execution time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent executing the Ranger subprocess request, excluding "
+ "time spent spent in the subprocess queues",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
+
+namespace kudu {
+namespace ranger {
+
+using kudu::subprocess::SubprocessMetrics;
+using std::move;
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+static const char* kUnauthorizedAction = "Unauthorized action";
+static const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. "
+ "Use 'kudu table rename_table' to rename it to "
+ "a Ranger-compatible name.";
+const char* kMainClass = "org.apache.kudu.ranger.RangerSubprocessMain";
+
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+ HISTINIT(inbound_queue_length, ranger_subprocess_inbound_queue_length);
+ HISTINIT(outbound_queue_length, ranger_subprocess_outbound_queue_length);
+ HISTINIT(inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms);
+ HISTINIT(outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms);
+ HISTINIT(execution_time_ms, ranger_subprocess_execution_time_ms);
+}
+#undef HISTINIT
+
+RangerClient::RangerClient(const scoped_refptr<MetricEntity>& metric_entity) :
+ subprocess_({"java", "-cp", GetJavaClasspath()}, metric_entity) {}
+
+Status RangerClient::Start() {
+ VLOG(1) << "Initializing Ranger subprocess server";
+ return subprocess_.Start();
+}
+
+// TODO(abukor): refactor to avoid code duplication
+Status RangerClient::AuthorizeAction(const string& user_name,
+ const ActionPB& action,
+ const string& table_name) {
+ string db;
+ Slice tbl;
+
+ auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ RangerRequestListPB req_list;
+ RangerResponseListPB resp_list;
+ req_list.set_user(user_name);
+
+ RangerRequestPB* req = req_list.add_requests();
+
+ req->set_action(action);
+ req->set_database(db);
+ req->set_table(tbl.ToString());
+
+ RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+
+ CHECK_EQ(1, resp_list.responses_size());
+ if (resp_list.responses().begin()->allowed()) {
+ return Status::OK();
+ }
+
+ LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on $2",
+ user_name, ActionPB_Name(action), table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+}
+
+Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
+ const ActionPB& action,
+ const string& table_name,
+ unordered_set<string>* column_names) {
+ DCHECK(!column_names->empty());
+
+ string db;
+ Slice tbl;
+
+ auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ RangerRequestListPB req_list;
+ RangerResponseListPB resp_list;
+ req_list.set_user(user_name);
+
+ for (const auto& col : *column_names) {
+ auto req = req_list.add_requests();
+ req->set_action(action);
+ req->set_database(db);
+ req->set_table(tbl.ToString());
+ req->set_column(col);
+ }
+
+ RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+
+ DCHECK_EQ(column_names->size(), resp_list.responses_size());
+
+ unordered_set<string> allowed_columns;
+ for (auto i = 0; i < req_list.requests_size(); ++i) {
+ if (resp_list.responses(i).allowed()) {
+ EmplaceOrDie(&allowed_columns, move(req_list.requests(i).column()));
+ }
+ }
+
+ if (allowed_columns.empty()) {
+ LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on table $2",
+ user_name, ActionPB_Name(action), table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ *column_names = move(allowed_columns);
+
+ return Status::OK();
+}
+
+Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
+ const ActionPB& action,
+ unordered_set<string>* table_names) {
+ if (table_names->empty()) {
+ return Status::InvalidArgument("Empty set of tables");
+ }
+
+ RangerRequestListPB req_list;
+ RangerResponseListPB resp_list;
+ req_list.set_user(user_name);
+
+ vector<string> orig_table_names;
+
+ for (const auto& table : *table_names) {
+ string db;
+ Slice tbl;
+
+ auto s = ParseRangerTableIdentifier(table, &db, &tbl);
+ if (PREDICT_TRUE(s.ok())) {
+ orig_table_names.emplace_back(table);
+
+ auto req = req_list.add_requests();
+ req->set_action(action);
+ req->set_database(db);
+ req->set_table(tbl.ToString());
+ } else {
+ LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table);
+ }
+ }
+
+ RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+
+ DCHECK_EQ(orig_table_names.size(), resp_list.responses_size());
+
+ unordered_set<string> allowed_tables;
+ for (auto i = 0; i < orig_table_names.size(); ++i) {
+ if (resp_list.responses(i).allowed()) {
+ EmplaceOrDie(&allowed_tables, move(orig_table_names[i]));
+ }
+ }
+
+ if (allowed_tables.empty()) {
+ LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on $2 tables",
+ user_name, ActionPB_Name(action), table_names->size());
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ *table_names = move(allowed_tables);
+
+ return Status::OK();
+}
+
+Status RangerClient::AuthorizeActions(const string& user_name,
+ const string& table_name,
+ unordered_set<ActionPB, ActionHash>* actions) {
+ DCHECK(!actions->empty());
+
+ string db;
+ Slice tbl;
+
+ auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ RangerRequestListPB req_list;
+ RangerResponseListPB resp_list;
+ req_list.set_user(user_name);
+
+ for (const auto& action : *actions) {
+ auto req = req_list.add_requests();
+ req->set_action(action);
+ req->set_database(db);
+ req->set_table(tbl.ToString());
+ }
+
+ RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+
+ DCHECK_EQ(actions->size(), resp_list.responses_size());
+
+ unordered_set<ActionPB, ActionHash> allowed_actions;
+ for (auto i = 0; i < req_list.requests_size(); ++i) {
+ if (resp_list.responses(i).allowed()) {
+ EmplaceOrDie(&allowed_actions, move(req_list.requests(i).action()));
+ }
+ }
+
+ if (allowed_actions.empty()) {
+ LOG(WARNING) << Substitute("User $0 is not authorized to perform actions $1 on table $2",
+ user_name, JoinMapped(*actions, ActionPB_Name, ", "), table_name);
+ return Status::NotAuthorized(kUnauthorizedAction);
+ }
+
+ *actions = move(allowed_actions);
+
+ return Status::OK();
+}
+
+string RangerClient::GetJavaClasspath() {
+ Env* env = Env::Default();
+ string exe;
+ CHECK_OK(env->GetExecutablePath(&exe));
+ const string bin_dir = DirName(exe);
+ string jar_path = FLAGS_ranger_jar_path.empty() ?
+ JoinPathSegments(bin_dir, "kudu-subprocess.jar") :
+ FLAGS_ranger_jar_path;
+
+ return Substitute("$0:$1", jar_path, FLAGS_ranger_config_path);
+}
+
+} // namespace ranger
+} // namespace kudu
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
new file mode 100644
index 0000000..c1f091e
--- /dev/null
+++ b/src/kudu/ranger/ranger_client.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <utility>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/ranger/ranger.pb.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess_proxy.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+namespace ranger {
+
+struct ActionHash {
+ public:
+ int operator()(const ActionPB& action) const {
+ return action;
+ }
+};
+
+struct RangerSubprocessMetrics : public subprocess::SubprocessMetrics {
+ explicit RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity);
+};
+
+typedef subprocess::SubprocessProxy<RangerRequestListPB, RangerResponseListPB,
+ RangerSubprocessMetrics> RangerSubprocess;
+
+// A client for the Ranger service that communicates with a Java subprocess.
+class RangerClient {
+ public:
+ // Creates a Ranger client.
+ explicit RangerClient(const scoped_refptr<MetricEntity>& metric_entity);
+
+ // Starts the RangerClient, initializes the subprocess server.
+ Status Start() WARN_UNUSED_RESULT;
+
+ // Authorizes an action on the table. Returns OK if 'user_name' is authorized
+ // to perform 'action' on 'table_name', NotAuthorized otherwise.
+ Status AuthorizeAction(const std::string& user_name, const ActionPB& action,
+ const std::string& table_name) WARN_UNUSED_RESULT;
+
+ // Authorizes action on multiple tables. If there is at least one table that
+ // user is authorized to perform the action on, it sets 'table_names' to the
+ // tables the user is authorized to access and returns OK, NotAuthorized
+ // otherwise.
+ Status AuthorizeActionMultipleTables(const std::string& user_name, const ActionPB& action,
+ std::unordered_set<std::string>* table_names)
+ WARN_UNUSED_RESULT;
+
+ // Authorizes action on multiple columns. If there is at least one column that
+ // user is authorized to perform the action on, it sets 'column_names' to the
+ // columns the user is authorized to access and returns OK, NotAuthorized
+ // otherwise.
+ Status AuthorizeActionMultipleColumns(const std::string& user_name, const ActionPB& action,
+ const std::string& table_name,
+ std::unordered_set<std::string>* column_names)
+ WARN_UNUSED_RESULT;
+
+ // Authorizes multiple table-level actions on a single table. If there is at
+ // least one action that user is authorized to perform on the table, it sets
+ // 'actions' to the actions the user is authorized to perform and returns OK,
+ // NotAuthorized otherwise.
+ Status AuthorizeActions(const std::string& user_name,
+ const std::string& table_name,
+ std::unordered_set<ActionPB, ActionHash>* actions)
+ WARN_UNUSED_RESULT;
+
+ // Replaces the subprocess server in the subprocess proxy.
+ void ReplaceServerForTests(std::unique_ptr<subprocess::SubprocessServer> server) {
+ subprocess_.ReplaceServerForTests(std::move(server));
+ }
+
+ private:
+ // Sends request to the subprocess and parses the response.
+ Status SendRequest(RangerRequestListPB* req, RangerResponseListPB* resp) WARN_UNUSED_RESULT;
+
+ // Returns classpath to be used for the Ranger subprocess.
+ static std::string GetJavaClasspath();
+
+ RangerSubprocess subprocess_;
+};
+
+} // namespace ranger
+} // namespace kudu
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index c8dc754..6d81d27 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -137,22 +137,35 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
void SubprocessServer::Shutdown() {
// Stop further work from happening by killing the subprocess and shutting
- // down the queues.
+ // down the queues.
if (!closing_.CountDown()) {
// We may shut down out-of-band in tests; if we've already shut down,
// there's nothing left to do.
return;
}
// NOTE: ordering isn't too important as long as we shut everything down.
- WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
+ //
+ // Normally the process_ should be started before we reach Shutdown() and the
+ // threads below should be running too, except in mock servers because we
+ // don't init there. Shutdown() is still called in this case from the
+ // destructor though so these checks are necessary.
+ if (process_->IsStarted()) {
+ WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
+ }
inbound_response_queue_.Shutdown();
outbound_call_queue_.Shutdown();
// We should be able to clean up our threads; they'll see that we're closing,
// the pipe has been closed, or the queues have been shut down.
- write_thread_->Join();
- read_thread_->Join();
- deadline_checker_->Join();
+ if (write_thread_) {
+ write_thread_->Join();
+ }
+ if (read_thread_) {
+ read_thread_->Join();
+ }
+ if (deadline_checker_) {
+ deadline_checker_->Join();
+ }
for (const auto& t : responder_threads_) {
t->Join();
}
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5e9d625..c175e22 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -184,18 +184,20 @@ typedef BlockingQueue<SubprocessResponsePB, ResponseLogicalSize> ResponseQueue;
// - One "deadline-checker" thread: this thread looks through the oldest calls
// that have been sent to the subprocess and runs their callbacks with a
// TimedOut error if they are past their deadline.
+//
+// Public methods are virtual so a mock server can be used in tests.
class SubprocessServer {
public:
explicit SubprocessServer(std::vector<std::string> subprocess_argv);
- ~SubprocessServer();
+ virtual ~SubprocessServer();
// Initialize the server, starting the subprocess and worker threads.
- Status Init() WARN_UNUSED_RESULT;
+ virtual Status Init() WARN_UNUSED_RESULT;
// Synchronously sends a request to the subprocess and populates 'resp' with
// contents returned from the subprocess, or returns an error if anything
// failed or timed out along the way.
- Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT;
+ virtual Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT;
private:
FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown);
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
index 3d60311..33e0f9a 100644
--- a/src/kudu/subprocess/subprocess_proxy.h
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -48,11 +48,11 @@ template<class ReqPB, class RespPB, class MetricsPB>
class SubprocessProxy {
public:
SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity)
- : server_(std::move(argv)), metrics_(entity) {}
+ : server_(new SubprocessServer(std::move(argv))), metrics_(entity) {}
// Starts the underlying subprocess.
Status Start() {
- return server_.Init();
+ return server_->Init();
}
// Executes the given request and populates the given response, returning a
@@ -62,7 +62,7 @@ class SubprocessProxy {
SubprocessRequestPB sreq;
sreq.mutable_request()->PackFrom(req);
SubprocessResponsePB sresp;
- RETURN_NOT_OK(server_.Execute(&sreq, &sresp));
+ RETURN_NOT_OK(server_->Execute(&sreq, &sresp));
if (!sresp.response().UnpackTo(resp)) {
LOG(ERROR) << strings::Substitute("unable to unpack response: $0",
pb_util::SecureDebugString(sresp));
@@ -78,6 +78,11 @@ class SubprocessProxy {
}
return Status::OK();
}
+
+ // Replaces the subprocess server.
+ void ReplaceServerForTests(std::unique_ptr<SubprocessServer> server) {
+ server_ = std::move(server);
+ }
private:
// Parses the given metrics protobuf and updates 'metrics_' based on its
// contents.
@@ -94,7 +99,7 @@ class SubprocessProxy {
metrics_.execution_time_ms->Increment(pb.execution_time_ms());
}
- SubprocessServer server_;
+ std::unique_ptr<SubprocessServer> server_;
MetricsPB metrics_;
};
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index 2a3e8fd..68f1560 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -630,6 +630,11 @@ Status Subprocess::Kill(int signal) {
}
Status Subprocess::KillAndWait(int signal) {
+ if (state_ != kRunning) {
+ const string err_str = "Sub-process is not running";
+ LOG(DFATAL) << err_str;
+ return Status::IllegalState(err_str);
+ }
string procname = Substitute("$0 (pid $1)", argv0(), pid());
// This is a fatal error because all errors in Kill() are signal-independent,
@@ -856,4 +861,8 @@ int Subprocess::ReleaseChildFd(int stdfd) {
return ret;
}
+bool Subprocess::IsStarted() {
+ return state_ != kNotStarted;
+}
+
} // namespace kudu
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index 4088c2e..25947a9 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -170,6 +170,8 @@ class Subprocess {
pid_t pid() const;
const std::string& argv0() const { return argv_[0]; }
+ bool IsStarted();
+
private:
FRIEND_TEST(SubprocessTest, TestGetProcfsState);