You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2021/08/10 18:29:42 UTC

[kudu] branch master updated (a5307b3 -> d65d75f)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from a5307b3  [raft_consensus_election-itest] fix flakiness in one scenario
     new 6be5282  [consensus] remove ForTests suffix from WaitUntilLeader
     new 592ba3c  KUDU-1921 Add ability to require auth/encryption to C++ client
     new d65d75f  [java] KUDU-1921 Add ability to require authn/encryption to Java client

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kudu/client/AsyncKuduClient.java    | 50 ++++++++++++++-
 .../java/org/apache/kudu/client/Connection.java    | 17 ++++-
 .../org/apache/kudu/client/ConnectionCache.java    | 19 +++++-
 .../java/org/apache/kudu/client/KuduClient.java    | 32 +++++++++
 .../java/org/apache/kudu/client/Negotiator.java    | 29 +++++++--
 .../org/apache/kudu/client/TestNegotiator.java     |  3 +-
 .../java/org/apache/kudu/client/TestSecurity.java  | 61 ++++++++++++++++++
 src/kudu/client/client.cc                          | 19 ++++++
 src/kudu/client/client.h                           | 42 ++++++++++++
 src/kudu/client/client_builder-internal.cc         | 11 ++--
 src/kudu/client/client_builder-internal.h          |  2 +
 src/kudu/consensus/raft_consensus.cc               |  2 +-
 src/kudu/consensus/raft_consensus.h                |  5 +-
 src/kudu/consensus/raft_consensus_quorum-test.cc   | 14 +---
 src/kudu/integration-tests/alter_table-test.cc     |  2 +-
 src/kudu/integration-tests/fuzz-itest.cc           |  2 +-
 src/kudu/integration-tests/security-itest.cc       | 75 +++++++++++++++++++++-
 .../integration-tests/ts_tablet_manager-itest.cc   |  2 +-
 .../integration-tests/txn_participant-itest.cc     | 42 ++++++------
 src/kudu/rpc/client_negotiation.cc                 |  4 +-
 src/kudu/rpc/client_negotiation.h                  |  2 +
 src/kudu/rpc/messenger.cc                          |  2 +
 src/kudu/rpc/messenger.h                           |  8 +++
 src/kudu/rpc/negotiation-test.cc                   | 20 ++++--
 src/kudu/rpc/negotiation.cc                        | 10 ++-
 src/kudu/rpc/negotiation.h                         |  1 +
 src/kudu/rpc/reactor.cc                            |  7 +-
 src/kudu/rpc/server_negotiation.cc                 |  6 +-
 src/kudu/rpc/server_negotiation.h                  |  2 +
 src/kudu/tablet/tablet_replica-test-base.cc        |  4 +-
 .../tserver/tablet_copy_source_session-test.cc     |  4 +-
 src/kudu/tserver/tablet_server-test-base.cc        |  2 +-
 src/kudu/tserver/ts_tablet_manager-test.cc         |  2 +-
 33 files changed, 429 insertions(+), 74 deletions(-)

[kudu] 01/03: [consensus] remove ForTests suffix from WaitUntilLeader

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6be5282d09d3c9d4c465f43fd292c6914182e7d5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Aug 9 15:50:16 2021 -0700

    [consensus] remove ForTests suffix from WaitUntilLeader
    
    I found WaitUntilLeaderForTests useful in making the master rebuilder
    tool more robust to slow startup. Since I intend to use it outside of
    tests, I removed the ForTests suffix.
    
    I also left a note in the header mentioning it should be used sparingly,
    given the implementation is admittedly simple and not suited for heavy
    usage.
    
    This patch contains no functional changes.
    
    Change-Id: I4f5d61a59651c3a5f11e317a77c0344f5dd3e707
    Reviewed-on: http://gerrit.cloudera.org:8080/17763
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/consensus/raft_consensus.cc               |  2 +-
 src/kudu/consensus/raft_consensus.h                |  5 ++-
 src/kudu/consensus/raft_consensus_quorum-test.cc   | 14 +-------
 src/kudu/integration-tests/alter_table-test.cc     |  2 +-
 src/kudu/integration-tests/fuzz-itest.cc           |  2 +-
 .../integration-tests/ts_tablet_manager-itest.cc   |  2 +-
 .../integration-tests/txn_participant-itest.cc     | 42 +++++++++++-----------
 src/kudu/tablet/tablet_replica-test-base.cc        |  4 +--
 .../tserver/tablet_copy_source_session-test.cc     |  4 +--
 src/kudu/tserver/tablet_server-test-base.cc        |  2 +-
 src/kudu/tserver/ts_tablet_manager-test.cc         |  2 +-
 11 files changed, 36 insertions(+), 45 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2904365..827c6bc 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -547,7 +547,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
   return Status::OK();
 }
 
-Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
+Status RaftConsensus::WaitUntilLeader(const MonoDelta& timeout) {
   MonoTime deadline = MonoTime::Now() + timeout;
   while (role() != consensus::RaftPeerPB::LEADER) {
     if (MonoTime::Now() >= deadline) {
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 762f95f..aa458d5 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -182,7 +182,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Wait until the node has LEADER role.
   // Returns Status::TimedOut if the role is not LEADER within 'timeout'.
-  Status WaitUntilLeaderForTests(const MonoDelta& timeout);
+  // NOTE: the implementation is a busy loop; as such, this method should be
+  // used sparingly, e.g. only in tests, or in applications that don't require
+  // high concurrency.
+  Status WaitUntilLeader(const MonoDelta& timeout) WARN_UNUSED_RESULT;
 
   // Return a copy of the failure detector instance. Only for use in tests.
   std::shared_ptr<rpc::PeriodicTimer> GetFailureDetectorForTests() const {
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 1a1994a..81bc554 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -96,18 +96,6 @@ const char* kTestTablet = "TestTablet";
 void DoNothing(const string& s) {
 }
 
-Status WaitUntilLeaderForTests(RaftConsensus* raft) {
-  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(15);
-  while (MonoTime::Now() < deadline) {
-    if (raft->role() == RaftPeerPB::LEADER) {
-      return Status::OK();
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  return Status::TimedOut("Timed out waiting to become leader");
-}
-
 // Test suite for tests that focus on multiple peer interaction, but
 // without integrating with other components, such as ops.
 class RaftConsensusQuorumTest : public KuduTest {
@@ -902,7 +890,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
     ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
                                         RaftConsensus::EXTERNAL_REQUEST));
-    WaitUntilLeaderForTests(new_leader.get());
+    ASSERT_OK(new_leader->WaitUntilLeader(MonoDelta::FromSeconds(15)));
     LOG(INFO) << "Election won";
     int64_t flush_count_after =
         new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 09bcb05..9b28292 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -157,7 +157,7 @@ class AlterTableTest : public KuduTest {
 
     if (num_replicas() == 1) {
       tablet_replica_ = LookupTabletReplica();
-      ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+      ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)));
     }
     LOG(INFO) << "Tablet successfully located";
   }
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 579c660..225c0c0 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -382,7 +382,7 @@ class FuzzTest : public KuduTest {
 
   Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type,
                                     int64_t ts_val) {
-    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(
+    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeader(
         MonoDelta::FromSeconds(10)));
     ParticipantResponsePB resp;
     RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp));
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 1aa0b39..2081cb8 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1082,7 +1082,7 @@ class TxnStatusTabletManagementTest : public TsTabletManagerITest {
 
     // Wait for the tablet to be in RUNNING state and its consensus running too.
     RETURN_NOT_OK(r->WaitUntilConsensusRunning(kTimeout));
-    auto s = r->consensus()->WaitUntilLeaderForTests(kTimeout);
+    auto s = r->consensus()->WaitUntilLeader(kTimeout);
     if (replica) {
       *replica = std::move(r);
     }
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 5a0fd26..51dbd24 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -298,7 +298,7 @@ TEST_F(TxnParticipantITest, TestReplicateParticipantOps) {
   // tserver so we can ensure a specific leader.
   const int kLeaderIdx = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   // Try submitting the ops on all replicas. They should succeed on the leaders
   // and fail on followers.
   const int64_t kTxnId = 1;
@@ -369,7 +369,7 @@ TEST_P(ParticipantCopyITest, TestCopyParticipantOps) {
   constexpr const int kDeadServerIdx = kLeaderIdx + 1;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
 
   // Apply some operations.
   vector<TxnParticipant::TxnEntry> expected_txns;
@@ -442,7 +442,7 @@ TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) {
   auto* follower_replica = replicas[kLeaderIdx + 1];
   auto* clock = leader_replica->clock();
   const int64_t kTxnId = 1;
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN));
   const MonoDelta kAgreeTimeout = kDefaultTimeout;
   const auto& tablet_id = leader_replica->tablet()->tablet_id();
@@ -503,7 +503,7 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) {
   auto* follower_replica = replicas[kLeaderIdx + 1];
   auto* clock = leader_replica->clock();
   const int64_t kTxnId = 1;
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN));
   const MonoDelta kAgreeTimeout = kDefaultTimeout;
   const auto& tablet_id = leader_replica->tablet()->tablet_id();
@@ -541,7 +541,7 @@ TEST_F(TxnParticipantITest, TestProxyBasicCalls) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
   for (const auto& op : kCommitSequence) {
     const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(), kTxnId, op);
@@ -555,7 +555,7 @@ TEST_F(TxnParticipantITest, TestBeginCommitAfterFinalize) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
   const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
   {
@@ -598,7 +598,7 @@ TEST_F(TxnParticipantITest, TestProxyErrorWhenNotBegun) {
   constexpr const int kLeaderIdx = 0;
   auto txn_id = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
   const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
   for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
@@ -619,7 +619,7 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
 
   // Begin after already beginning.
@@ -703,7 +703,7 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInAbortSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
 
   // Try our illegal ops when our transaction is open.
@@ -755,7 +755,7 @@ TEST_F(TxnParticipantITest, TestProxyNonLeader) {
   constexpr const int kNonLeaderIdx = kLeaderIdx + 1;
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto admin_proxy = cluster_->tserver_admin_proxy(kNonLeaderIdx);
   for (const auto& op : kCommitSequence) {
     const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(), kTxnId, op);
@@ -774,7 +774,7 @@ TEST_F(TxnParticipantITest, TestProxyTabletBootstrapping) {
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
 
   FLAGS_tablet_bootstrap_inject_latency_ms = 1000;
   cluster_->mini_tablet_server(kLeaderIdx)->Shutdown();
@@ -799,7 +799,7 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotRunning) {
   constexpr const int kTxnId = 0;
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   auto* tablet_manager = cluster_->mini_tablet_server(kLeaderIdx)->server()->tablet_manager();
   ASSERT_OK(tablet_manager->DeleteTablet(leader_replica->tablet_id(),
       tablet::TABLET_DATA_TOMBSTONED, boost::none));
@@ -842,7 +842,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
 
   // Get commit-related metadata.
   TxnMetadataPB meta_pb;
@@ -900,7 +900,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
 
   // Start a transaction and make sure it results in the expected state
   // server-side.
@@ -926,7 +926,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
 
   // Start a transaction and make sure it results in the expected state
   // server-side.
@@ -1000,7 +1000,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
   ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
                                     cluster_->messenger()->sasl_proto_name(),
@@ -1056,7 +1056,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
   ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
                                     cluster_->messenger()->sasl_proto_name(),
@@ -1082,7 +1082,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   unique_ptr<TxnSystemClient> txn_client;
   ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
                                     cluster_->messenger()->sasl_proto_name(),
@@ -1115,7 +1115,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   // Bring down the other servers so we can't get a majority.
   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
     if (i == kLeaderIdx) continue;
@@ -1176,7 +1176,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientSucceedsOnBootstrap) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   // Start a thread that sends participant ops to the tablet.
   int next_txn_id = 0;
   unique_ptr<TxnSystemClient> txn_client;
@@ -1230,7 +1230,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRetriesWhenReplicaNotFound) {
   vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   auto* leader_replica = replicas[kLeaderIdx];
   const auto tablet_id = leader_replica->tablet_id();
-  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeader(kDefaultTimeout));
   // Start a thread that sends participant ops to the tablet.
   int next_txn_id = 0;
   unique_ptr<TxnSystemClient> txn_client;
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc
index d9e7329..d522949 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -184,7 +184,7 @@ Status TabletReplicaTestBase::StartReplica(const ConsensusBootstrapInfo& info) {
 
 Status TabletReplicaTestBase::StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
   RETURN_NOT_OK(StartReplica(info));
-  return tablet_replica_->consensus()->WaitUntilLeaderForTests(kLeadershipTimeout);
+  return tablet_replica_->consensus()->WaitUntilLeader(kLeadershipTimeout);
 }
 
 Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
@@ -226,7 +226,7 @@ Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
                                        prepare_pool_.get(),
                                        dns_resolver_.get()));
   // Wait for the replica to be usable.
