You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/05/25 01:59:00 UTC

[1/2] kudu git commit: KUDU-1034 client does not failover due to timeout

Repository: kudu
Updated Branches:
  refs/heads/master 12683435f -> e3c5dd18c


KUDU-1034 client does not failover due to timeout

This patch fixes the issue described by KUDU-1034: the client does not
mark the failed tablet server as 'failed' in case of timeout and
continues to use it over and over again to send further requests,
even if other tablet replicas might be available.

Besides the actual fix, this patch incorporates an integration test
(ClientFailoverTServerTimeoutITest.FailoverOnLeaderTimeout) written by Mike.

Change-Id: Icfcece485e4053d921ffdc865612b3e7b9a992a3
Reviewed-on: http://gerrit.cloudera.org:8080/6924
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4263b037
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4263b037
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4263b037

Branch: refs/heads/master
Commit: 4263b037844fca595a35f99479fbb5765ba7a443
Parents: 1268343
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu May 18 13:50:01 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu May 25 01:07:43 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/meta_cache.h                    |  5 +
 .../integration-tests/client_failover-itest.cc  | 98 +++++++++++++++++++-
 src/kudu/integration-tests/tablet_copy-itest.cc |  7 +-
 src/kudu/rpc/retriable_rpc.h                    | 18 +++-
 4 files changed, 125 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/client/meta_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index a8a67f1..2e68f6c 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -130,7 +130,12 @@ class MetaCacheServerPicker : public rpc::ServerPicker<RemoteTabletServer> {
 
   virtual ~MetaCacheServerPicker() {}
   void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override;
+
+  // In the case of this MetaCacheServerPicker class, the implementation of this
+  // method is very selective. It marks only servers hosting the remote tablet
+  // the MetaCacheServerPicker object is bound to, not the entire RemoteTabletServer.
   void MarkServerFailed(RemoteTabletServer* replica, const Status& status) override;
+
   void MarkReplicaNotLeader(RemoteTabletServer* replica) override;
   void MarkResourceNotFound(RemoteTabletServer* replica) override;
  private:

http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index 0066d2f..bd0657b 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <boost/optional.hpp>
 #include <memory>
 #include <set>
+#include <string>
 #include <unordered_map>
+#include <vector>
+
+#include <boost/optional.hpp>
+#include <glog/logging.h>
 
 #include "kudu/client/client-test-util.h"
 #include "kudu/common/wire_protocol.h"
@@ -26,6 +30,7 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
+#include "kudu/tserver/tserver.pb.h"
 
 using kudu::client::CountTableRows;
 using kudu::client::KuduInsert;
@@ -245,4 +250,95 @@ TEST_F(ClientFailoverITest, TestClusterCrashDuringWorkload) {
   workload.StopAndJoin();
 }
 
+class ClientFailoverTServerTimeoutITest : public ExternalMiniClusterITestBase {
+ public:
+  void SetUp() override {
+    ExternalMiniClusterITestBase::SetUp();
+
+    // Extra flags to speed up the test.
+    const vector<string> extra_flags_tserver = {
+      "--consensus_rpc_timeout_ms=250",
+      "--heartbeat_interval_ms=10",
+      "--raft_heartbeat_interval_ms=25",
+      "--leader_failure_exp_backoff_max_delta_ms=1000",
+    };
+    const vector<string> extra_flags_master = {
+      "--raft_heartbeat_interval_ms=25",
+      "--leader_failure_exp_backoff_max_delta_ms=1000",
+    };
+    NO_FATALS(StartCluster(extra_flags_tserver, extra_flags_master, kTSNum));
+  }
+
+ protected:
+  static const int kTSNum = 3;
+
+  Status GetLeaderReplica(TServerDetails** leader) {
+    string tablet_id;
+    RETURN_NOT_OK(GetTabletId(&tablet_id));
+    Status s;
+    for (int i = 0; i < 128; ++i) {
+      // FindTabletLeader tries to connect to the reported leader to verify
+      // that it thinks it's the leader.
+      s = itest::FindTabletLeader(
+          ts_map_, tablet_id, MonoDelta::FromMilliseconds(100), leader);
+      if (s.ok()) {
+        break;
+      }
+      SleepFor(MonoDelta::FromMilliseconds(10L * (i + 1)));
+    }
+    return s;
+  }
+
+ private:
+  // Get identifier of any tablet running on the tablet server with index 0.
+  Status GetTabletId(string* tablet_id) {
+    TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+    vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+    RETURN_NOT_OK(itest::WaitForNumTabletsOnTS(
+        ts, 1, MonoDelta::FromSeconds(32), &tablets));
+    *tablet_id = tablets[0].tablet_status().tablet_id();
+
+    return Status::OK();
+  }
+};
+
+// Test that a client fails over to other available tablet replicas when an RPC
+// with the former leader times out. This is a regression test for KUDU-1034.
+TEST_F(ClientFailoverTServerTimeoutITest, FailoverOnLeaderTimeout) {
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(kTSNum);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  TServerDetails* leader;
+  ASSERT_OK(GetLeaderReplica(&leader));
+  ASSERT_NE(nullptr, leader);
+
+  // Pause the leader: this will cause the client to get timeout errors
+  // if trying to send RPCs to the corresponding tablet server.
+  ExternalTabletServer* ts(cluster_->tablet_server_by_uuid(leader->uuid()));
+  ASSERT_NE(nullptr, ts);
+  ScopedResumeExternalDaemon leader_resumer(ts);
+  ASSERT_OK(ts->Pause());
+
+  // Write 100 more rows.
+  int rows_target = workload.rows_inserted() + 100;
+  workload.set_timeout_allowed(true);
+  workload.set_write_timeout_millis(500);
+  workload.Start();
+  for (int i = 0; i < 1000; ++i) {
+    if (workload.rows_inserted() >= rows_target) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Verify all rows have reached the destination.
+  EXPECT_GE(workload.rows_inserted(), rows_target);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 85e25f7..2f747c5 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -157,7 +157,12 @@ TEST_F(TabletCopyITest, TestRejectRogueLeader) {
   // leader's tablet copy request when we bring it back online.
   int log_index = workload.batches_completed() + 2; // 2 terms == 2 additional NO_OP entries.
   ASSERT_OK(WaitForServersToAgree(timeout, active_ts_map, tablet_id, log_index));
-  // TODO: Write more rows to the new leader once KUDU-1034 is fixed.
+  // Write more rows to the new leader.
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
 
   // Now kill the new leader and tombstone the replica on TS 0.
   cluster_->tablet_server(new_leader_index)->Shutdown();

http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index a5ff7ad..c896027 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -179,6 +179,9 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
       // test coverage here to understand why the back-off is not taking effect.
       if (server != nullptr) {
         VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+        // Mark the server as failed. For details on the only existing
+        // implementation of ServerPicker::MarkServerFailed(), see the note on
+        // the MetaCacheServerPicker::MarkServerFailed() method.
         server_picker_->MarkServerFailed(server, result.status);
       }
       break;
@@ -209,8 +212,21 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
       return false;
     }
 
+    case RetriableRpcStatus::NON_RETRIABLE_ERROR:
+      if (server != nullptr && result.status.IsTimedOut()) {
+        // For the NON_RETRIABLE_ERROR result in case of TimedOut status,
+        // mark the server as failed. For details on the only existing
+        // implementation of ServerPicker::MarkServerFailed(), see the note on
+        // the MetaCacheServerPicker::MarkServerFailed() method.
+        VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+        server_picker_->MarkServerFailed(server, result.status);
+      }
+      // Do not retry in the case of non-retriable error.
+      return false;
+
     default:
-      // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
+      // For the OK case we should not retry.
+      DCHECK(result.result == RetriableRpcStatus::OK);
       return false;
   }
   resp_.Clear();


[2/2] kudu git commit: KUDU-1580 retry tserver RPC if negotiation times out

Posted by al...@apache.org.
KUDU-1580 retry tserver RPC if negotiation times out

This patch addresses KUDU-1580, i.e. with this patch the Kudu C++ client
retries an RPC with another tablet replica if the connection negotiation
with the current replica timed out.

Added new integration test to cover the updated client's behavior.

Change-Id: Icee8bf4978365a23d6627e7bc411b63f53540a3b
Reviewed-on: http://gerrit.cloudera.org:8080/6926
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e3c5dd18
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e3c5dd18
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e3c5dd18

Branch: refs/heads/master
Commit: e3c5dd18c22b9e20358f05dcd301e7736c0a3321
Parents: 4263b03
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue May 23 00:19:39 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu May 25 01:07:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc                      |  13 ++
 .../integration-tests/client_failover-itest.cc  | 132 +++++++++++++++++++
 .../integration-tests/raft_consensus-itest.cc   |   5 -
 src/kudu/rpc/connection.cc                      |  20 ++-
 src/kudu/rpc/outbound_call.cc                   |  70 +++++++---
 src/kudu/rpc/outbound_call.h                    |  20 ++-
 src/kudu/rpc/reactor.cc                         |  15 +--
 src/kudu/rpc/rpc-test-base.h                    |   9 +-
 src/kudu/rpc/rpc-test.cc                        |   5 +-
 src/kudu/rpc/rpc_controller.cc                  |  10 +-
 src/kudu/rpc/rpc_controller.h                   |   3 +
 src/kudu/rpc/rpc_introspection.proto            |   2 +
 12 files changed, 265 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 498bebe..dd9f5a8 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -402,6 +402,19 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     return result;
   }
 
+  // Handle the connection negotiation failure case if overall RPC's timeout
+  // hasn't expired yet: if the connection negotiation returned non-OK status,
+  // mark the server as not accessible and rely on the RetriableRpc's logic
+  // to switch to an alternative tablet replica.
+  //
+  // NOTE: Connection negotiation errors related to security are handled in the
+  //       code above: see the handlers for IsNotAuthorized(), IsRemoteError().
+  if (!rpc_cb_status.IsTimedOut() && !result.status.ok() &&
+      mutable_retrier()->controller().negotiation_failed()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
+  }
+
   if (result.status.ok()) {
     result.result = RetriableRpcStatus::OK;
   } else {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index bd0657b..7f25fe7 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -18,33 +18,50 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <vector>
 
 #include <boost/optional.hpp>
 #include <glog/logging.h>
 
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/scoped_cleanup.h"
 
 using kudu::client::CountTableRows;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
+using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
 using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::itest::TServerDetails;
 using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using std::set;
 using std::string;
+using std::thread;
 using std::vector;
+using std::unique_ptr;
 using std::unordered_map;
 
+DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_int64(rpc_negotiation_timeout_ms);
+
 namespace kudu {
 
 enum ClientTestBehavior {
@@ -341,4 +358,119 @@ TEST_F(ClientFailoverTServerTimeoutITest, FailoverOnLeaderTimeout) {
   EXPECT_GE(workload.rows_inserted(), rows_target);
 }
 
+// Series of tests to verify that client fails over to another available server
+// if it experiences a timeout on connection negotiation with current server.
+// The 'server' can be both a master and a tablet server.
+class ClientFailoverOnNegotiationTimeoutITest : public KuduTest {
+ public:
+  ClientFailoverOnNegotiationTimeoutITest() {
+    // Since we want to catch a timeout during the connection negotiation phase,
+    // let's make the client re-establish connections on every RPC.
+    FLAGS_rpc_reopen_outbound_connections = true;
+    // Set the connection negotiation timeout shorter than its default value
+    // to run the test faster.
+    FLAGS_rpc_negotiation_timeout_ms = 1000;
+
+    cluster_opts_.extra_tserver_flags = {
+        // Speed up Raft elections.
+        "--raft_heartbeat_interval_ms=25",
+        "--leader_failure_exp_backoff_max_delta_ms=1000",
+        // Decreasing TS->master heartbeat interval speeds up the test.
+        "--heartbeat_interval_ms=25",
+    };
+    cluster_opts_.extra_master_flags = {
+        // Speed up Raft elections.
+        "--raft_heartbeat_interval_ms=25",
+        "--leader_failure_exp_backoff_max_delta_ms=1000",
+    };
+  }
+
+  Status CreateAndStartCluster() {
+    cluster_.reset(new ExternalMiniCluster(cluster_opts_));
+    return cluster_->Start();
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+ protected:
+  ExternalMiniClusterOptions cluster_opts_;
+  shared_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Regression test for KUDU-1580: if a client times out on negotiating a connection
+// to a tablet server, it should retry with another available tablet server.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
+  static const int kNumTabletServers = 3;
+  static const int kTimeoutMs = 5 * 60 * 1000;
+  static const char* kTableName = "kudu1580";
+
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  cluster_opts_.num_tablet_servers = kNumTabletServers;
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(MonoDelta::FromMilliseconds(kTimeoutMs))
+          .default_rpc_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)),
+      &client));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  KuduSchema schema(client::KuduSchemaFromSchema(CreateKeyValueTestSchema()));
+  ASSERT_OK(table_creator->table_name(kTableName)
+      .schema(&schema)
+      .add_hash_partitions({ "key" }, kNumTabletServers)
+      .num_replicas(kNumTabletServers)
+      .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(kTimeoutMs);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  // Running multiple iterations to cover possible variations of tablet leader
+  // placement among tablet servers.
+  for (int i = 0; i < 8 * kNumTabletServers; ++i) {
+    vector<unique_ptr<ScopedResumeExternalDaemon>> resumers;
+    for (int tsi = 0; tsi < kNumTabletServers; ++tsi) {
+      ExternalTabletServer* ts(cluster_->tablet_server(tsi));
+      ASSERT_NE(nullptr, ts);
+      ASSERT_OK(ts->Pause());
+      resumers.emplace_back(new ScopedResumeExternalDaemon(ts));
+    }
+
+    // Resume 2 out of 3 tablet servers (i.e. the majority), so the client
+    // could eventually succeed with its write operations.
+    thread resume_thread([&]() {
+        const int idx0 = rand() % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
+        const int idx1 = (idx0 + 1) % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
+        SleepFor(MonoDelta::FromSeconds(1));
+      });
+    // An automatic clean-up to handle both success and failure cases
+    // in the code below.
+    auto cleanup = MakeScopedCleanup([&]() {
+        resume_thread.join();
+      });
+
+    // Since the table is hash-partitioned with kNumTabletServers partitions,
+    // hopefully three sequential numbers would go into different partitions.
+    for (int ii = 0; ii < kNumTabletServers; ++ii) {
+      unique_ptr<KuduInsert> ins(table->NewInsert());
+      ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii));
+      ASSERT_OK(ins->mutable_row()->SetInt32(1, 0));
+      ASSERT_OK(session->Apply(ins.release()));
+    }
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index ae3ca79..ded7710 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -52,7 +52,6 @@ DEFINE_int64(client_inserts_per_thread, 50,
 DEFINE_int64(client_num_batches_per_thread, 5,
              "In how many batches to group the rows, for each client");
 DECLARE_int32(consensus_rpc_timeout_ms);
-DECLARE_int64(rpc_negotiation_timeout_ms);
 
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
@@ -1044,10 +1043,6 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   // Reset consensus rpc timeout to the default value or the election might fail often.
   FLAGS_consensus_rpc_timeout_ms = 1000;
 
-  // TODO(KUDU-1580): this test seems to frequently trigger RPC negotiation timeouts,
-  // and the client doesn't properly fail over in this case.
-  FLAGS_rpc_negotiation_timeout_ms = 10000;
-
   // Start a 7 node configuration cluster (since we can't bring leaders back we start with a
   // higher replica count so that we kill more leaders).
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 3d3c9f2..fc46d67 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -61,6 +61,8 @@ using strings::Substitute;
 namespace kudu {
 namespace rpc {
 
+typedef OutboundCall::Phase Phase;
+
 ///
 /// Connection
 ///
@@ -164,7 +166,10 @@ void Connection::Shutdown(const Status &status,
       if (rpc_error) {
         error.reset(new ErrorStatusPB(*rpc_error));
       }
-      c->call->SetFailed(status, error.release());
+      c->call->SetFailed(status,
+                         negotiation_complete_ ? Phase::REMOTE_CALL
+                                               : Phase::CONNECTION_NEGOTIATION,
+                         error.release());
     }
     // And we must return the CallAwaitingResponse to the pool
     car_pool_.Destroy(c);
@@ -238,7 +243,8 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   DCHECK(!car->call->IsFinished());
 
   // Mark the call object as failed.
-  car->call->SetTimedOut();
+  car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL
+                                               : Phase::CONNECTION_NEGOTIATION);
 
   // Drop the reference to the call. If the original caller has moved on after
   // seeing the timeout, we no longer need to hold onto the allocated memory
@@ -289,7 +295,9 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
 
   if (PREDICT_FALSE(!shutdown_status_.ok())) {
     // Already shutdown
-    call->SetFailed(shutdown_status_);
+    call->SetFailed(shutdown_status_,
+                    negotiation_complete_ ? Phase::REMOTE_CALL
+                                          : Phase::CONNECTION_NEGOTIATION);
     return;
   }
 
@@ -305,7 +313,8 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   slices_tmp_.clear();
   Status s = call->SerializeTo(&slices_tmp_);
   if (PREDICT_FALSE(!s.ok())) {
-    call->SetFailed(s);
+    call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
+                                             : Phase::CONNECTION_NEGOTIATION);
     return;
   }
 
@@ -599,7 +608,8 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
           outbound_transfers_.pop_front();
           Status s = Status::NotSupported("server does not support the required RPC features");
           transfer->Abort(s);
-          car->call->SetFailed(s);
+          car->call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
+                                                        : Phase::CONNECTION_NEGOTIATION);
           car->call.reset();
           delete transfer;
           continue;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 73025d6..af03f1c 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -153,8 +153,12 @@ string OutboundCall::StateName(State state) {
       return "SENDING";
     case SENT:
       return "SENT";
+    case NEGOTIATION_TIMED_OUT:
+      return "NEGOTIATION_TIMED_OUT";
     case TIMED_OUT:
       return "TIMED_OUT";
+    case FINISHED_NEGOTIATION_ERROR:
+      return "FINISHED_NEGOTIATION_ERROR";
     case FINISHED_ERROR:
       return "FINISHED_ERROR";
     case FINISHED_SUCCESS:
@@ -191,6 +195,9 @@ void OutboundCall::set_state_unlocked(State new_state) {
     case SENT:
       DCHECK_EQ(state_, SENDING);
       break;
+    case NEGOTIATION_TIMED_OUT:
+      DCHECK(state_ == ON_OUTBOUND_QUEUE);
+      break;
     case TIMED_OUT:
       DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
       break;
@@ -251,7 +258,7 @@ void OutboundCall::SetResponse(gscoped_ptr<CallResponse> resp) {
       return;
     }
     ErrorStatusPB* err_raw = err.release();
-    SetFailed(Status::RemoteError(err_raw->message()), err_raw);
+    SetFailed(Status::RemoteError(err_raw->message()), Phase::REMOTE_CALL, err_raw);
   }
 }
 
@@ -279,38 +286,65 @@ void OutboundCall::SetSent() {
 }
 
 void OutboundCall::SetFailed(const Status &status,
+                             Phase phase,
                              ErrorStatusPB* err_pb) {
+  DCHECK(!status.ok());
+  DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
   {
     std::lock_guard<simple_spinlock> l(lock_);
     status_ = status;
     if (err_pb) {
       error_pb_.reset(err_pb);
     }
-    set_state_unlocked(FINISHED_ERROR);
+    set_state_unlocked(phase == Phase::CONNECTION_NEGOTIATION
+        ? FINISHED_NEGOTIATION_ERROR
+        : FINISHED_ERROR);
   }
   CallCallback();
 }
 
-void OutboundCall::SetTimedOut() {
+void OutboundCall::SetTimedOut(Phase phase) {
+  static const char* kErrMsgNegotiation =
+      "connection negotiation to $1 for RPC $0 timed out after $2 ($3)";
+  static const char* kErrMsgCall = "$0 RPC to $1 timed out after $2 ($3)";
+  DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
+
   // We have to fetch timeout outside the lock to avoid a lock
   // order inversion between this class and RpcController.
-  MonoDelta timeout = controller_->timeout();
+  const MonoDelta timeout = controller_->timeout();
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    status_ = Status::TimedOut(Substitute(
-        "$0 RPC to $1 timed out after $2 ($3)",
-        remote_method_.method_name(),
-        conn_id_.remote().ToString(),
-        timeout.ToString(),
-        StateName(state_)));
-    set_state_unlocked(TIMED_OUT);
+    status_ = Status::TimedOut(
+        Substitute((phase == Phase::REMOTE_CALL) ? kErrMsgCall : kErrMsgNegotiation,
+                   remote_method_.method_name(),
+                   conn_id_.remote().ToString(),
+                   timeout.ToString(),
+                   StateName(state_)));
+    set_state_unlocked((phase == Phase::REMOTE_CALL) ? TIMED_OUT : NEGOTIATION_TIMED_OUT);
   }
   CallCallback();
 }
 
 bool OutboundCall::IsTimedOut() const {
   std::lock_guard<simple_spinlock> l(lock_);
-  return state_ == TIMED_OUT;
+  switch (state_) {
+    case NEGOTIATION_TIMED_OUT:       // fall-through
+    case TIMED_OUT:
+      return true;
+    default:
+      return false;
+  }
+}
+
+bool OutboundCall::IsNegotiationError() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  switch (state_) {
+    case FINISHED_NEGOTIATION_ERROR:  // fall-through
+    case NEGOTIATION_TIMED_OUT:
+      return true;
+    default:
+      return false;
+  }
 }
 
 bool OutboundCall::IsFinished() const {
@@ -321,7 +355,9 @@ bool OutboundCall::IsFinished() const {
     case ON_OUTBOUND_QUEUE:
     case SENT:
       return false;
+    case NEGOTIATION_TIMED_OUT:
     case TIMED_OUT:
+    case FINISHED_NEGOTIATION_ERROR:
     case FINISHED_ERROR:
     case FINISHED_SUCCESS:
       return true;
@@ -339,8 +375,7 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
                           RpcCallInProgressPB* resp) {
   std::lock_guard<simple_spinlock> l(lock_);
   resp->mutable_header()->CopyFrom(header_);
-  resp->set_micros_elapsed(
-      (MonoTime::Now() - start_time_).ToMicroseconds());
+  resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds());
 
   switch (state_) {
     case READY:
@@ -356,9 +391,15 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
     case SENT:
       resp->set_state(RpcCallInProgressPB::SENT);
       break;
+    case NEGOTIATION_TIMED_OUT:
+      resp->set_state(RpcCallInProgressPB::NEGOTIATION_TIMED_OUT);
+      break;
     case TIMED_OUT:
       resp->set_state(RpcCallInProgressPB::TIMED_OUT);
       break;
+    case FINISHED_NEGOTIATION_ERROR:
+      resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
+      break;
     case FINISHED_ERROR:
       resp->set_state(RpcCallInProgressPB::FINISHED_ERROR);
       break;
@@ -366,7 +407,6 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
       resp->set_state(RpcCallInProgressPB::FINISHED_SUCCESS);
       break;
   }
-
 }
 
 ///

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index ee7bc64..ebed9b5 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -55,7 +55,6 @@ class RpcCallInProgressPB;
 class RpcController;
 class RpcSidecar;
 
-
 // Used to key on Connection information.
 // For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
 // This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that).
@@ -120,6 +119,18 @@ class ConnectionIdEqual {
 // of different threads, making it tricky to enforce single ownership.
 class OutboundCall {
  public:
+
+  // Phases of an outbound RPC. Making an outbound RPC might involve establishing
+  // a connection to the remote server first, and the actual call is made only
+  // once the connection to the server is established.
+  enum class Phase {
+    // The phase of connection negotiation between the caller and the callee.
+    CONNECTION_NEGOTIATION,
+
+    // The phase of sending a call over already established connection.
+    REMOTE_CALL,
+  };
+
   OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method,
                google::protobuf::Message* response_storage,
                RpcController* controller, ResponseCallback callback);
@@ -160,13 +171,16 @@ class OutboundCall {
   // should be set to the error returned by the remote server. Takes
   // ownership of 'err_pb'.
   void SetFailed(const Status& status,
+                 Phase phase = Phase::REMOTE_CALL,
                  ErrorStatusPB* err_pb = nullptr);
 
   // Mark the call as timed out. This also triggers the callback to notify
   // the caller.
-  void SetTimedOut();
+  void SetTimedOut(Phase phase);
   bool IsTimedOut() const;
 
+  bool IsNegotiationError() const;
+
   // Is the call finished?
   bool IsFinished() const;
 
@@ -212,7 +226,9 @@ class OutboundCall {
     ON_OUTBOUND_QUEUE,
     SENDING,
     SENT,
+    NEGOTIATION_TIMED_OUT,
     TIMED_OUT,
+    FINISHED_NEGOTIATION_ERROR,
     FINISHED_ERROR,
     FINISHED_SUCCESS
   };

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 4141ac3..e235dd4 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -70,17 +70,14 @@ using std::shared_ptr;
 using std::unique_ptr;
 using strings::Substitute;
 
-// TODO(KUDU-1580). This timeout has been bumped from 3 seconds up to
-// 15 seconds to workaround a bug. We should drop it back down when
-// KUDU-1580 is fixed.
-DEFINE_int64(rpc_negotiation_timeout_ms, 15000,
+DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
              "Timeout for negotiating an RPC connection.");
 TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
 TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
 
 DEFINE_bool(rpc_reopen_outbound_connections, false,
-            "Open a new connection to the server for every RPC call, "
-            "if possible. If not enabled, an already existing connection to a "
+            "Open a new connection to the server for every RPC call. "
+            "If not enabled, an already existing connection to a "
             "server is reused upon making another call to the same server. "
             "When this flag is enabled, an already existing _idle_ connection "
             "to the server is closed upon making another RPC call which would "
@@ -253,7 +250,7 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
                                    call->controller()->credentials_policy(),
                                    &conn);
   if (PREDICT_FALSE(!s.ok())) {
-    call->SetFailed(s);
+    call->SetFailed(s, OutboundCall::Phase::CONNECTION_NEGOTIATION);
     return;
   }
 
@@ -710,7 +707,9 @@ class AssignOutboundCallTask : public ReactorTask {
   }
 
   void Abort(const Status& status) override {
-    call_->SetFailed(status);
+    // It doesn't matter what is the actual phase of the OutboundCall: just set
+    // it to Phase::REMOTE_CALL to finalize the state of the call.
+    call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
     delete this;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 6b97b4b..c40f546 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -486,7 +486,9 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(resp.data2(), s2);
   }
 
-  void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
+  void DoTestExpectTimeout(const Proxy& p,
+                           const MonoDelta& timeout,
+                           bool* is_negotiaton_error = nullptr) {
     SleepRequestPB req;
     SleepResponsePB resp;
     // Sleep for 500ms longer than the call timeout.
@@ -498,8 +500,11 @@ class RpcTestBase : public KuduTest {
     Stopwatch sw;
     sw.start();
     Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
-    ASSERT_FALSE(s.ok());
     sw.stop();
+    ASSERT_FALSE(s.ok());
+    if (is_negotiaton_error != nullptr) {
+      *is_negotiaton_error = c.negotiation_failed();
+    }
 
     int expected_millis = timeout.ToMilliseconds();
     int elapsed_millis = sw.elapsed().wall_millis();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 643e61f..63b7b73 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -541,7 +541,10 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
   Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
 
-  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(100)));
+  bool is_negotiation_error = false;
+  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(
+      p, MonoDelta::FromMilliseconds(100), &is_negotiation_error));
+  EXPECT_TRUE(is_negotiation_error);
 
   acceptor_thread->Join();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 9120b72..505db22 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -23,8 +23,8 @@
 
 #include <glog/logging.h>
 
-#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_header.pb.h"
 
 using std::unique_ptr;
 
@@ -72,6 +72,14 @@ bool RpcController::finished() const {
   return false;
 }
 
+bool RpcController::negotiation_failed() const {
+  if (call_) {
+    DCHECK(finished());
+    return call_->IsNegotiationError();
+  }
+  return false;
+}
+
 Status RpcController::status() const {
   if (call_) {
     return call_->status();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index db714bf..ab611a8 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -89,6 +89,9 @@ class RpcController {
   // has timed out.
   bool finished() const;
 
+  // Whether the call failed due to connection negotiation error.
+  bool negotiation_failed() const;
+
   // Return the current status of a call.
   //
   // A call is "OK" status until it finishes, at which point it may

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3c5dd18/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index f7e2309..9d2f9b5 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -40,6 +40,8 @@ message RpcCallInProgressPB {
     TIMED_OUT = 4;
     FINISHED_ERROR = 5;
     FINISHED_SUCCESS = 6;
+    NEGOTIATION_TIMED_OUT = 7;
+    FINISHED_NEGOTIATION_ERROR = 8;
 
     // TODO(todd): add states for InboundCall
   }