-  return tablet_replica_->consensus()->WaitUntilLeaderForTests(kLeadershipTimeout);
+  return tablet_replica_->consensus()->WaitUntilLeader(kLeadershipTimeout);
 }
 
 } // namespace tablet
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 68471f4..94fc6e0 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -32,9 +32,9 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -189,7 +189,7 @@ class TabletCopyTest : public KuduTabletTest {
                                      prepare_pool_.get(),
                                      dns_resolver_.get()));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
-    ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+    ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)));
   }
 
   void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 1337ee1..dd2df29 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -138,7 +138,7 @@ Status TabletServerTestBase::WaitForTabletRunning(const char *tablet_id) {
   RETURN_NOT_OK(tablet_manager->GetTabletReplica(tablet_id, &tablet_replica));
   RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(kTimeout));
   RETURN_NOT_OK(
-      tablet_replica->consensus()->WaitUntilLeaderForTests(kTimeout));
+      tablet_replica->consensus()->WaitUntilLeader(kTimeout));
 
   // KUDU-2463: Even though the tablet thinks its leader, for correctness, it
   // must wait to finish replicating its no-op (even as a single replica)
diff --git a/src/kudu/tserver/ts_tablet_manager-test.cc b/src/kudu/tserver/ts_tablet_manager-test.cc
index 915e349..8ce9bc9 100644
--- a/src/kudu/tserver/ts_tablet_manager-test.cc
+++ b/src/kudu/tserver/ts_tablet_manager-test.cc
@@ -128,7 +128,7 @@ class TsTabletManagerTest : public KuduTest {
     }
 
     RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(MonoDelta::FromMilliseconds(2000)));
-    return tablet_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10));
+    return tablet_replica->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10));
   }
 
   void GenerateFullTabletReport(TabletReportPB* report) {

[kudu] 02/03: KUDU-1921 Add ability to require auth/encryption to C++ client

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 592ba3c5f7abfba43ee5ef1b339a4743006b6e3b
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Jul 27 15:00:26 2021 +0200

    KUDU-1921 Add ability to require auth/encryption to C++ client
    
    Kudu servers support requiring authentication and encryption to be
    enabled, and clients prefer connecting in a secure way, but if a server
    doesn't support authentication and/or encryption, the client will
    silently connect insecurely, which can lead to a downgrade attack.
    
    With this patch, clients can require authentication and encryption
    through the client API; if such a downgrade attack is attempted, the
    client will fail to connect to the cluster.
    
    Change-Id: Ia3e800eb7c4e2f8787f0adf1f040d47358d29320
    Reviewed-on: http://gerrit.cloudera.org:8080/17731
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/client.cc                    | 19 +++++++
 src/kudu/client/client.h                     | 42 ++++++++++++++++
 src/kudu/client/client_builder-internal.cc   | 11 ++--
 src/kudu/client/client_builder-internal.h    |  2 +
 src/kudu/integration-tests/security-itest.cc | 75 +++++++++++++++++++++++++++-
 src/kudu/rpc/client_negotiation.cc           |  4 +-
 src/kudu/rpc/client_negotiation.h            |  2 +
 src/kudu/rpc/messenger.cc                    |  2 +
 src/kudu/rpc/messenger.h                     |  8 +++
 src/kudu/rpc/negotiation-test.cc             | 20 +++++---
 src/kudu/rpc/negotiation.cc                  | 10 +++-
 src/kudu/rpc/negotiation.h                   |  1 +
 src/kudu/rpc/reactor.cc                      |  7 ++-
 src/kudu/rpc/server_negotiation.cc           |  6 ++-
 src/kudu/rpc/server_negotiation.h            |  2 +
 15 files changed, 192 insertions(+), 19 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8c57302..d05aaa8 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -325,6 +325,16 @@ KuduClientBuilder& KuduClientBuilder::sasl_protocol_name(const string& sasl_prot
   return *this;
 }
 
+KuduClientBuilder& KuduClientBuilder::encryption_policy(EncryptionPolicy encryption_policy) {
+  data_->encryption_policy_ = encryption_policy;
+  return *this;
+}
+
+KuduClientBuilder& KuduClientBuilder::require_authentication(bool require_authentication) {
+  data_->require_authentication_ = require_authentication;
+  return *this;
+}
+
 namespace {
 Status ImportAuthnCreds(const string& authn_creds,
                         Messenger* messenger,
@@ -371,6 +381,15 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) {
   if (!data_->sasl_protocol_name_.empty()) {
     builder.set_sasl_proto_name(data_->sasl_protocol_name_);
   }
+  if (data_->require_authentication_) {
+    builder.set_rpc_authentication("required");
+  }
+  if (data_->encryption_policy_ != OPTIONAL) {
+    builder.set_rpc_encryption("required");
+    if (data_->encryption_policy_ == REQUIRED) {
+      builder.set_rpc_loopback_encryption(true);
+    }
+  }
   std::shared_ptr<Messenger> messenger;
   RETURN_NOT_OK(builder.Build(&messenger));
   UserCredentials user_credentials;
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 3ff9141..853d0a1 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -230,6 +230,18 @@ class KUDU_EXPORT KuduClientBuilder {
   KuduClientBuilder();
   ~KuduClientBuilder();
 
+  /// Policy for on-the-wire encryption
+  enum EncryptionPolicy {
+    OPTIONAL,        ///< Optional, it uses encrypted connection if the server supports
+                     ///< it, but it can connect to insecure servers too.
+
+    REQUIRED_REMOTE, ///< Only connects to remote servers that support encryption, fails
+                     ///< otherwise. It can connect to insecure servers only locally.
+
+    REQUIRED         ///< Only connects to servers, including those on the loopback interface,
+                     ///< that support encryption; fails otherwise.
+  };
+
   /// Clear the set of master addresses.
   ///
   /// @return Reference to the updated object.
@@ -319,6 +331,36 @@ class KUDU_EXPORT KuduClientBuilder {
   /// @return Reference to the updated object.
   KuduClientBuilder& sasl_protocol_name(const std::string& sasl_protocol_name);
 
+  /// Require authentication for the connection to a remote server.
+  ///
+  /// If it's set to true, the client will require mutual authentication between
+  /// the server and the client. If the server doesn't support authentication,
+  /// or it's disabled, the client will fail to connect.
+  ///
+  /// @param [in] require_authentication
+  ///   Whether to require authentication.
+  /// @return Reference to the updated object.
+  KuduClientBuilder& require_authentication(bool require_authentication);
+
+  /// Require encryption for the connection to a remote server.
+  ///
+  /// If it's set to REQUIRED_REMOTE or REQUIRED, the client will
+  /// require encrypting the traffic between the server and the client.
+  /// If the server doesn't support encryption, or if it's disabled, the
+  /// client will fail to connect.
+  ///
+  /// Loopback connections are encrypted only if 'encryption_policy' is
+  /// set to REQUIRED, or if it's required by the server.
+  ///
+  /// The default value is OPTIONAL, which allows connecting to servers without
+  /// encryption as well, but it will still attempt to use it if the server
+  /// supports it.
+  ///
+  /// @param [in] encryption_policy
+  ///   Which encryption policy to use.
+  /// @return Reference to the updated object.
+  KuduClientBuilder& encryption_policy(EncryptionPolicy encryption_policy);
+
   /// Create a client object.
   ///
   /// @note KuduClients objects are shared amongst multiple threads and,
diff --git a/src/kudu/client/client_builder-internal.cc b/src/kudu/client/client_builder-internal.cc
index 7d0d496..b3c027c 100644
--- a/src/kudu/client/client_builder-internal.cc
+++ b/src/kudu/client/client_builder-internal.cc
@@ -25,11 +25,12 @@ namespace client {
 KuduClientBuilder::Data::Data()
     : default_admin_operation_timeout_(MonoDelta::FromSeconds(30)),
       default_rpc_timeout_(MonoDelta::FromSeconds(10)),
-      replica_visibility_(internal::ReplicaController::Visibility::VOTERS) {
-}
+      replica_visibility_(internal::ReplicaController::Visibility::VOTERS),
+      require_authentication_(false),
+      encryption_policy_(EncryptionPolicy::OPTIONAL) {
+  }
 
-KuduClientBuilder::Data::~Data() {
-}
+  KuduClientBuilder::Data::~Data() {}
 
-} // namespace client
+}  // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/client_builder-internal.h b/src/kudu/client/client_builder-internal.h
index f097b3e..8da30b4 100644
--- a/src/kudu/client/client_builder-internal.h
+++ b/src/kudu/client/client_builder-internal.h
@@ -43,6 +43,8 @@ class KuduClientBuilder::Data {
   internal::ReplicaController::Visibility replica_visibility_;
   boost::optional<int> num_reactors_;
   std::string sasl_protocol_name_;
+  bool require_authentication_;
+  EncryptionPolicy encryption_policy_;
 
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc
index c2d6bd2..60646aa 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <tuple>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -87,7 +88,9 @@ using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::rpc::Messenger;
+using std::get;
 using std::string;
+using std::tuple;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
@@ -639,6 +642,76 @@ TEST_F(SecurityITest, TestMismatchingPrincipals) {
   ASSERT_TRUE(s.IsTimedOut());
 }
 
+TEST_F(SecurityITest, TestRequireAuthenticationInsecureCluster) {
+  cluster_opts_.enable_kerberos = false;
+  ASSERT_OK(StartCluster());
+
+  shared_ptr<KuduClient> client;
+  KuduClientBuilder b;
+  b.require_authentication(true);
+  Status s = cluster_->CreateClient(&b, &client);
+  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "client requires authentication, but server does not have Kerberos enabled");
+}
+
+TEST_F(SecurityITest, TestRequireEncryptionInsecureCluster) {
+  cluster_opts_.enable_kerberos = false;
+  cluster_opts_.extra_master_flags.emplace_back("--rpc_encryption=disabled");
+  cluster_opts_.extra_tserver_flags.emplace_back("--rpc_encryption=disabled");
+  cluster_opts_.extra_master_flags.emplace_back("--rpc_authentication=disabled");
+  cluster_opts_.extra_tserver_flags.emplace_back("--rpc_authentication=disabled");
+  ASSERT_OK(StartCluster());
+
+  shared_ptr<KuduClient> client;
+  KuduClientBuilder b;
+  b.encryption_policy(client::KuduClientBuilder::REQUIRED);
+  Status s = cluster_->CreateClient(&b, &client);
+  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_STR_CONTAINS(s.ToString(), "server does not support required TLS encryption");
+}
+
+TEST_F(SecurityITest, TestRequireAuthenticationSecureCluster) {
+  ASSERT_OK(StartCluster());
+
+  shared_ptr<KuduClient> client;
+  KuduClientBuilder b;
+  b.require_authentication(true);
+  ASSERT_OK(cluster_->CreateClient(&b, &client));
+  SmokeTestCluster(client, /* transactional */ false);
+}
+
+class EncryptionPolicyTest :
+    public SecurityITest,
+    public ::testing::WithParamInterface<tuple<
+        KuduClientBuilder::EncryptionPolicy, bool>> {
+};
+
+INSTANTIATE_TEST_SUITE_P(,
+    EncryptionPolicyTest,
+    ::testing::Combine(
+        ::testing::Values(
+            KuduClientBuilder::EncryptionPolicy::OPTIONAL,
+            KuduClientBuilder::EncryptionPolicy::REQUIRED,
+            KuduClientBuilder::EncryptionPolicy::REQUIRED_REMOTE),
+        ::testing::Values(true, false)));
+
+TEST_P(EncryptionPolicyTest, TestEncryptionPolicy) {
+  const auto& params = GetParam();
+  if (!get<1>(params)) {
+    cluster_opts_.enable_kerberos = false;
+    cluster_opts_.extra_master_flags.emplace_back("--rpc_authentication=disabled");
+    cluster_opts_.extra_tserver_flags.emplace_back("--rpc_authentication=disabled");
+  }
+  ASSERT_OK(StartCluster());
+
+  shared_ptr<KuduClient> client;
+  KuduClientBuilder b;
+  b.encryption_policy(get<0>(params));
+  ASSERT_OK(cluster_->CreateClient(&b, &client));
+  SmokeTestCluster(client, /* transactional */ false);
+}
+
 Status AssignIPToClient(bool external) {
   // If the test does not require an external IP address
   // assign loopback IP to FLAGS_local_ip_for_outbound_sockets.
@@ -924,4 +997,4 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) {
   }
 }
 
-} // namespace kudu
+}  // namespace kudu
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 7c3992b..9557ff3 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -119,6 +119,7 @@ ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
                                      boost::optional<security::SignedTokenPB> authn_token,
                                      RpcEncryption encryption,
+                                     bool encrypt_loopback,
                                      std::string sasl_proto_name)
     : socket_(std::move(socket)),
       helper_(SaslHelper::CLIENT),
@@ -126,6 +127,7 @@ ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
       tls_handshake_(security::TlsHandshakeType::CLIENT),
       encryption_(encryption),
       tls_negotiated_(false),
+      encrypt_loopback_(encrypt_loopback),
       authn_token_(std::move(authn_token)),
       psecret_(nullptr, std::free),
       negotiated_authn_(AuthenticationType::INVALID),
@@ -324,7 +326,7 @@ Status ClientNegotiation::SendNegotiate() {
     client_features_.insert(TLS);
     // If the remote peer is local, then we allow using TLS for authentication
     // without encryption or integrity.
-    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+    if (socket_->IsLoopbackConnection() && !encrypt_loopback_) {
       client_features_.insert(TLS_AUTHENTICATION_ONLY);
     }
   }
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index e9f0c7d..4d3e6d5 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -66,6 +66,7 @@ class ClientNegotiation {
                     const security::TlsContext* tls_context,
                     boost::optional<security::SignedTokenPB> authn_token,
                     security::RpcEncryption encryption,
+                    bool encrypt_loopback,
                     std::string sasl_proto_name);
 
   // Enable PLAIN authentication.
@@ -229,6 +230,7 @@ class ClientNegotiation {
   security::TlsHandshake tls_handshake_;
   const security::RpcEncryption encryption_;
   bool tls_negotiated_;
+  bool encrypt_loopback_;
 
   // TSK state.
   boost::optional<security::SignedTokenPB> authn_token_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 87a7bae..a517a6f 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -76,6 +76,7 @@ MessengerBuilder::MessengerBuilder(string name)
       sasl_proto_name_("kudu"),
       rpc_authentication_("optional"),
       rpc_encryption_("optional"),
+      rpc_loopback_encryption_(false),
       rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
       rpc_tls_ciphersuites_(kudu::security::SecurityDefaults::kDefaultTlsCipherSuites),
       rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
@@ -98,6 +99,7 @@ Status MessengerBuilder::Build(shared_ptr<Messenger>* msgr) {
   RETURN_NOT_OK(ParseTriState("--rpc_encryption",
                               rpc_encryption_,
                               &new_msgr->encryption_));
+  new_msgr->loopback_encryption_ = rpc_loopback_encryption_;
   RETURN_NOT_OK(new_msgr->Init());
   if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
     auto* tls_context = new_msgr->mutable_tls_context();
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 983d959..4c99f6f 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -160,6 +160,11 @@ class MessengerBuilder {
     return *this;
   }
 
+  MessengerBuilder& set_rpc_loopback_encryption(bool rpc_loopback_encryption) {
+    rpc_loopback_encryption_ = rpc_loopback_encryption;
+    return *this;
+  }
+
   // Set TLSv1.2 and earlier cipher suite preferences to use for TLS-secured RPC
   // connections. Uses the OpenSSL cipher preference list format. Under the
   // hood, SSL_CTX_set_cipher_list() is eventually being called with
@@ -261,6 +266,7 @@ class MessengerBuilder {
   std::string sasl_proto_name_;
   std::string rpc_authentication_;
   std::string rpc_encryption_;
+  bool rpc_loopback_encryption_;
   std::string rpc_tls_ciphers_;       // pre-TLSv1.3 cipher suites
   std::string rpc_tls_ciphersuites_;  // TLSv1.3-related cipher suites
   std::string rpc_tls_min_protocol_;
@@ -381,6 +387,7 @@ class Messenger {
 
   security::RpcAuthentication authentication() const { return authentication_; }
   security::RpcEncryption encryption() const { return encryption_; }
+  bool loopback_encryption() const { return loopback_encryption_; }
 
   ThreadPool* negotiation_pool(Connection::Direction dir);
 
@@ -473,6 +480,7 @@ class Messenger {
   // reused by different clients.
   security::RpcAuthentication authentication_;
   security::RpcEncryption encryption_;
+  bool loopback_encryption_;
 
   // Pools which are listening on behalf of this messenger.
   // Note that the user may have called Shutdown() on one of these
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 3983ff0..e0a8321 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -252,11 +252,13 @@ TEST_P(TestNegotiation, TestNegotiation) {
                                        &client_tls_context,
                                        authn_token,
                                        desc.client.encryption,
+                                       desc.rpc_encrypt_loopback,
                                        "kudu");
   ServerNegotiation server_negotiation(std::move(server_socket),
                                        &server_tls_context,
                                        &token_verifier,
                                        desc.server.encryption,
+                                       desc.rpc_encrypt_loopback,
                                        "kudu");
 
   // Set client and server SASL mechanisms.
@@ -1034,7 +1036,8 @@ static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL, "kudu");
+                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   server_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(server_negotiation.EnableGSSAPI());
   post_check(server_negotiation.Negotiate());
@@ -1047,7 +1050,8 @@ static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(conn), &tls_context,
-                                       boost::none, RpcEncryption::OPTIONAL, "kudu");
+                                       boost::none, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   client_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(client_negotiation.EnableGSSAPI());
   post_check(client_negotiation.Negotiate());
@@ -1210,7 +1214,8 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL, "kudu");
+                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   CHECK_OK(server_negotiation.EnablePlain());
   Status s = server_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
@@ -1221,7 +1226,8 @@ static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(sock), &tls_context,
-                                       boost::none, RpcEncryption::OPTIONAL, "kudu");
+                                       boost::none, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   client_negotiation.set_deadline(deadline);
@@ -1242,7 +1248,8 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL, "kudu");
+                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   CHECK_OK(server_negotiation.EnablePlain());
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   server_negotiation.set_deadline(deadline);
@@ -1255,7 +1262,8 @@ static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(socket), &tls_context,
-                                       boost::none, RpcEncryption::OPTIONAL, "kudu");
+                                       boost::none, RpcEncryption::OPTIONAL,
+                                       /* encrypt_loopback */ false, "kudu");
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   Status s = client_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 71c23f7..e429b4c 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -166,6 +166,7 @@ static Status DisableSocketTimeouts(Socket* socket) {
 static Status DoClientNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
                                   RpcEncryption encryption,
+                                  bool encrypt_loopback,
                                   MonoTime deadline,
                                   unique_ptr<ErrorStatusPB>* rpc_error) {
   const auto* messenger = conn->reactor_thread()->reactor()->messenger();
@@ -176,6 +177,7 @@ static Status DoClientNegotiation(Connection* conn,
                                        &messenger->tls_context(),
                                        authn_token,
                                        encryption,
+                                       encrypt_loopback,
                                        messenger->sasl_proto_name());
 
   client_negotiation.set_server_fqdn(conn->outbound_connection_id().hostname());
@@ -236,6 +238,7 @@ static Status DoClientNegotiation(Connection* conn,
 static Status DoServerNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
                                   RpcEncryption encryption,
+                                  bool encrypt_loopback,
                                   const MonoTime& deadline) {
   const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   if (authentication == RpcAuthentication::REQUIRED &&
@@ -257,6 +260,7 @@ static Status DoServerNegotiation(Connection* conn,
                                        &messenger->tls_context(),
                                        &messenger->token_verifier(),
                                        encryption,
+                                       encrypt_loopback,
                                        messenger->sasl_proto_name());
 
   if (authentication != RpcAuthentication::DISABLED && !messenger->keytab_file().empty()) {
@@ -286,13 +290,15 @@ static Status DoServerNegotiation(Connection* conn,
 void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn,
                                  RpcAuthentication authentication,
                                  RpcEncryption encryption,
+                                 bool loopback_encryption,
                                  MonoTime deadline) {
   Status s;
   unique_ptr<ErrorStatusPB> rpc_error;
+  bool encrypt_loopback = FLAGS_rpc_encrypt_loopback_connections || loopback_encryption;
   if (conn->direction() == Connection::SERVER) {
-    s = DoServerNegotiation(conn.get(), authentication, encryption, deadline);
+    s = DoServerNegotiation(conn.get(), authentication, encryption, encrypt_loopback, deadline);
   } else {
-    s = DoClientNegotiation(conn.get(), authentication, encryption, deadline,
+    s = DoClientNegotiation(conn.get(), authentication, encryption, encrypt_loopback, deadline,
                             &rpc_error);
   }
 
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index 9f06c64..8f48e71 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -47,6 +47,7 @@ class Negotiation {
   static void RunNegotiation(const scoped_refptr<Connection>& conn,
                              security::RpcAuthentication authentication,
                              security::RpcEncryption encryption,
+                             bool loopback_encryption,
                              MonoTime deadline);
  private:
   DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 771b9f8..4f276ea 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -609,10 +609,13 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   TRACE("Submitting negotiation task for $0", conn->ToString());
   auto authentication = reactor()->messenger()->authentication();
   auto encryption = reactor()->messenger()->encryption();
+  auto loopback_encryption = reactor()->messenger()->loopback_encryption();
   ThreadPool* negotiation_pool =
       reactor()->messenger()->negotiation_pool(conn->direction());
-  RETURN_NOT_OK(negotiation_pool->Submit([conn, authentication, encryption, deadline]() {
-        Negotiation::RunNegotiation(conn, authentication, encryption, deadline);
+  RETURN_NOT_OK(negotiation_pool->Submit([conn, authentication, encryption, loopback_encryption,
+                                          deadline]() {
+        Negotiation::RunNegotiation(conn, authentication, encryption, loopback_encryption,
+                                    deadline);
       }));
   return Status::OK();
 }
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 5cf8c42..0f373e5 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -156,6 +156,7 @@ ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
                                      const security::TokenVerifier* token_verifier,
                                      RpcEncryption encryption,
+                                     bool encrypt_loopback,
                                      std::string sasl_proto_name)
     : socket_(std::move(socket)),
       helper_(SaslHelper::SERVER),
@@ -163,6 +164,7 @@ ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
       tls_handshake_(security::TlsHandshakeType::SERVER),
       encryption_(encryption),
       tls_negotiated_(false),
+      encrypt_loopback_(encrypt_loopback),
       token_verifier_(token_verifier),
       negotiated_authn_(AuthenticationType::INVALID),
       negotiated_mech_(SaslMechanism::INVALID),
@@ -304,7 +306,7 @@ Status ServerNegotiation::PreflightCheckGSSAPI(const std::string& sasl_proto_nam
   // We aren't going to actually send/receive any messages, but
   // this makes it easier to reuse the initialization code.
   ServerNegotiation server(
-      nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, sasl_proto_name);
+      nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, false, sasl_proto_name);
   Status s = server.EnableGSSAPI();
   if (!s.ok()) {
     return Status::RuntimeError(s.message());
@@ -542,7 +544,7 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     server_features_.insert(TLS);
     // If the remote peer is local, then we allow using TLS for authentication
     // without encryption or integrity.
-    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+    if (socket_->IsLoopbackConnection() && !encrypt_loopback_) {
       server_features_.insert(TLS_AUTHENTICATION_ONLY);
     }
   }
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 021f55d..6f22798 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -66,6 +66,7 @@ class ServerNegotiation {
                     const security::TlsContext* tls_context,
                     const security::TokenVerifier* token_verifier,
                     security::RpcEncryption encryption,
+                    bool encrypt_loopback,
                     std::string sasl_proto_name);
 
   // Enable PLAIN authentication.
@@ -229,6 +230,7 @@ class ServerNegotiation {
   security::TlsHandshake tls_handshake_;
   const security::RpcEncryption encryption_;
   bool tls_negotiated_;
+  bool encrypt_loopback_;
 
   // TSK state.
   const security::TokenVerifier* token_verifier_;

[kudu] 03/03: [java] KUDU-1921 Add ability to require authn/encryption to Java client

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d65d75fb6473e1ce1db108758289456a71101690
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Jul 27 15:01:31 2021 +0200

    [java] KUDU-1921 Add ability to require authn/encryption to Java client
    
    Change-Id: Ic951b2090a4933eca70dc53b6f93cdcff5a74929
    Reviewed-on: http://gerrit.cloudera.org:8080/17732
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 50 +++++++++++++++++-
 .../java/org/apache/kudu/client/Connection.java    | 17 +++++-
 .../org/apache/kudu/client/ConnectionCache.java    | 19 ++++++-
 .../java/org/apache/kudu/client/KuduClient.java    | 32 ++++++++++++
 .../java/org/apache/kudu/client/Negotiator.java    | 29 ++++++++--
 .../org/apache/kudu/client/TestNegotiator.java     |  3 +-
 .../java/org/apache/kudu/client/TestSecurity.java  | 61 ++++++++++++++++++++++
 7 files changed, 201 insertions(+), 10 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cfa03cd..390bdd3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -425,7 +425,9 @@ public class AsyncKuduClient implements AutoCloseable {
     this.requestTracker = new RequestTracker(clientId);
 
     this.securityContext = new SecurityContext();
-    this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName);
+    this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName,
+        b.requireAuthentication, !b.encryptionPolicy.equals(EncryptionPolicy.OPTIONAL),
+        b.encryptionPolicy.equals(EncryptionPolicy.REQUIRED));
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
     this.authzTokenCache = new AuthzTokenCache(this);
   }
@@ -2726,6 +2728,18 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
+  enum EncryptionPolicy {
+    // Optional, it uses encrypted connection if the server supports it,
+    // but it can connect to insecure servers too.
+    OPTIONAL,
+    // Only connects to remote servers that support encryption, fails
+    // otherwise. It can connect to insecure servers only locally.
+    REQUIRED_REMOTE,
+    // Only connects to servers that support encryption, including those
+    // on the loopback interface; fails otherwise.
+    REQUIRED,
+  }
+
   /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
@@ -2746,6 +2760,8 @@ public class AsyncKuduClient implements AutoCloseable {
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
     private String saslProtocolName = "kudu";
+    private boolean requireAuthentication = false;
+    private EncryptionPolicy encryptionPolicy = EncryptionPolicy.OPTIONAL;
 
     /**
      * Creates a new builder for a client that will connect to the specified masters.
@@ -2870,6 +2886,38 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
+     * Require authentication for the connection to a remote server.
+     *
+     * If it's set to true, the client will require mutual authentication between
+     * the server and the client. If the server doesn't support authentication,
+     * or it's disabled, the client will fail to connect.
+     */
+    public AsyncKuduClientBuilder requireAuthentication(boolean requireAuthentication) {
+      this.requireAuthentication = requireAuthentication;
+      return this;
+    }
+
+    /**
+     * Require encryption for the connection to a remote server.
+     *
+     * If it's set to REQUIRED_REMOTE or REQUIRED, the client will
+     * require encrypting the traffic between the server and the client.
+     * If the server doesn't support encryption, or if it's disabled, the
+     * client will fail to connect.
+     *
+     * Loopback connections are encrypted only if 'encryptionPolicy' is
+     * set to REQUIRED, or if it's required by the server.
+     *
+     * The default value is OPTIONAL, which allows connecting to servers without
+     * encryption as well, but it will still attempt to use it if the server
+     * supports it.
+     */
+    public AsyncKuduClientBuilder encryptionPolicy(EncryptionPolicy encryptionPolicy) {
+      this.encryptionPolicy = encryptionPolicy;
+      return this;
+    }
+
+    /**
      * Creates the client bootstrap for Netty. The user can specify the executor, but
      * if they don't, we'll use a simple thread pool.
      */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 74445ef..0cc7900 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -110,6 +110,12 @@ class Connection extends SimpleChannelInboundHandler<Object> {
 
   private final String saslProtocolName;
 
+  private final boolean requireAuthentication;
+
+  private final boolean requireEncryption;
+
+  private final boolean encryptLoopback;
+
   /** The underlying Netty's socket channel. */
   private SocketChannel channel;
 
@@ -187,7 +193,10 @@ class Connection extends SimpleChannelInboundHandler<Object> {
              SecurityContext securityContext,
              Bootstrap bootstrap,
              CredentialsPolicy credentialsPolicy,
-             String saslProtocolName) {
+             String saslProtocolName,
+             boolean requireAuthentication,
+             boolean requireEncryption,
+             boolean encryptLoopback) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.saslProtocolName = saslProtocolName;
@@ -195,6 +204,9 @@ class Connection extends SimpleChannelInboundHandler<Object> {
     this.credentialsPolicy = credentialsPolicy;
     this.bootstrap = bootstrap.clone();
     this.bootstrap.handler(new ConnectionChannelInitializer());
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
   }
 
   /** {@inheritDoc} */
@@ -213,7 +225,8 @@ class Connection extends SimpleChannelInboundHandler<Object> {
     }
     ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER), ctx.voidPromise());
     Negotiator negotiator = new Negotiator(serverInfo.getAndCanonicalizeHostname(), securityContext,
-        (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName);
+        (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName,
+        requireAuthentication, requireEncryption, encryptLoopback);
     ctx.pipeline().addBefore(ctx.name(), "negotiation", negotiator);
     negotiator.sendHello(ctx);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 705f962..706bc98 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -58,6 +58,12 @@ class ConnectionCache {
 
   private final String saslProtocolName;
 
+  private boolean requireAuthentication;
+
+  private boolean requireEncryption;
+
+  private boolean encryptLoopback;
+
   /**
    * Container mapping server IP/port into the established connection from the client to the
    * server. It may be up to two connections per server: one established with secondary
@@ -70,10 +76,16 @@ class ConnectionCache {
   /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
                   Bootstrap bootstrap,
-                  String saslProtocolName) {
+                  String saslProtocolName,
+                  boolean requireAuthentication,
+                  boolean requireEncryption,
+                  boolean encryptLoopback) {
     this.securityContext = securityContext;
     this.bootstrap = bootstrap;
     this.saslProtocolName = saslProtocolName;
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
   }
 
   /**
@@ -127,7 +139,10 @@ class ConnectionCache {
                                 securityContext,
                                 bootstrap,
                                 credentialsPolicy,
-                                saslProtocolName);
+                                saslProtocolName,
+                                requireAuthentication,
+                                requireEncryption,
+                                encryptLoopback);
         connections.add(result);
         // There can be at most 2 connections to the same destination: one with primary and another
         // with secondary credentials.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index ad518a6..3cc1d31 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -623,6 +623,38 @@ public class KuduClient implements AutoCloseable {
     }
 
     /**
+     * Require authentication for the connection to a remote server.
+     *
+     * If it's set to true, the client will require mutual authentication between
+     * the server and the client. If the server doesn't support authentication,
+     * or it's disabled, the client will fail to connect.
+     */
+    public KuduClientBuilder requireAuthentication(boolean requireAuthentication) {
+      clientBuilder.requireAuthentication(requireAuthentication);
+      return this;
+    }
+
+    /**
+     * Require encryption for the connection to a remote server.
+     *
+     * If it's set to REQUIRED_REMOTE or REQUIRED, the client will
+     * require encrypting the traffic between the server and the client.
+     * If the server doesn't support encryption, or if it's disabled, the
+     * client will fail to connect.
+     *
+     * Loopback connections are encrypted only if the encryption policy is
+     * set to REQUIRED, or if it's required by the server.
+     *
+     * The default value is OPTIONAL, which allows connecting to servers without
+     * encryption as well, but it will still attempt to use it if the server
+     * supports it.
+     */
+    public KuduClientBuilder encryptionPolicy(AsyncKuduClient.EncryptionPolicy encryptionPolicy) {
+      clientBuilder.encryptionPolicy(encryptionPolicy);
+      return this;
+    }
+
+    /**
      * Creates a new client that connects to the masters.
      * Doesn't block and won't throw an exception if the masters don't exist.
      * @return a new asynchronous Kudu client
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 234984b..a9c511d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -222,7 +222,13 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
   private Certificate peerCert;
 
-  private String saslProtocolName;
+  private final String saslProtocolName;
+
+  private final boolean requireAuthentication;
+
+  private final boolean requireEncryption;
+
+  private final boolean encryptLoopback;
 
   @InterfaceAudience.LimitedPrivate("Test")
   boolean overrideLoopbackForTests;
@@ -230,10 +236,16 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
   public Negotiator(String remoteHostname,
                     SecurityContext securityContext,
                     boolean ignoreAuthnToken,
-                    String saslProtocolName) {
+                    String saslProtocolName,
+                    boolean requireAuthentication,
+                    boolean requireEncryption,
+                    boolean encryptLoopback) {
     this.remoteHostname = remoteHostname;
     this.securityContext = securityContext;
     this.saslProtocolName = saslProtocolName;
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
     SignedTokenPB token = securityContext.getAuthenticationToken();
     if (token != null) {
       if (ignoreAuthnToken) {
@@ -264,7 +276,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
     for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
       builder.addSupportedFeatures(flag);
     }
-    if (isLoopbackConnection(ctx.channel())) {
+    if (isLoopbackConnection(ctx.channel()) && !encryptLoopback) {
       builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY);
     }
 
@@ -371,6 +383,10 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
     serverFeatures = getFeatureFlags(response);
     // If the server supports TLS, we will always speak TLS to it.
     final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+    if (!negotiatedTls && requireEncryption) {
+      throw new NonRecoverableException(Status.NotAuthorized(
+          "server does not support required TLS encryption"));
+    }
 
     // Check the negotiated authentication type sent by the server.
     chosenAuthnType = chooseAuthenticationType(response);
@@ -473,6 +489,11 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
     if (chosenMech != null) {
       LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech.name(), remoteHostname);
+      if (chosenMech.equals(SaslMechanism.PLAIN) && requireAuthentication) {
+        String message = "client requires authentication, " +
+            "but server does not have Kerberos enabled";
+        throw new NonRecoverableException(Status.NotAuthorized(message));
+      }
       return;
     }
 
@@ -658,7 +679,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
     // Don't wrap the TLS socket if we are using TLS for authentication only.
     boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
-        isLoopbackConnection(ctx.channel());
+        isLoopbackConnection(ctx.channel()) && !encryptLoopback;
     if (!isAuthOnly) {
       ctx.pipeline().addFirst("tls", handler);
     }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 0da8368..5734795 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -117,7 +117,8 @@ public class TestNegotiator {
   }
 
   private void startNegotiation(boolean fakeLoopback) {
-    Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu");
+    Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu",
+        false, false, false);
     negotiator.overrideLoopbackForTests = fakeLoopback;
     embedder = new EmbeddedChannel(negotiator);
     negotiator.sendHello(embedder.pipeline().firstContext());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 7d88d34..cbe84d4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -524,4 +524,65 @@ public class TestSecurity {
         getBasicCreateTableOptions()));
   }
 
+  @Test(timeout = 60000)
+  public void testKuduRequireAuthenticationInsecureCluster() throws Exception {
+    try {
+      KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+          .requireAuthentication(true)
+          .build();
+      client.createTable("TestSecurity-authentication-required-1",
+          getBasicSchema(), getBasicCreateTableOptions());
+      Assert.fail("client shouldn't be able to connect to the cluster.");
+    } catch (NonRecoverableException e) {
+      Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
+          "client requires authentication, but server does not have Kerberos enabled"
+      ));
+    }
+  }
+
+  @Test(timeout = 60000)
+  @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  public void testKuduRequireEncryptionInsecureCluster() throws Exception {
+    try {
+      KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+          .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED_REMOTE)
+          .build();
+      client.createTable("TestSecurity-encryption-required-1",
+          getBasicSchema(), getBasicCreateTableOptions());
+      Assert.fail("client shouldn't be able to connect to the cluster.");
+    } catch (NonRecoverableException e) {
+      Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
+          "server does not support required TLS encryption"
+      ));
+    }
+  }
+
+  @Test
+  @KuduTestHarness.EnableKerberos
+  public void testKuduRequireAuthenticationAndEncryptionSecureCluster() throws KuduException {
+    KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+        .requireAuthentication(true)
+        .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED)
+        .build();
+    KuduTable table = client.createTable("TestSecurity-authentication-required-1",
+        getBasicSchema(), getBasicCreateTableOptions());
+    Assert.assertNotNull(table);
+  }
+
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  public void testKuduOptionalEncryption() throws KuduException {
+    KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+        .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.OPTIONAL)
+        .build();
+    KuduTable table = client.createTable("testSecurity-encryption-optional-1",
+        getBasicSchema(), getBasicCreateTableOptions());
+    Assert.assertNotNull(table);
+  }
 }