You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/26 22:05:30 UTC

[kudu] branch master updated (6eec549 -> 877b9f6)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 6eec549  [tests] address flake in TestClusterWithLocation
     new c557576  Add GetTableLocations tracing
     new 16d9a86  KUDU-2690: don't roll log schema on failed alter
     new 877b9f6  KUDU-1868: Part 1: Add timer-based RPC timeouts

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kudu/client/AlterTableRequest.java  |  12 +-
 .../org/apache/kudu/client/AlterTableResponse.java |   6 +-
 .../org/apache/kudu/client/AsyncKuduClient.java    | 169 ++++++++++-----
 .../org/apache/kudu/client/AsyncKuduScanner.java   |   9 +-
 .../org/apache/kudu/client/AsyncKuduSession.java   |  21 +-
 .../main/java/org/apache/kudu/client/Batch.java    |  28 ++-
 .../java/org/apache/kudu/client/BatchResponse.java |  14 +-
 .../org/apache/kudu/client/ConnectToCluster.java   |  20 +-
 .../apache/kudu/client/ConnectToMasterRequest.java |   7 +-
 .../org/apache/kudu/client/CreateTableRequest.java |  11 +-
 .../apache/kudu/client/CreateTableResponse.java    |   6 +-
 .../org/apache/kudu/client/DeleteTableRequest.java |   8 +-
 .../apache/kudu/client/DeleteTableResponse.java    |   6 +-
 .../kudu/client/GetTableLocationsRequest.java      |  13 +-
 .../apache/kudu/client/GetTableSchemaRequest.java  |   9 +-
 .../apache/kudu/client/GetTableSchemaResponse.java |   6 +-
 .../kudu/client/IsAlterTableDoneRequest.java       |  11 +-
 .../kudu/client/IsCreateTableDoneRequest.java      |  12 +-
 .../main/java/org/apache/kudu/client/KuduRpc.java  |  41 +++-
 .../org/apache/kudu/client/ListTablesRequest.java  |  11 +-
 .../org/apache/kudu/client/ListTablesResponse.java |   4 +-
 .../kudu/client/ListTabletServersRequest.java      |  14 +-
 .../kudu/client/ListTabletServersResponse.java     |   6 +-
 .../org/apache/kudu/client/ListTabletsRequest.java |   8 +-
 .../apache/kudu/client/ListTabletsResponse.java    |   4 +-
 .../java/org/apache/kudu/client/Operation.java     |  27 ++-
 .../org/apache/kudu/client/OperationResponse.java  |  14 +-
 .../java/org/apache/kudu/client/PingRequest.java   |  12 +-
 .../org/apache/kudu/client/RowResultIterator.java  |  19 +-
 .../main/java/org/apache/kudu/client/RpcProxy.java |   6 +-
 .../apache/kudu/client/TestAsyncKuduSession.java   |   2 +-
 .../apache/kudu/client/TestConnectionCache.java    |   2 +-
 .../java/org/apache/kudu/client/TestTimeouts.java  |  66 +++++-
 src/kudu/consensus/log.cc                          |   2 +
 src/kudu/master/catalog_manager.cc                 |   2 +-
 src/kudu/master/master_service.cc                  |   4 +
 src/kudu/tablet/tablet.cc                          |  13 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  37 +++-
 src/kudu/tablet/tablet_replica-test.cc             | 226 ++++++++++++++++++---
 .../transactions/alter_schema_transaction.cc       |  43 ++--
 .../tablet/transactions/alter_schema_transaction.h |  57 +++---
 41 files changed, 722 insertions(+), 266 deletions(-)


[kudu] 02/03: KUDU-2690: don't roll log schema on failed alter

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 16d9a863d0f31069965aee1a71912a65339a4160
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Feb 7 14:33:13 2019 -0800

    KUDU-2690: don't roll log schema on failed alter
    
    It is possible to update the log segment header schema version based on
    an AlterSchema operation that failed. This is because an alter operation
    that didn't succeed (e.g. because of a schema version mismatch) is
    treated as a successful transaction (similar to how we treat a
    duplicated insert transaction as a successful transaction), and can thus
    mistakenly lead to updating the log segment schema version, even on
    failure. This can lead to a mismatch of schemas between the log segment
    headers and the write ops in those log segments, which can lead to a
    failure to bootstrap.
    
    This patch addresses this by:
    1. making the tablet no-op if it sees that an alter didn't go through,
    2. storing the error in the commit message, so further bootstraps will
       skip over the op (this is KUDU-860).
    
    Only the former is necessary to prevent the issue, and though the latter
    uses extra space, it may be helpful for added visibility and debugging.
    
    This patch adds a unit test that reproduced the scenario at the
    TabletReplica level, as well as a less targeted test that uses the same
    principal to test that we update the log segment schema upon replaying
    an alter. A more end-to-end test that arrives at the reported state by
    working around master-level table locking can be found here[1].
    
    [1] https://gist.github.com/andrwng/3a049bb038680cc0254c5ba52b9a7507
    
    Change-Id: Id761851741297e29a4666bec0c34fc4f7285f715
    Reviewed-on: http://gerrit.cloudera.org:8080/12462
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/consensus/log.cc                          |   2 +
 src/kudu/tablet/tablet.cc                          |  13 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  37 +++-
 src/kudu/tablet/tablet_replica-test.cc             | 226 ++++++++++++++++++---
 .../transactions/alter_schema_transaction.cc       |  43 ++--
 .../tablet/transactions/alter_schema_transaction.h |  57 +++---
 6 files changed, 296 insertions(+), 82 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index c61fcde..43b11a1 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -933,6 +933,8 @@ int64_t Log::OnDiskSize() {
 
 void Log::SetSchemaForNextLogSegment(const Schema& schema,
                                      uint32_t version) {
+  VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
+                                    version, schema.ToString());
   std::lock_guard<rw_spinlock> l(schema_lock_);
   schema_ = schema;
   schema_version_ = version;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 899fc7f..05d4d2f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1246,9 +1246,9 @@ Status Tablet::CreatePreparedAlterSchema(AlterSchemaTransactionState *tx_state,
   return Status::OK();
 }
 
-Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
-  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(tx_state->schema()))) <<
-    "Schema keys cannot be altered(except name)";
+Status Tablet::AlterSchema(AlterSchemaTransactionState* tx_state) {
+  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(tx_state->schema())))
+      << "Schema keys cannot be altered(except name)";
 
   // Prevent any concurrent flushes. Otherwise, we run into issues where
   // we have an MRS in the rowset tree, and we can't alter its schema
@@ -1258,8 +1258,11 @@ Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
   // If the current version >= new version, there is nothing to do.
   bool same_schema = schema()->Equals(*tx_state->schema());
   if (metadata_->schema_version() >= tx_state->schema_version()) {
-    LOG_WITH_PREFIX(INFO) << "Already running schema version " << metadata_->schema_version()
-                          << " got alter request for version " << tx_state->schema_version();
+    const string msg =
+        Substitute("Skipping requested alter to schema version $0, tablet already "
+                   "version $1", tx_state->schema_version(), metadata_->schema_version());
+    LOG_WITH_PREFIX(INFO) << msg;
+    tx_state->SetError(Status::InvalidArgument(msg));
     return Status::OK();
   }
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 3294149..2b03540 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -1446,6 +1447,29 @@ Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context,
 Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
                                                ReplicateMsg* replicate_msg,
                                                const CommitMsg& commit_msg) {
+  // There are three potential outcomes to expect with this replay:
+  // 1. There is no 'result' in the commit message. The alter succeeds, and the
+  //    log updates its schema.
+  // 2. There is no 'result' in the commit message. The alter fails, and the
+  //    log doesn't update its schema. This can happen if trying to replay an
+  //    invalid alter schema request from before we started putting the results
+  //    in the commit message. Note that we'll leave the commit message as is;
+  //    it's harmless since replaying the operation should be a no-op anyway.
+  // 3. The commit message contains a 'result', which should only happen if the
+  //    alter resulted in a failure. Exit out without attempting the alter.
+  if (commit_msg.has_result()) {
+    // If we put a result in the commit message, it should be an error and we
+    // don't need to replay it. In case, in the future, we decide to put
+    // positive results in the commit messages, just filter ops that have
+    // failed statuses instead of D/CHECKing.
+    DCHECK_EQ(1, commit_msg.result().ops_size());
+    const OperationResultPB& op = commit_msg.result().ops(0);
+    if (op.has_failed_status()) {
+      Status error = StatusFromPB(op.failed_status());
+      VLOG(1) << "Played a failed alter request: " << error.ToString();
+      return AppendCommitMsg(commit_msg);
+    }
+  }
   AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
 
   // Decode schema
@@ -1453,19 +1477,16 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
   RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
 
   AlterSchemaTransactionState tx_state(nullptr, alter_schema, nullptr);
-
-  // TODO(KUDU-860): we should somehow distinguish if an alter table failed on its original
-  // attempt (e.g due to being an invalid request, or a request with a too-early
-  // schema version).
-
   RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&tx_state, &schema));
 
   // Apply the alter schema to the tablet
   RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&tx_state), "Failed to AlterSchema:");
 
-  // Also update the log information. Normally, the AlterSchema() call above
-  // takes care of this, but our new log isn't hooked up to the tablet yet.
-  log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
+  if (!tx_state.error()) {
+    // If the alter completed successfully, update the log segment header. Note
+    // that our new log isn't hooked up to the tablet yet.
+    log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
+  }
 
   return AppendCommitMsg(commit_msg);
 }
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 8508dd5..4cf9f03 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/tablet/tablet_replica.h"
+
 #include <cstdint>
 #include <memory>
 #include <ostream>
@@ -35,6 +37,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -49,20 +52,21 @@
 #include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
-#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/transactions/alter_schema_transaction.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tablet/transactions/transaction_driver.h"
 #include "kudu/tablet/transactions/transaction_tracker.h"
 #include "kudu/tablet/transactions/write_transaction.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
@@ -78,10 +82,14 @@ METRIC_DECLARE_entity(tablet);
 DECLARE_int32(flush_threshold_mb);
 
 namespace kudu {
+
+class MemTracker;
+
 namespace tablet {
 
 using consensus::CommitMsg;
 using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using consensus::RECEIVED_OPID;
@@ -93,9 +101,12 @@ using log::LogOptions;
 using pb_util::SecureDebugString;
 using pb_util::SecureShortDebugString;
 using rpc::Messenger;
+using rpc::ResultTracker;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using tserver::AlterSchemaRequestPB;;
+using tserver::AlterSchemaResponsePB;;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 
@@ -111,17 +122,8 @@ class TabletReplicaTest : public KuduTabletTest {
       delete_counter_(0) {
   }
 
-  virtual void SetUp() OVERRIDE {
-    KuduTabletTest::SetUp();
-
-    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
-    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
-    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
-
-    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
-    ASSERT_OK(builder.Build(&messenger_));
-
-    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+  void SetUpReplica(bool new_replica = true) {
+    ASSERT_TRUE(tablet_replica_.get() == nullptr);
 
     RaftConfigPB config;
     config.set_opid_index(consensus::kInvalidOpIdIndex);
@@ -132,15 +134,15 @@ class TabletReplicaTest : public KuduTabletTest {
     config_peer->mutable_last_known_addr()->set_port(0);
     config_peer->set_member_type(RaftPeerPB::VOTER);
 
-    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
-        new ConsensusMetadataManager(tablet()->metadata()->fs_manager()));
 
-    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm));
+    if (new_replica) {
+      ASSERT_OK(cmeta_manager_->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm));
+    }
 
     // "Bootstrap" and start the TabletReplica.
     tablet_replica_.reset(
       new TabletReplica(tablet()->shared_metadata(),
-                        cmeta_manager,
+                        cmeta_manager_,
                         *config_peer,
                         apply_pool_.get(),
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
@@ -154,6 +156,22 @@ class TabletReplicaTest : public KuduTabletTest {
     tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
   }
 
+  virtual void SetUp() override {
+    KuduTabletTest::SetUp();
+
+    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
+    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
+    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
+
+    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
+    ASSERT_OK(builder.Build(&messenger_));
+
+    cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
+
+    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+    NO_FATALS(SetUpReplica());
+  }
+
   Status StartReplica(const ConsensusBootstrapInfo& info) {
     scoped_refptr<Log> log;
     RETURN_NOT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
@@ -164,7 +182,7 @@ class TabletReplicaTest : public KuduTabletTest {
                                   tablet(),
                                   clock(),
                                   messenger_,
-                                  scoped_refptr<rpc::ResultTracker>(),
+                                  scoped_refptr<ResultTracker>(),
                                   log,
                                   prepare_pool_.get());
   }
@@ -179,22 +197,55 @@ class TabletReplicaTest : public KuduTabletTest {
     LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
   }
 
-  virtual void TearDown() OVERRIDE {
+  virtual void TearDown() override {
     tablet_replica_->Shutdown();
     prepare_pool_->Shutdown();
     apply_pool_->Shutdown();
     KuduTabletTest::TearDown();
   }
 
+  void RestartReplica() {
+    tablet_replica_->Shutdown();
+    tablet_replica_.reset();
+    NO_FATALS(SetUpReplica(/*new_replica=*/ false));
+    scoped_refptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &cmeta));
+    shared_ptr<Tablet> tablet;
+    scoped_refptr<Log> log;
+    ConsensusBootstrapInfo bootstrap_info;
+
+    tablet_replica_->SetBootstrapping();
+    ASSERT_OK(BootstrapTablet(tablet_replica_->tablet_metadata(),
+                              cmeta->CommittedConfig(),
+                              clock(),
+                              shared_ptr<MemTracker>(),
+                              scoped_refptr<ResultTracker>(),
+                              &metric_registry_,
+                              tablet_replica_,
+                              &tablet,
+                              &log,
+                              tablet_replica_->log_anchor_registry(),
+                              &bootstrap_info));
+    ASSERT_OK(tablet_replica_->Start(bootstrap_info,
+                                     tablet,
+                                     clock(),
+                                     messenger_,
+                                     scoped_refptr<ResultTracker>(),
+                                     log,
+                                     prepare_pool_.get()));
+  }
+
  protected:
   // Generate monotonic sequence of key column integers.
-  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
-    Schema schema(GetTestSchema());
+  Status GenerateSequentialInsertRequest(const Schema& schema,
+                                         WriteRequestPB* write_req) {
     write_req->set_tablet_id(tablet()->tablet_id());
-    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
+    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
 
     KuduPartialRow row(&schema);
-    CHECK_OK(row.SetInt32("key", insert_counter_++));
+    for (int i = 0; i < schema.num_columns(); i++) {
+      RETURN_NOT_OK(row.SetInt32(i, insert_counter_++));
+    }
 
     RowOperationsPBEncoder enc(write_req->mutable_row_operations());
     enc.Add(RowOperationsPB::INSERT, row);
@@ -217,9 +268,9 @@ class TabletReplicaTest : public KuduTabletTest {
     return Status::OK();
   }
 
-  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
-    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica,
+  Status ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
+    unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(replica,
                                                                          &req,
                                                                          nullptr, // No RequestIdPB
                                                                          resp.get()));
@@ -228,18 +279,50 @@ class TabletReplicaTest : public KuduTabletTest {
     tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
         new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
 
-    CHECK_OK(tablet_replica->SubmitWrite(std::move(tx_state)));
+    RETURN_NOT_OK(replica->SubmitWrite(std::move(tx_state)));
+    rpc_latch.Wait();
+    CHECK(!resp->has_error())
+        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+    return Status::OK();
+  }
+
+  Status UpdateSchema(const SchemaPB& schema, int schema_version) {
+    AlterSchemaRequestPB alter;
+    alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
+    alter.set_tablet_id(tablet()->tablet_id());
+    alter.set_schema_version(schema_version);
+    *alter.mutable_schema() = schema;
+    return ExecuteAlter(tablet_replica_.get(), alter);
+  }
+
+  Status ExecuteAlter(TabletReplica* replica, const AlterSchemaRequestPB& req) {
+    unique_ptr<AlterSchemaResponsePB> resp(new AlterSchemaResponsePB());
+    unique_ptr<AlterSchemaTransactionState> tx_state(
+        new AlterSchemaTransactionState(replica, &req, resp.get()));
+    CountDownLatch rpc_latch(1);
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+          new LatchTransactionCompletionCallback<AlterSchemaResponsePB>(&rpc_latch, resp.get())));
+    RETURN_NOT_OK(replica->SubmitAlterSchema(std::move(tx_state)));
     rpc_latch.Wait();
     CHECK(!resp->has_error())
         << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+    return Status::OK();
+  }
+
+  Status RollLog(TabletReplica* replica) {
+    RETURN_NOT_OK(replica->log_->WaitUntilAllFlushed());
+    return replica->log_->AllocateSegmentAndRollOver();
+  }
+
+  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
+    RETURN_NOT_OK(ExecuteWrite(tablet_replica, req));
 
     // Roll the log after each write.
     // Usually the append thread does the roll and no additional sync is required. However in
     // this test the thread that is appending is not the same thread that is rolling the log
     // so we must make sure the Log's queue is flushed before we roll or we might have a race
     // between the appender thread and the thread executing the test.
-    CHECK_OK(tablet_replica->log_->WaitUntilAllFlushed());
-    CHECK_OK(tablet_replica->log_->AllocateSegmentAndRollOver());
+    CHECK_OK(RollLog(tablet_replica));
     return Status::OK();
   }
 
@@ -247,7 +330,7 @@ class TabletReplicaTest : public KuduTabletTest {
   Status ExecuteInsertsAndRollLogs(int num_inserts) {
     for (int i = 0; i < num_inserts; i++) {
       gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), req.get()));
       RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
     }
 
@@ -298,6 +381,8 @@ class TabletReplicaTest : public KuduTabletTest {
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
 
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
+
   // Must be destroyed before thread pools.
   scoped_refptr<TabletReplica> tablet_replica_;
 };
@@ -313,7 +398,7 @@ class DelayedApplyTransaction : public WriteTransaction {
         apply_continue_(DCHECK_NOTNULL(apply_continue)) {
   }
 
-  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
+  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) override {
     apply_started_->CountDown();
     LOG(INFO) << "Delaying apply...";
     apply_continue_->Wait();
@@ -599,5 +684,84 @@ TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
   stats.Clear();
 }
 
+// Test that the schema of a tablet will be rolled forward upon replaying an
+// alter schema request.
+TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  SchemaPB orig_schema_pb;
+  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  const int orig_schema_version = tablet()->metadata()->schema_version();
+
+  // Add a new column.
+  SchemaBuilder builder(tablet()->metadata()->schema());
+  ASSERT_OK(builder.AddColumn("new_col", INT32));
+  Schema new_client_schema = builder.BuildWithoutIds();
+  SchemaPB new_schema;
+  ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
+  ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
+
+  const auto write = [&] {
+    unique_ptr<WriteRequestPB> req(new WriteRequestPB());
+    ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
+    ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
+  };
+  // Upon restarting, our log segment header schema should have "new_col".
+  NO_FATALS(write());
+  NO_FATALS(RestartReplica());
+
+  // Get rid of the alter in the WALs.
+  NO_FATALS(write());
+  ASSERT_OK(RollLog(tablet_replica_.get()));
+  NO_FATALS(write());
+  tablet_replica_->tablet()->Flush();
+  ASSERT_OK(tablet_replica_->RunLogGC());
+
+  // Now write some more and restart. If our segment header schema previously
+  // didn't have "new_col", bootstrapping would fail, complaining about a
+  // mismatch between the segment header schema and the write request schema.
+  NO_FATALS(write());
+  NO_FATALS(RestartReplica());
+}
+
+// Regression test for KUDU-2690, wherein a alter schema request that failed
+// (e.g. because of an invalid schema) would roll forward the log segment
+// header schema, causing a failure or crash upon bootstrapping.
+TEST_F(TabletReplicaTest, Kudu2690Test) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  SchemaPB orig_schema_pb;
+  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  const int orig_schema_version = tablet()->metadata()->schema_version();
+
+  // First things first, add a new column.
+  SchemaBuilder builder(tablet()->metadata()->schema());
+  ASSERT_OK(builder.AddColumn("new_col", INT32));
+  Schema new_client_schema = builder.BuildWithoutIds();
+  SchemaPB new_schema;
+  ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
+  ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
+
+  // Try to update the schema to an older version. Before the fix for
+  // KUDU-2690, this would revert the schema in the next log segment header
+  // upon rolling the log below.
+  ASSERT_OK(UpdateSchema(orig_schema_pb, orig_schema_version));
+
+  // Roll onto a new segment so we can begin filling a new segment. This allows
+  // us to GC the first segment.
+  ASSERT_OK(RollLog(tablet_replica_.get()));
+  {
+    unique_ptr<WriteRequestPB> req(new WriteRequestPB());
+    ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
+    ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
+  }
+  ASSERT_OK(tablet_replica_->RunLogGC());
+
+  // Before KUDU-2960 was fixed, bootstrapping would fail, complaining that the
+  // write requests contained a column that was not in the log segment header's
+  // schema.
+  NO_FATALS(RestartReplica());
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index c9f3a70..4da364e 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/transactions/alter_schema_transaction.h"
 
 #include <memory>
+#include <ostream>
 #include <utility>
 
 #include <glog/logging.h>
@@ -26,8 +27,10 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -47,16 +50,6 @@ using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;
-using tserver::AlterSchemaRequestPB;
-using tserver::AlterSchemaResponsePB;
-
-string AlterSchemaTransactionState::ToString() const {
-  return Substitute("AlterSchemaTransactionState "
-                    "[timestamp=$0, schema=$1, request=$2]",
-                    has_timestamp() ? timestamp().ToString() : "<unassigned>",
-                    schema_ == nullptr ? "(none)" : schema_->ToString(),
-                    request_ == nullptr ? "(none)" : SecureShortDebugString(*request_));
-}
 
 void AlterSchemaTransactionState::AcquireSchemaLock(rw_semaphore* l) {
   TRACE("Acquiring schema lock in exclusive mode");
@@ -70,6 +63,21 @@ void AlterSchemaTransactionState::ReleaseSchemaLock() {
   TRACE("Released schema lock");
 }
 
+void AlterSchemaTransactionState::SetError(const Status& s) {
+  CHECK(!s.ok()) << "Expected an error status";
+  error_ = OperationResultPB();
+  StatusToPB(s, error_->mutable_failed_status());
+}
+
+string AlterSchemaTransactionState::ToString() const {
+  return Substitute("AlterSchemaTransactionState "
+                    "[timestamp=$0, schema=$1, request=$2, error=$3]",
+                    has_timestamp() ? timestamp().ToString() : "<unassigned>",
+                    schema_ == nullptr ? "(none)" : schema_->ToString(),
+                    request_ == nullptr ? "(none)" : SecureShortDebugString(*request_),
+                    error_ ? "(none)" : SecureShortDebugString(error_->failed_status()));
+}
+
 AlterSchemaTransaction::AlterSchemaTransaction(unique_ptr<AlterSchemaTransactionState> state,
                                                DriverType type)
     : Transaction(state.get(), type, Transaction::ALTER_SCHEMA_TXN),
@@ -115,6 +123,18 @@ Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
 
   Tablet* tablet = state_->tablet_replica()->tablet();
   RETURN_NOT_OK(tablet->AlterSchema(state()));
+
+  commit_msg->reset(new CommitMsg());
+  (*commit_msg)->set_op_type(ALTER_SCHEMA_OP);
+
+  // If there was a logical error (e.g. bad schema version) with the alter,
+  // record the error and exit.
+  if (state_->error()) {
+    TxResultPB* result = (*commit_msg)->mutable_result();
+    *result->add_ops() = std::move(*state_->error());
+    return Status::OK();
+  }
+
   state_->tablet_replica()->log()
     ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
                                                  state_->schema_version());
@@ -122,9 +142,6 @@ Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
   // Altered tablets should be included in the next tserver heartbeat so that
   // clients waiting on IsAlterTableDone() are unblocked promptly.
   state_->tablet_replica()->MarkTabletDirty("Alter schema finished");
-
-  commit_msg->reset(new CommitMsg());
-  (*commit_msg)->set_op_type(ALTER_SCHEMA_OP);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.h b/src/kudu/tablet/transactions/alter_schema_transaction.h
index ae17933..b33d1a8 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.h
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.h
@@ -15,19 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_
-#define KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_
+#pragma once
 
-#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <mutex>
 #include <string>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
+#include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/status.h"
@@ -52,13 +52,13 @@ class AlterSchemaTransactionState : public TransactionState {
                               const tserver::AlterSchemaRequestPB* request,
                               tserver::AlterSchemaResponsePB* response)
       : TransactionState(tablet_replica),
-        schema_(NULL),
+        schema_(nullptr),
         request_(request),
         response_(response) {
   }
 
-  const tserver::AlterSchemaRequestPB* request() const OVERRIDE { return request_; }
-  tserver::AlterSchemaResponsePB* response() const OVERRIDE { return response_; }
+  const tserver::AlterSchemaRequestPB* request() const override { return request_; }
+  tserver::AlterSchemaResponsePB* response() const override { return response_; }
 
   void set_schema(const Schema* schema) { schema_ = schema; }
   const Schema* schema() const { return schema_; }
@@ -81,15 +81,22 @@ class AlterSchemaTransactionState : public TransactionState {
   // Crashes if the lock was not already acquired.
   void ReleaseSchemaLock();
 
-  // Note: request_ and response_ are set to NULL after this method returns.
+  // Note: request_ and response_ are set to null after this method returns.
   void Finish() {
-    // Make the request NULL since after this transaction commits
+    // Make the request null since after this transaction commits
     // the request may be deleted at any moment.
-    request_ = NULL;
-    response_ = NULL;
+    request_ = nullptr;
+    response_ = nullptr;
+  }
+
+  // Sets the fact that the alter had an error.
+  void SetError(const Status& s);
+
+  boost::optional<OperationResultPB> error() const {
+    return error_;
   }
 
-  virtual std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AlterSchemaTransactionState);
@@ -103,35 +110,36 @@ class AlterSchemaTransactionState : public TransactionState {
 
   // The lock held on the tablet's schema_lock_.
   std::unique_lock<rw_semaphore> schema_lock_;
+
+  // The error result of this alter schema transaction. May be empty if the
+  // transaction hasn't been applied or if the alter succeeded.
+  boost::optional<OperationResultPB> error_;
 };
 
 // Executes the alter schema transaction,.
 class AlterSchemaTransaction : public Transaction {
  public:
-  AlterSchemaTransaction(std::unique_ptr<AlterSchemaTransactionState> tx_state,
+  AlterSchemaTransaction(std::unique_ptr<AlterSchemaTransactionState> state,
                          consensus::DriverType type);
 
-  virtual AlterSchemaTransactionState* state() OVERRIDE { return state_.get(); }
-  virtual const AlterSchemaTransactionState* state() const OVERRIDE { return state_.get(); }
+  AlterSchemaTransactionState* state() override { return state_.get(); }
+  const AlterSchemaTransactionState* state() const override { return state_.get(); }
 
-  void NewReplicateMsg(gscoped_ptr<consensus::ReplicateMsg>* replicate_msg) OVERRIDE;
+  void NewReplicateMsg(gscoped_ptr<consensus::ReplicateMsg>* replicate_msg) override;
 
   // Executes a Prepare for the alter schema transaction.
-  //
-  // TODO: need a schema lock?
-
-  virtual Status Prepare() OVERRIDE;
+  Status Prepare() override;
 
   // Starts the AlterSchemaTransaction by assigning it a timestamp.
-  virtual Status Start() OVERRIDE;
+  Status Start() override;
 
   // Executes an Apply for the alter schema transaction
-  virtual Status Apply(gscoped_ptr<consensus::CommitMsg>* commit_msg) OVERRIDE;
+  Status Apply(gscoped_ptr<consensus::CommitMsg>* commit_msg) override;
 
   // Actually commits the transaction.
-  virtual void Finish(TransactionResult result) OVERRIDE;
+  void Finish(TransactionResult result) override;
 
-  virtual std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
  private:
   std::unique_ptr<AlterSchemaTransactionState> state_;
@@ -141,4 +149,3 @@ class AlterSchemaTransaction : public Transaction {
 }  // namespace tablet
 }  // namespace kudu
 
-#endif /* KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_ */


[kudu] 01/03: Add GetTableLocations tracing

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c557576195f6283718fa8baf1279ee1e2ad52b74
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Mon Feb 25 14:52:26 2019 -0800

    Add GetTableLocations tracing
    
    This adds minimal tracing to GetTableLocations. It's useful because
    adding a trace event for the call makes it easier to count the calls in
    bulk in a trace, and having the requestor string traced makes it
    possible to attribute GetTableLocations calls to specific clients in
    many cases, given that the trace will also include the hostport of the
    client.
    
    Change-Id: I92e0dd18de52596f2b9a419c1b6cf7a5666d24bc
    Reviewed-on: http://gerrit.cloudera.org:8080/12585
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/master/catalog_manager.cc | 2 +-
 src/kudu/master/master_service.cc  | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 3e36cab..63747f4 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -4592,7 +4592,7 @@ Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletRespo
 
 Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
                                          GetTableLocationsResponsePB* resp) {
-  // If start-key is > end-key report an error instead of swap the two
+  // If start-key is > end-key report an error instead of swapping the two
   // since probably there is something wrong app-side.
   if (req->has_partition_key_start() && req->has_partition_key_end()
       && req->partition_key_start() > req->partition_key_end()) {
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 375b7d0..7daefae 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -47,6 +47,7 @@
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_verifier.h"
 #include "kudu/server/server_base.h"
+#include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
@@ -399,6 +400,9 @@ void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
 void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
                                           GetTableLocationsResponsePB* resp,
                                           rpc::RpcContext* rpc) {
+  TRACE_EVENT1("master", "GetTableLocations",
+               "requestor", rpc->requestor_string());
+
   CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
   if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;


[kudu] 03/03: KUDU-1868: Part 1: Add timer-based RPC timeouts

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 877b9f64d8b93353647e7dab2e66b4783322f7ad
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Fri Jan 25 14:09:54 2019 -0800

    KUDU-1868: Part 1: Add timer-based RPC timeouts
    
    Currently, the Java client requires some kind of event to detect the
    timeout of an RPC: either a response from the server in the chain of
    sub-RPCs or a socket read timeout on the connection. This patch adds a
    timer task to actively time out an RPC once it passes its deadline.
    
    Part 2 will eliminate socket read timeouts from the Java client, except
    possibly in the case of negotiation, which will fully resolve KUDU-1868.
    
    There is one test included, which checks that timeouts occur without an
    "outside stimulus" like a response from the server.
    
    This patch should not degrade the performance of the client. Even though
    every timer task holds a reference to its RPC, when the RPC completes it
    cancels the timer task, which will make the timer release it at the next
    tick. This means the RPC and its task should be available to be GC'd
    after the next tick of the timer.
    
    Change-Id: I8d823b63ac0a41cc5e42b63a7c19e0ef777e1dea
    Reviewed-on: http://gerrit.cloudera.org:8080/12338
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../org/apache/kudu/client/AlterTableRequest.java  |  12 +-
 .../org/apache/kudu/client/AlterTableResponse.java |   6 +-
 .../org/apache/kudu/client/AsyncKuduClient.java    | 169 ++++++++++++++-------
 .../org/apache/kudu/client/AsyncKuduScanner.java   |   9 +-
 .../org/apache/kudu/client/AsyncKuduSession.java   |  21 ++-
 .../main/java/org/apache/kudu/client/Batch.java    |  28 +++-
 .../java/org/apache/kudu/client/BatchResponse.java |  14 +-
 .../org/apache/kudu/client/ConnectToCluster.java   |  20 +--
 .../apache/kudu/client/ConnectToMasterRequest.java |   7 +-
 .../org/apache/kudu/client/CreateTableRequest.java |  11 +-
 .../apache/kudu/client/CreateTableResponse.java    |   6 +-
 .../org/apache/kudu/client/DeleteTableRequest.java |   8 +-
 .../apache/kudu/client/DeleteTableResponse.java    |   6 +-
 .../kudu/client/GetTableLocationsRequest.java      |  13 +-
 .../apache/kudu/client/GetTableSchemaRequest.java  |   9 +-
 .../apache/kudu/client/GetTableSchemaResponse.java |   6 +-
 .../kudu/client/IsAlterTableDoneRequest.java       |  11 +-
 .../kudu/client/IsCreateTableDoneRequest.java      |  12 +-
 .../main/java/org/apache/kudu/client/KuduRpc.java  |  41 +++--
 .../org/apache/kudu/client/ListTablesRequest.java  |  11 +-
 .../org/apache/kudu/client/ListTablesResponse.java |   4 +-
 .../kudu/client/ListTabletServersRequest.java      |  14 +-
 .../kudu/client/ListTabletServersResponse.java     |   6 +-
 .../org/apache/kudu/client/ListTabletsRequest.java |   8 +-
 .../apache/kudu/client/ListTabletsResponse.java    |   4 +-
 .../java/org/apache/kudu/client/Operation.java     |  27 +++-
 .../org/apache/kudu/client/OperationResponse.java  |  14 +-
 .../java/org/apache/kudu/client/PingRequest.java   |  12 +-
 .../org/apache/kudu/client/RowResultIterator.java  |  19 ++-
 .../main/java/org/apache/kudu/client/RpcProxy.java |   6 +-
 .../apache/kudu/client/TestAsyncKuduSession.java   |   2 +-
 .../apache/kudu/client/TestConnectionCache.java    |   2 +-
 .../java/org/apache/kudu/client/TestTimeouts.java  |  66 +++++++-
 33 files changed, 421 insertions(+), 183 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index b6d8cf7..9e2dfd0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
@@ -43,8 +44,12 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
   private final AlterTableRequestPB.Builder builder;
   private final List<Integer> requiredFeatures;
 
-  AlterTableRequest(KuduTable masterTable, String name, AlterTableOptions ato) {
-    super(masterTable);
+  AlterTableRequest(KuduTable masterTable,
+                    String name,
+                    AlterTableOptions ato,
+                    Timer timer,
+                    long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.name = name;
     this.builder = ato.getProtobuf();
     this.requiredFeatures = ato.hasAddDropRangePartitions() ?
@@ -75,7 +80,8 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
     final AlterTableResponsePB.Builder respBuilder = AlterTableResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
     AlterTableResponse response = new AlterTableResponse(
-        deadlineTracker.getElapsedMillis(), tsUUID,
+        deadlineTracker.getElapsedMillis(),
+        tsUUID,
         respBuilder.hasTableId() ? respBuilder.getTableId().toStringUtf8() : null);
 
     return new Pair<AlterTableResponse, Object>(
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
index 62a3b74..bc4e9cb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableResponse.java
@@ -27,10 +27,10 @@ public class AlterTableResponse extends KuduRpcResponse {
   private String tableId;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  AlterTableResponse(long ellapsedMillis, String tsUUID, String tableId) {
-    super(ellapsedMillis, tsUUID);
+  AlterTableResponse(long elapsedMillis, String tsUUID, String tableId) {
+    super(elapsedMillis, tsUUID);
     this.tableId = tableId;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 494cb4d..ae2482f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -63,6 +63,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioWorkerPool;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
 import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -523,6 +524,16 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Returns the {@link Timer} instance held by this client. This timer should
+   * be used everywhere for scheduling tasks after a delay, e.g., for
+   * timeouts.
+   * @return the time instance held by this client
+   */
+  Timer getTimer() {
+    return timer;
+  }
+
+  /**
    * Returns a synchronous {@link KuduClient} which wraps this asynchronous client.
    * Calling {@link KuduClient#close} on the returned client will close this client.
    * If this asynchronous client should outlive the returned synchronous client,
@@ -558,9 +569,12 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     // Send the CreateTable RPC.
-    final CreateTableRequest create = new CreateTableRequest(
-        this.masterTable, name, schema, builder);
-    create.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    final CreateTableRequest create = new CreateTableRequest(this.masterTable,
+                                                             name,
+                                                             schema,
+                                                             builder,
+                                                             timer,
+                                                             defaultAdminOperationTimeoutMs);
     Deferred<CreateTableResponse> createTableD = sendRpcToTablet(create);
 
     // Add a callback that converts the response into a KuduTable.
@@ -607,9 +621,10 @@ public class AsyncKuduClient implements AutoCloseable {
       @Nonnull TableIdentifierPB.Builder table,
       @Nullable KuduRpc<?> parent) {
     checkIsClosed();
-    IsCreateTableDoneRequest request = new IsCreateTableDoneRequest(
-        this.masterTable, table);
-    request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    IsCreateTableDoneRequest request = new IsCreateTableDoneRequest(this.masterTable,
+                                                                    table,
+                                                                    timer,
+                                                                    defaultAdminOperationTimeoutMs);
     if (parent != null) {
       request.setParentRpc(parent);
     }
@@ -623,8 +638,10 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<DeleteTableResponse> deleteTable(String name) {
     checkIsClosed();
-    DeleteTableRequest delete = new DeleteTableRequest(this.masterTable, name);
-    delete.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    DeleteTableRequest delete = new DeleteTableRequest(this.masterTable,
+                                                       name,
+                                                       timer,
+                                                       defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(delete);
   }
 
@@ -637,8 +654,11 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<AlterTableResponse> alterTable(String name, AlterTableOptions ato) {
     checkIsClosed();
-    final AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato);
-    alter.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    final AlterTableRequest alter = new AlterTableRequest(this.masterTable,
+                                                          name,
+                                                          ato,
+                                                          timer,
+                                                          defaultAdminOperationTimeoutMs);
     Deferred<AlterTableResponse> responseD = sendRpcToTablet(alter);
 
     if (ato.hasAddDropRangePartitions()) {
@@ -705,12 +725,11 @@ public class AsyncKuduClient implements AutoCloseable {
       @Nonnull TableIdentifierPB.Builder table,
       @Nullable KuduRpc<?> parent) {
     checkIsClosed();
-    IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(
-        this.masterTable, table);
-    request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    if (parent != null) {
-      request.setParentRpc(parent);
-    }
+    IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(this.masterTable,
+                                                                  table,
+                                                                  timer,
+                                                                  defaultAdminOperationTimeoutMs);
+    request.setParentRpc(parent);
     return sendRpcToTablet(request);
   }
 
@@ -720,8 +739,9 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   public Deferred<ListTabletServersResponse> listTabletServers() {
     checkIsClosed();
-    ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable,
+                                                                timer,
+                                                                defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
   }
 
@@ -740,13 +760,13 @@ public class AsyncKuduClient implements AutoCloseable {
     Preconditions.checkNotNull(tableName);
 
     // Prefer a lookup by table ID over name, since the former is immutable.
-    GetTableSchemaRequest rpc = new GetTableSchemaRequest(
-        this.masterTable, tableId, tableId != null ? null : tableName);
+    GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable,
+                                                          tableId,
+                                                          tableId != null ? null : tableName,
+                                                          timer,
+                                                          defaultAdminOperationTimeoutMs);
 
-    if (parent != null) {
-      rpc.setParentRpc(parent);
-    }
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    rpc.setParentRpc(parent);
     return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
       @Override
       public KuduTable call(GetTableSchemaResponse resp) throws Exception {
@@ -784,8 +804,10 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return a deferred that yields the list of table names
    */
   public Deferred<ListTablesResponse> getTablesList(String nameFilter) {
-    ListTablesRequest rpc = new ListTablesRequest(this.masterTable, nameFilter);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
+    ListTablesRequest rpc = new ListTablesRequest(this.masterTable,
+                                                  nameFilter,
+                                                  timer,
+                                                  defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
   }
 
@@ -970,7 +992,7 @@ public class AsyncKuduClient implements AutoCloseable {
               RpcTraceFrame.Action.SLEEP_THEN_RETRY)
           .callStatus(ex.getStatus())
           .build());
-      newTimeout(retryTask, sleepTime);
+      newTimeout(timer, retryTask, sleepTime);
       return null;
 
       // fakeRpc.Deferred was not invoked; the user continues to wait until
@@ -1347,12 +1369,14 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param method fake RPC method (shows up in RPC traces)
    * @param parent parent RPC (for tracing), if any
    * @param <R> the expected return type of the fake RPC
+   * @param timeoutMs the timeout in milliseconds for the fake RPC
    * @return created fake RPC
    */
   private <R> KuduRpc<R> buildFakeRpc(
       @Nonnull final String method,
-      @Nullable final KuduRpc<?> parent) {
-    KuduRpc<R> rpc = new KuduRpc<R>(null) {
+      @Nullable final KuduRpc<?> parent,
+      long timeoutMs) {
+    KuduRpc<R> rpc = new KuduRpc<R>(null, timer, timeoutMs) {
       @Override
       Message createRequestPB() {
         return null;
@@ -1374,14 +1398,25 @@ public class AsyncKuduClient implements AutoCloseable {
         return null;
       }
     };
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    if (parent != null) {
-      rpc.setParentRpc(parent);
-    }
+    rpc.setParentRpc(parent);
     return rpc;
   }
 
   /**
+   * Creates an RPC that will never be sent, and will instead be used
+   * exclusively for timeouts.
+   * @param method fake RPC method (shows up in RPC traces)
+   * @param parent parent RPC (for tracing), if any
+   * @param <R> the expected return type of the fake RPC
+   * @return created fake RPC
+   */
+  private <R> KuduRpc<R> buildFakeRpc(
+      @Nonnull final String method,
+      @Nullable final KuduRpc<?> parent) {
+    return buildFakeRpc(method, parent, defaultAdminOperationTimeoutMs);
+  }
+
+  /**
    * Schedules a IsAlterTableDone RPC. When the response comes in, if the table
    * is done altering, the RPC's callback chain is triggered with 'resp' as its
    * value. If not, another IsAlterTableDone RPC is scheduled and the cycle
@@ -1550,7 +1585,7 @@ public class AsyncKuduClient implements AutoCloseable {
       tooManyAttemptsOrTimeout(rpc, null);
       return;
     }
-    newTimeout(new RetryTimer(), sleepTimeMillis);
+    newTimeout(timer, new RetryTimer(), sleepTimeMillis);
   }
 
   /**
@@ -1580,7 +1615,7 @@ public class AsyncKuduClient implements AutoCloseable {
       tooManyAttemptsOrTimeout(rpc, null);
       return;
     }
-    newTimeout(new RetryTimer(), sleepTimeMillis);
+    newTimeout(timer, new RetryTimer(), sleepTimeMillis);
   }
 
   private final class ReleaseMasterLookupPermit<T> implements Callback<T, T> {
@@ -1688,15 +1723,18 @@ public class AsyncKuduClient implements AutoCloseable {
     if (isMasterTable(tableId)) {
       d = getMasterTableLocationsPB(parentRpc);
     } else {
+      long timeoutMillis = parentRpc == null ? defaultAdminOperationTimeoutMs :
+                                               parentRpc.deadlineTracker.getMillisBeforeDeadline();
       // Leave the end of the partition key range empty in order to pre-fetch tablet locations.
       GetTableLocationsRequest rpc =
-          new GetTableLocationsRequest(masterTable, partitionKey, null, tableId, fetchBatchSize);
-      if (parentRpc != null) {
-        rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
-        rpc.setParentRpc(parentRpc);
-      } else {
-        rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-      }
+          new GetTableLocationsRequest(masterTable,
+                                       partitionKey,
+                                       null,
+                                       tableId,
+                                       fetchBatchSize,
+                                       timer,
+                                       timeoutMillis);
+      rpc.setParentRpc(parentRpc);
       d = sendRpcToTablet(rpc);
     }
     d.addCallback(new MasterLookupCB(table, partitionKey, fetchBatchSize));
@@ -1823,15 +1861,20 @@ public class AsyncKuduClient implements AutoCloseable {
       final byte[] lookupKey = partitionKey;
 
       // Build a fake RPC to encapsulate and propagate the timeout. There's no actual "RPC" to send.
-      KuduRpc fakeRpc = buildFakeRpc("loopLocateTable", null);
-      fakeRpc.setTimeoutMillis(deadlineTracker.getMillisBeforeDeadline());
+      KuduRpc fakeRpc = buildFakeRpc("loopLocateTable",
+                                     null,
+                                     deadlineTracker.getMillisBeforeDeadline());
 
       return locateTablet(table, key, fetchBatchSize, fakeRpc).addCallbackDeferring(
           new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
             @Override
             public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
-              return loopLocateTable(table, lookupKey, endPartitionKey, fetchBatchSize,
-                                     ret, deadlineTracker);
+              return loopLocateTable(table,
+                                     lookupKey,
+                                     endPartitionKey,
+                                     fetchBatchSize,
+                                     ret,
+                                     deadlineTracker);
             }
 
             @Override
@@ -1865,8 +1908,12 @@ public class AsyncKuduClient implements AutoCloseable {
     final List<LocatedTablet> ret = Lists.newArrayList();
     final DeadlineTracker deadlineTracker = new DeadlineTracker();
     deadlineTracker.setDeadline(deadline);
-    return loopLocateTable(table, startPartitionKey, endPartitionKey, fetchBatchSize,
-                           ret, deadlineTracker);
+    return loopLocateTable(table,
+                           startPartitionKey,
+                           endPartitionKey,
+                           fetchBatchSize,
+                           ret,
+                           deadlineTracker);
   }
 
   /**
@@ -1952,11 +1999,12 @@ public class AsyncKuduClient implements AutoCloseable {
             .build());
 
     long sleepTime = getSleepTimeForRpcMillis(rpc);
-    if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
+    if (cannotRetryRequest(rpc) ||
+        rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
       // Don't let it retry.
       return tooManyAttemptsOrTimeout(rpc, ex);
     }
-    newTimeout(new RetryTimer(), sleepTime);
+    newTimeout(timer, new RetryTimer(), sleepTime);
     return rpc.getDeferred();
   }
 
@@ -2345,16 +2393,29 @@ public class AsyncKuduClient implements AutoCloseable {
     return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
   }
 
-  void newTimeout(final TimerTask task, final long timeoutMs) {
+  /**
+   * Utility function to register a timeout task 'task' on timer 'timer' that
+   * will fire after 'timeoutMillis' milliseconds. Returns a handle to the
+   * scheduled timeout, which can be used to cancel the task and release its
+   * resources.
+   * @param timer the timer on which the task is scheduled
+   * @param task the task that will be run when the timeout hits
+   * @param timeoutMillis the timeout, in milliseconds
+   * @return a handle to the scheduled timeout
+   */
+  static Timeout newTimeout(final Timer timer,
+                            final TimerTask task,
+                            final long timeoutMillis) {
+    Preconditions.checkNotNull(timer);
     try {
-      timer.newTimeout(task, timeoutMs, MILLISECONDS);
+      return timer.newTimeout(task, timeoutMillis, MILLISECONDS);
     } catch (IllegalStateException e) {
       // This can happen if the timer fires just before shutdown()
       // is called from another thread, and due to how threads get
       // scheduled we tried to call newTimeout() after timer.stop().
-      LOG.warn("Failed to schedule timer." +
-          " Ignore this if we're shutting down.", e);
+      LOG.warn("Failed to schedule timer. Ignore this if we're shutting down.", e);
     }
+    return null;
   }
 
   /**
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 36bb0af..d31c0a2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -835,9 +835,8 @@ public final class AsyncKuduScanner {
   final class KeepAliveRequest extends KuduRpc<Void> {
 
     KeepAliveRequest(KuduTable table, RemoteTablet tablet) {
-      super(table);
+      super(table, client.getTimer(), scanRequestTimeout);
       setTablet(tablet);
-      this.setTimeoutMillis(scanRequestTimeout);
     }
 
     @Override
@@ -882,10 +881,9 @@ public final class AsyncKuduScanner {
     State state;
 
     ScanRequest(KuduTable table, State state, RemoteTablet tablet) {
-      super(table);
+      super(table, client.getTimer(), scanRequestTimeout);
       setTablet(tablet);
       this.state = state;
-      this.setTimeoutMillis(scanRequestTimeout);
     }
 
     @Override
@@ -1017,8 +1015,7 @@ public final class AsyncKuduScanner {
         }
       }
       RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
-          deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
-          callResponse);
+          deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(), callResponse);
 
       boolean hasMore = resp.getHasMoreResults();
       if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id)) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index d815fdb..0e174f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -126,7 +126,7 @@ public class AsyncKuduSession implements SessionConfiguration {
   private int mutationBufferLowWatermark;
   private FlushMode flushMode;
   private ExternalConsistencyMode consistencyMode;
-  private long timeoutMs;
+  private long timeoutMillis;
 
   /**
    * Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
@@ -188,7 +188,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     this.client = client;
     flushMode = FlushMode.AUTO_FLUSH_SYNC;
     consistencyMode = CLIENT_PROPAGATED;
-    timeoutMs = client.getDefaultOperationTimeoutMs();
+    timeoutMillis = client.getDefaultOperationTimeoutMs();
     inactiveBuffers.add(bufferA);
     inactiveBuffers.add(bufferB);
     errorCollector = new ErrorCollector(mutationBufferSpace);
@@ -257,12 +257,12 @@ public class AsyncKuduSession implements SessionConfiguration {
 
   @Override
   public void setTimeoutMillis(long timeout) {
-    this.timeoutMs = timeout;
+    this.timeoutMillis = timeout;
   }
 
   @Override
   public long getTimeoutMillis() {
-    return this.timeoutMs;
+    return this.timeoutMillis;
   }
 
   @Override
@@ -390,9 +390,8 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
 
       for (Batch batch : batches.values()) {
-        if (timeoutMs != 0) {
-          batch.deadlineTracker.reset();
-          batch.setTimeoutMillis(timeoutMs);
+        if (timeoutMillis != 0) {
+          batch.resetTimeoutMillis(client.getTimer(), timeoutMillis);
         }
         addBatchCallbacks(batch);
         batchResponses.add(client.sendRpcToTablet(batch));
@@ -544,8 +543,8 @@ public class AsyncKuduSession implements SessionConfiguration {
 
     // If immediate flush mode, send the operation directly.
     if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
-      if (timeoutMs != 0) {
-        operation.setTimeoutMillis(timeoutMs);
+      if (timeoutMillis != 0) {
+        operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
       operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
@@ -568,7 +567,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
                                                               operation.partitionKey(),
                                                               LookupType.POINT,
-                                                              timeoutMs);
+                                                              timeoutMillis);
 
     // Holds a buffer that should be flushed outside the synchronized block, if necessary.
     Buffer fullBuffer = null;
@@ -646,7 +645,7 @@ public class AsyncKuduSession implements SessionConfiguration {
             activeBufferSize = 0;
           } else if (activeBufferSize == 0) {
             // If this is the first operation in the buffer, start a background flush timer.
-            client.newTimeout(activeBuffer.getFlusherTask(), interval);
+            AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(), interval);
           }
         }
       }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index d2034e2..a89374e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.client.Statistics.Statistic;
@@ -57,14 +58,30 @@ class Batch extends KuduRpc<BatchResponse> {
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   private final boolean ignoreAllDuplicateRows;
 
-
   Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows) {
-    super(table);
+    super(table, null, 0);
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
     this.tablet = tablet;
   }
 
   /**
+   * Reset the timeout of this batch.
+   *
+   * TODO(wdberkeley): The fact we have to do this is a sign an Operation should not subclass
+   * KuduRpc.
+   *
+   * @param timeoutMillis the new timeout of the batch in milliseconds
+   */
+  void resetTimeoutMillis(Timer timer, long timeoutMillis) {
+    deadlineTracker.reset();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
+    timeoutTask = AsyncKuduClient.newTimeout(timer, new RpcTimeoutTask(), timeoutMillis);
+  }
+
+  /**
    * Returns the bytes size of this batch's row operations after serialization.
    * @return size in bytes
    * @throws IllegalStateException thrown if this RPC hasn't been serialized eg sent to a TS
@@ -128,8 +145,11 @@ class Batch extends KuduRpc<BatchResponse> {
       }
     }
 
-    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                               builder.getTimestamp(), errorsPB, operations,
+    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(),
+                                               tsUUID,
+                                               builder.getTimestamp(),
+                                               errorsPB,
+                                               operations,
                                                operationIndexes);
 
     if (injectedError != null) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
index a426ac4..408c4cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
@@ -46,9 +46,12 @@ public class BatchResponse extends KuduRpcResponse {
    * @param operations the list of operations which created this response
    * @param indexes the list of operations' order index
    */
-  BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
+  BatchResponse(long elapsedMillis,
+                String tsUUID,
+                long writeTimestamp,
                 List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
-                List<Operation> operations, List<Integer> indexes) {
+                List<Operation> operations,
+                List<Integer> indexes) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     individualResponses = new ArrayList<>(operations.size());
@@ -75,8 +78,11 @@ public class BatchResponse extends KuduRpcResponse {
         currentErrorIndex++;
       }
       individualResponses.add(
-          new OperationResponse(currentOperation.deadlineTracker.getElapsedMillis(), tsUUID,
-              writeTimestamp, currentOperation, rowError));
+          new OperationResponse(currentOperation.deadlineTracker.getElapsedMillis(),
+                                tsUUID,
+                                writeTimestamp,
+                                currentOperation,
+                                rowError));
     }
     assert (rowErrors.size() == errorsPB.size());
     assert (individualResponses.size() == operations.size());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index aecb51f..1ee9767 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -33,6 +33,8 @@ import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.jboss.netty.util.Timer;
+
 import org.apache.kudu.Common.HostPortPB;
 import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
@@ -93,16 +95,14 @@ final class ConnectToCluster {
       final KuduTable masterTable,
       final RpcProxy masterProxy,
       KuduRpc<?> parentRpc,
+      Timer timer,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
-    final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable);
-    if (parentRpc != null) {
-      rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
-      rpc.setParentRpc(parentRpc);
-    } else {
-      rpc.setTimeoutMillis(defaultTimeoutMs);
-    }
+    long timeoutMillis = parentRpc == null ? defaultTimeoutMs :
+                                             parentRpc.deadlineTracker.getMillisBeforeDeadline();
+    final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable, timer, timeoutMillis);
+    rpc.setParentRpc(parentRpc);
     Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
     rpc.attempt++;
     masterProxy.sendRpc(rpc);
@@ -168,10 +168,10 @@ final class ConnectToCluster {
     List<Deferred<ConnectToMasterResponsePB>> deferreds = new ArrayList<>();
     for (HostAndPort hostAndPort : masterAddrs) {
       Deferred<ConnectToMasterResponsePB> d;
-      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(
-          hostAndPort, credentialsPolicy);
+      AsyncKuduClient client = masterTable.getAsyncClient();
+      RpcProxy proxy = client.newMasterRpcProxy(hostAndPort, credentialsPolicy);
       if (proxy != null) {
-        d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+        d = connectToMaster(masterTable, proxy, parentRpc, client.getTimer(), defaultTimeoutMs);
       } else {
         String message = "Couldn't resolve this master's address " + hostAndPort.toString();
         LOG.warn(message);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index cf0a579..f4e2769 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 import org.apache.kudu.master.Master.MasterFeatures;
@@ -51,8 +52,10 @@ public class ConnectToMasterRequest extends KuduRpc<ConnectToMasterResponsePB> {
    */
   private String method = CONNECT_TO_MASTER;
 
-  public ConnectToMasterRequest(KuduTable masterTable) {
-    super(masterTable);
+  public ConnectToMasterRequest(KuduTable masterTable,
+                                Timer timer,
+                                long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     // TODO(todd): get rid of 'masterTable' hack
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index d5616b5..f1d5f20 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
@@ -40,9 +41,13 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
   private final Master.CreateTableRequestPB.Builder builder;
   private final List<Integer> featureFlags;
 
-  CreateTableRequest(KuduTable masterTable, String name, Schema schema,
-                     CreateTableOptions builder) {
-    super(masterTable);
+  CreateTableRequest(KuduTable masterTable,
+                     String name,
+                     Schema schema,
+                     CreateTableOptions builder,
+                     Timer timer,
+                     long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.schema = schema;
     this.name = name;
     this.builder = builder.getBuilder();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
index 6f4427f..8cf41bc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableResponse.java
@@ -24,10 +24,10 @@ public class CreateTableResponse extends KuduRpcResponse {
   private final String tableId;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  CreateTableResponse(long ellapsedMillis, String tsUUID, String tableId) {
-    super(ellapsedMillis, tsUUID);
+  CreateTableResponse(long elapsedMillis, String tsUUID, String tableId) {
+    super(elapsedMillis, tsUUID);
     this.tableId = tableId;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
index b7e9441..80c207d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -33,8 +34,11 @@ class DeleteTableRequest extends KuduRpc<DeleteTableResponse> {
 
   private final String name;
 
-  DeleteTableRequest(KuduTable table, String name) {
-    super(table);
+  DeleteTableRequest(KuduTable table,
+                     String name,
+                     Timer timer,
+                     long timeoutMillis) {
+    super(table, timer, timeoutMillis);
     this.name = name;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
index a99d68a..403ef41 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableResponse.java
@@ -25,9 +25,9 @@ import org.apache.yetus.audience.InterfaceStability;
 public class DeleteTableResponse extends KuduRpcResponse {
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    */
-  DeleteTableResponse(long ellapsedMillis, String tsUUID) {
-    super(ellapsedMillis, tsUUID);
+  DeleteTableResponse(long elapsedMillis, String tsUUID) {
+    super(elapsedMillis, tsUUID);
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index 796c3db..a7fe825 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -21,6 +21,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -36,10 +37,14 @@ class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponseP
   private final String tableId;
   private final int maxReturnedLocations;
 
-  GetTableLocationsRequest(KuduTable table, byte[] startPartitionKey,
-                           byte[] endPartitionKey, String tableId,
-                           int maxReturnedLocations) {
-    super(table);
+  GetTableLocationsRequest(KuduTable table,
+                           byte[] startPartitionKey,
+                           byte[] endPartitionKey,
+                           String tableId,
+                           int maxReturnedLocations,
+                           Timer timer,
+                           long timeoutMillis) {
+    super(table, timer, timeoutMillis);
     if (startPartitionKey != null && endPartitionKey != null &&
         Bytes.memcmp(startPartitionKey, endPartitionKey) > 0) {
       throw new IllegalArgumentException(
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index ec4afef..93671bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master.TableIdentifierPB.Builder;
@@ -39,8 +40,12 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
   private final String name;
 
 
-  GetTableSchemaRequest(KuduTable masterTable, String id, String name) {
-    super(masterTable);
+  GetTableSchemaRequest(KuduTable masterTable,
+                        String id,
+                        String name,
+                        Timer timer,
+                        long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     Preconditions.checkArgument(id != null ^ name != null,
         "Only one of table ID or the table name should be provided");
     this.id = id;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
index a426768..a3d10ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
@@ -30,20 +30,20 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   private final int numReplicas;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now
+   * @param elapsedMillis Time in milliseconds since RPC creation to now
    * @param tsUUID the UUID of the tablet server that sent the response
    * @param schema the table's schema
    * @param tableId the UUID of the table in the response
    * @param numReplicas the table's replication factor
    * @param partitionSchema the table's partition schema
    */
-  GetTableSchemaResponse(long ellapsedMillis,
+  GetTableSchemaResponse(long elapsedMillis,
                          String tsUUID,
                          Schema schema,
                          String tableId,
                          int numReplicas,
                          PartitionSchema partitionSchema) {
-    super(ellapsedMillis, tsUUID);
+    super(elapsedMillis, tsUUID);
     this.schema = schema;
     this.partitionSchema = partitionSchema;
     this.tableId = tableId;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
index 73c4972..2866faf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -23,6 +23,7 @@ import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
@@ -33,8 +34,11 @@ import org.apache.kudu.util.Pair;
 class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse> {
   private final TableIdentifierPB.Builder tableId;
 
-  IsAlterTableDoneRequest(KuduTable masterTable, TableIdentifierPB.Builder tableId) {
-    super(masterTable);
+  IsAlterTableDoneRequest(KuduTable masterTable,
+                          TableIdentifierPB.Builder tableId,
+                          Timer timer,
+                          long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.tableId = tableId;
   }
 
@@ -62,7 +66,8 @@ class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse> {
     final IsAlterTableDoneResponsePB.Builder respBuilder = IsAlterTableDoneResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
     IsAlterTableDoneResponse resp = new IsAlterTableDoneResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID, respBuilder.getDone());
+                                                                 tsUUID,
+                                                                 respBuilder.getDone());
     return new Pair<IsAlterTableDoneResponse, Object>(
         resp, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
index c08360b..2fd0290 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -19,6 +19,8 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
+
 import org.apache.kudu.master.Master.IsCreateTableDoneRequestPB;
 import org.apache.kudu.master.Master.IsCreateTableDoneResponsePB;
 import org.apache.kudu.master.Master.TableIdentifierPB;
@@ -31,8 +33,11 @@ import org.apache.kudu.util.Pair;
 class IsCreateTableDoneRequest extends KuduRpc<IsCreateTableDoneResponse> {
   private final TableIdentifierPB.Builder tableId;
 
-  IsCreateTableDoneRequest(KuduTable masterTable, TableIdentifierPB.Builder tableId) {
-    super(masterTable);
+  IsCreateTableDoneRequest(KuduTable masterTable,
+                           TableIdentifierPB.Builder tableId,
+                           Timer timer,
+                           long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.tableId = tableId;
   }
 
@@ -54,7 +59,8 @@ class IsCreateTableDoneRequest extends KuduRpc<IsCreateTableDoneResponse> {
     readProtobuf(callResponse.getPBMessage(), builder);
     IsCreateTableDoneResponse resp =
         new IsCreateTableDoneResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID, builder.getDone());
+                                      tsUUID,
+                                      builder.getDone());
     return new Pair<IsCreateTableDoneResponse, Object>(
         resp, builder.hasError() ? builder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 8fc011a..3d212ee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -43,6 +43,9 @@ import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,8 +132,12 @@ public abstract class KuduRpc<R> {
 
   final DeadlineTracker deadlineTracker;
 
-  protected long propagatedTimestamp = -1;
-  protected ExternalConsistencyMode externalConsistencyMode = CLIENT_PROPAGATED;
+  // 'timeoutTask' is a handle to the timer task that will time out the RPC. It is
+  // null if and only if the task has no timeout.
+  Timeout timeoutTask;
+
+  long propagatedTimestamp = -1;
+  ExternalConsistencyMode externalConsistencyMode = CLIENT_PROPAGATED;
 
   /**
    * How many times have we retried this RPC?.
@@ -146,9 +153,15 @@ public abstract class KuduRpc<R> {
    */
   private long sequenceId = RequestTracker.NO_SEQ_NO;
 
-  KuduRpc(KuduTable table) {
+  KuduRpc(KuduTable table, Timer timer, long timeoutMillis) {
     this.table = table;
     this.deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timer != null) {
+      this.timeoutTask = AsyncKuduClient.newTimeout(timer,
+                                                    new RpcTimeoutTask(),
+                                                    timeoutMillis);
+    }
   }
 
   /**
@@ -241,6 +254,9 @@ public abstract class KuduRpc<R> {
       table.getAsyncClient().getRequestTracker().rpcCompleted(sequenceId);
       sequenceId = RequestTracker.NO_SEQ_NO;
     }
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
     deadlineTracker.reset();
     traces.clear();
     parentRpc = null;
@@ -276,8 +292,8 @@ public abstract class KuduRpc<R> {
    * @param parentRpc RPC that will also receive traces from this RPC
    */
   void setParentRpc(KuduRpc<?> parentRpc) {
-    assert (this.parentRpc == null);
-    assert (this.parentRpc != this);
+    assert(this.parentRpc == null);
+    assert(this != parentRpc);
     this.parentRpc = parentRpc;
   }
 
@@ -325,10 +341,6 @@ public abstract class KuduRpc<R> {
     return table;
   }
 
-  void setTimeoutMillis(long timeout) {
-    deadlineTracker.setDeadline(timeout);
-  }
-
   /**
    * If this RPC needs to be tracked on the client and server-side. Some RPCs require exactly-once
    * semantics which is enabled by tracking them.
@@ -420,4 +432,15 @@ public abstract class KuduRpc<R> {
     chanBuf.writerIndex(buf.length);
     return chanBuf;
   }
+
+  /**
+   * A netty TimerTask for timing out a KuduRpc.
+   */
+  final class RpcTimeoutTask implements TimerTask {
+    @Override
+    public void run(final Timeout timeout) {
+      Status statusTimedOut = Status.TimedOut("can not complete before timeout: " + KuduRpc.this);
+      KuduRpc.this.errback(new NonRecoverableException(statusTimedOut));
+    }
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index 8fe19a9..e7416df 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -31,8 +32,11 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
 
   private final String nameFilter;
 
-  ListTablesRequest(KuduTable masterTable, String nameFilter) {
-    super(masterTable);
+  ListTablesRequest(KuduTable masterTable,
+                    String nameFilter,
+                    Timer timer,
+                    long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
     this.nameFilter = nameFilter;
   }
 
@@ -68,7 +72,8 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
       tables.add(info.getName());
     }
     ListTablesResponse response = new ListTablesResponse(deadlineTracker.getElapsedMillis(),
-                                                         tsUUID, tables);
+                                                         tsUUID,
+                                                         tables);
     return new Pair<ListTablesResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
index ca851c4..d7d14e2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
@@ -28,8 +28,8 @@ public class ListTablesResponse extends KuduRpcResponse {
 
   private final List<String> tablesList;
 
-  ListTablesResponse(long ellapsedMillis, String tsUUID, List<String> tablesList) {
-    super(ellapsedMillis, tsUUID);
+  ListTablesResponse(long elapsedMillis, String tsUUID, List<String> tablesList) {
+    super(elapsedMillis, tsUUID);
     this.tablesList = tablesList;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
index c453354..75e62fb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
@@ -25,14 +25,17 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
 @InterfaceAudience.Private
 public class ListTabletServersRequest extends KuduRpc<ListTabletServersResponse> {
 
-  public ListTabletServersRequest(KuduTable masterTable) {
-    super(masterTable);
+  public ListTabletServersRequest(KuduTable masterTable,
+                                  Timer timer,
+                                  long timeoutMillis) {
+    super(masterTable, timer, timeoutMillis);
   }
 
   @Override
@@ -61,8 +64,11 @@ public class ListTabletServersRequest extends KuduRpc<ListTabletServersResponse>
     for (ListTabletServersResponsePB.Entry entry : respBuilder.getServersList()) {
       servers.add(entry.getRegistration().getRpcAddresses(0).getHost());
     }
-    ListTabletServersResponse response = new ListTabletServersResponse(deadlineTracker
-        .getElapsedMillis(), tsUUID, serversCount, servers);
+    ListTabletServersResponse response =
+        new ListTabletServersResponse(deadlineTracker.getElapsedMillis(),
+                                      tsUUID,
+                                      serversCount,
+                                      servers);
     return new Pair<ListTabletServersResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
index 1937117..fc979e4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersResponse.java
@@ -30,13 +30,13 @@ public class ListTabletServersResponse extends KuduRpcResponse {
   private final List<String> tabletServersList;
 
   /**
-   * @param ellapsedMillis Time in milliseconds since RPC creation to now.
+   * @param elapsedMillis Time in milliseconds since RPC creation to now.
    * @param tabletServersCount How many tablet servers the master is reporting.
    * @param tabletServersList List of tablet servers.
    */
-  ListTabletServersResponse(long ellapsedMillis, String tsUUID,
+  ListTabletServersResponse(long elapsedMillis, String tsUUID,
                             int tabletServersCount, List<String> tabletServersList) {
-    super(ellapsedMillis, tsUUID);
+    super(elapsedMillis, tsUUID);
     this.tabletServersCount = tabletServersCount;
     this.tabletServersList = tabletServersList;
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
index f712f3b..a6d4ff3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.tserver.Tserver;
 import org.apache.kudu.util.Pair;
@@ -29,8 +30,8 @@ import org.apache.kudu.util.Pair;
 @InterfaceAudience.Private
 class ListTabletsRequest extends KuduRpc<ListTabletsResponse> {
 
-  ListTabletsRequest() {
-    super(null);
+  ListTabletsRequest(Timer timer, long timeoutMillis) {
+    super(null, timer, timeoutMillis);
   }
 
   @Override
@@ -61,7 +62,8 @@ class ListTabletsRequest extends KuduRpc<ListTabletsResponse> {
       tablets.add(info.getTabletStatus().getTabletId());
     }
     ListTabletsResponse response = new ListTabletsResponse(deadlineTracker.getElapsedMillis(),
-                                                         tsUUID, tablets);
+                                                           tsUUID,
+                                                           tablets);
     return new Pair<ListTabletsResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
index 1f19778..4ad6b7d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsResponse.java
@@ -26,8 +26,8 @@ public class ListTabletsResponse extends KuduRpcResponse {
 
   private final List<String> tabletsList;
 
-  ListTabletsResponse(long ellapsedMillis, String tsUUID, List<String> tabletsList) {
-    super(ellapsedMillis, tsUUID);
+  ListTabletsResponse(long elapsedMillis, String tsUUID, List<String> tabletsList) {
+    super(elapsedMillis, tsUUID);
     this.tabletsList = tabletsList;
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index c0a83af..a585c72 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -29,6 +29,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -93,10 +94,27 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
    * @param table table with the schema to use for this operation
    */
   Operation(KuduTable table) {
-    super(table);
+    super(table, null, 0);
     this.row = table.getSchema().newPartialRow();
   }
 
+  /**
+   * Reset the timeout of this batch.
+   *
+   * TODO(wdberkeley): The fact we have to do this is a sign an Operation should not subclass
+   * KuduRpc.
+   *
+   * @param timeoutMillis the new timeout of the batch in milliseconds
+   */
+  void resetTimeoutMillis(Timer timer, long timeoutMillis) {
+    deadlineTracker.reset();
+    deadlineTracker.setDeadline(timeoutMillis);
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
+    }
+    timeoutTask = AsyncKuduClient.newTimeout(timer, new RpcTimeoutTask(), timeoutMillis);
+  }
+
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
@@ -157,8 +175,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         error = null;
       }
     }
-    OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                                       builder.getTimestamp(), this, error);
+    OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(),
+                                                       tsUUID,
+                                                       builder.getTimestamp(),
+                                                       this,
+                                                       error);
     return new Pair<OperationResponse, Object>(
         response, builder.hasError() ? builder.getError() : null);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index 146765f..d4234ac 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -40,8 +40,11 @@ public class OperationResponse extends KuduRpcResponse {
    * @param operation the operation that created this response
    * @param errorPB a row error in pb format, can be null
    */
-  OperationResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                    Operation operation, Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
+  OperationResponse(long elapsedMillis,
+                    String tsUUID,
+                    long writeTimestamp,
+                    Operation operation,
+                    Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     this.rowError = errorPB == null ? null : RowError.fromRowErrorPb(errorPB, operation, tsUUID);
@@ -55,8 +58,11 @@ public class OperationResponse extends KuduRpcResponse {
    * @param operation the operation that created this response
    * @param rowError a parsed row error, can be null
    */
-  OperationResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                    Operation operation, RowError rowError) {
+  OperationResponse(long elapsedMillis,
+                    String tsUUID,
+                    long writeTimestamp,
+                    Operation operation,
+                    RowError rowError) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     this.rowError = rowError;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index 20272c1..d536998 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import com.google.protobuf.Message;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
@@ -34,15 +35,15 @@ class PingRequest extends KuduRpc<PingResponse> {
   private final String serviceName;
 
   static PingRequest makeMasterPingRequest() {
-    return new PingRequest(MASTER_SERVICE_NAME);
+    return new PingRequest(MASTER_SERVICE_NAME, null, 0);
   }
 
   static PingRequest makeTabletServerPingRequest() {
-    return new PingRequest(TABLET_SERVER_SERVICE_NAME);
+    return new PingRequest(TABLET_SERVER_SERVICE_NAME, null, 0);
   }
 
-  private PingRequest(String serviceName) {
-    super(null);
+  private PingRequest(String serviceName, Timer timer, long timeoutMillis) {
+    super(null, timer, timeoutMillis);
     this.serviceName = serviceName;
   }
 
@@ -67,8 +68,7 @@ class PingRequest extends KuduRpc<PingResponse> {
     final Master.PingResponsePB.Builder respBuilder =
         Master.PingResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
-    PingResponse response = new PingResponse(deadlineTracker.getElapsedMillis(),
-        tsUUID);
+    PingResponse response = new PingResponse(deadlineTracker.getElapsedMillis(), tsUUID);
     return new Pair<>(response, null);
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
index e9effd9..3ad4155 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
@@ -48,16 +48,20 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
 
   /**
    * Package private constructor, only meant to be instantiated from AsyncKuduScanner.
-   * @param ellapsedMillis time in milliseconds since RPC creation to now
+   * @param elapsedMillis time in milliseconds since RPC creation to now
    * @param tsUUID UUID of the tablet server that handled our request
    * @param schema schema used to parse the rows
    * @param numRows how many rows are contained in the bs slice
    * @param bs normal row data
    * @param indirectBs indirect row data
    */
-  private RowResultIterator(long ellapsedMillis, String tsUUID, Schema schema,
-                            int numRows, Slice bs, Slice indirectBs) {
-    super(ellapsedMillis, tsUUID);
+  private RowResultIterator(long elapsedMillis,
+                            String tsUUID,
+                            Schema schema,
+                            int numRows,
+                            Slice bs,
+                            Slice indirectBs) {
+    super(elapsedMillis, tsUUID);
     this.schema = schema;
     this.bs = bs;
     this.indirectBs = indirectBs;
@@ -66,13 +70,14 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
     this.rowResult = numRows == 0 ? null : new RowResult(this.schema, this.bs, this.indirectBs);
   }
 
-  static RowResultIterator makeRowResultIterator(long ellapsedMillis, String tsUUID,
+  static RowResultIterator makeRowResultIterator(long elapsedMillis,
+                                                 String tsUUID,
                                                  Schema schema,
                                                  WireProtocol.RowwiseRowBlockPB data,
                                                  final CallResponse callResponse)
       throws KuduException {
     if (data == null || data.getNumRows() == 0) {
-      return new RowResultIterator(ellapsedMillis, tsUUID, schema, 0, null, null);
+      return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null, null);
     }
 
     Slice bs = callResponse.getSidecar(data.getRowsSidecar());
@@ -87,7 +92,7 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
           " bytes of data but expected " + expectedSize + " for " + numRows + " rows");
       throw new NonRecoverableException(statusIllegalState);
     }
-    return new RowResultIterator(ellapsedMillis, tsUUID, schema, numRows, bs, indirectBs);
+    return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs, indirectBs);
   }
 
   /**
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 347e3c8..105ba22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -120,9 +120,6 @@ class RpcProxy {
               .serverInfo(connection.getServerInfo())
               .build());
 
-      if (!rpc.deadlineTracker.hasDeadline()) {
-        LOG.warn("{} sending RPC with no timeout {}", connection.getLogPrefix(), rpc);
-      }
       connection.enqueueMessage(rpcToMessage(client, rpc),
           new Callback<Void, Connection.CallResponseInfo>() {
             @Override
@@ -164,11 +161,10 @@ class RpcProxy {
                 .setServiceName(rpc.serviceName())
                 .setMethodName(rpc.method()));
     final Message reqPB = rpc.createRequestPB();
-
+    // TODO(wdberkeley): We should enforce that every RPC has a timeout.
     if (rpc.deadlineTracker.hasDeadline()) {
       headerBuilder.setTimeoutMillis((int) rpc.deadlineTracker.getMillisBeforeDeadline());
     }
-
     if (rpc.isRequestTracked()) {
       RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
       final RequestTracker requestTracker = client.getRequestTracker();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index a7c9112..f6c7beb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -153,7 +153,7 @@ public class TestAsyncKuduSession {
       client.deleteTable(TABLE_NAME).join();
       // Wait until tablet is deleted on TS.
       while (true) {
-        ListTabletsRequest req = new ListTabletsRequest();
+        ListTabletsRequest req = new ListTabletsRequest(client.getTimer(), 10000);
         Deferred<ListTabletsResponse> d = req.getDeferred();
         proxy.sendRpc(req);
         ListTabletsResponse resp = d.join();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index ead72b2..64b6ed2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -117,6 +117,6 @@ public class TestConnectionCache {
     PingRequest ping = PingRequest.makeMasterPingRequest();
     Deferred<PingResponse> d = ping.getDeferred();
     proxy.sendRpc(ping);
-    d.join();
+    d.join(10000);
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 20033ca..1d5d2c8 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -19,10 +19,12 @@ package org.apache.kudu.client;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -54,16 +56,18 @@ public class TestTimeouts {
       }
 
       harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-      KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-      KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
+      // openTable() may time out, nextRows() should time out.
+      try {
+        KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-      OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-      assertTrue(response.hasRowError());
-      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+        KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-      KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
-      try {
+        OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+        assertTrue(response.hasRowError());
+        assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+
+        KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
         lowTimeoutScanner.nextRows();
         fail("Should have timed out");
       } catch (KuduException ex) {
@@ -71,4 +75,52 @@ public class TestTimeouts {
       }
     }
   }
+
+  /**
+   * KUDU-1868: This test checks that, even if there is no event on the channel over which an RPC
+   * was sent (e.g., even if the server hangs and does not respond), RPCs will still time out.
+   */
+  @Test(timeout = 100000)
+  @TabletServerConfig(flags = { "--scanner_inject_latency_on_each_batch_ms=200000" })
+  public void testTimeoutEvenWhenServerHangs() throws Exception {
+    // Set up a table with one row.
+    KuduClient client = harness.getClient();
+    KuduTable table = client.createTable(
+        TABLE_NAME,
+        getBasicSchema(),
+        getBasicCreateTableOptions());
+    assertFalse(client
+        .newSession()
+        .apply(createBasicSchemaInsert(table, 0))
+        .hasRowError());
+
+    // Create a new client with no socket read timeout (0 means do not set a read timeout).
+    try (KuduClient noRecvTimeoutClient =
+             new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+                 .defaultSocketReadTimeoutMs(0)
+                 .build()) {
+      // Propagate the timestamp to be sure we should see the row that was
+      // inserted by another client.
+      noRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+      KuduTable noRecvTimeoutTable = noRecvTimeoutClient.openTable(TABLE_NAME);
+
+      // Do something besides a scan to cache table and tablet lookup.
+      noRecvTimeoutClient.getTablesList();
+
+      // Scan with a short timeout.
+      KuduScanner scanner = noRecvTimeoutClient
+          .newScannerBuilder(noRecvTimeoutTable)
+          .scanRequestTimeout(1000)
+          .build();
+
+      // The server will not respond for the lifetime of the test, so we expect
+      // the operation to time out.
+      try {
+        scanner.nextRows();
+        fail("should not have completed nextRows");
+      } catch (NonRecoverableException e) {
+        assertTrue(e.getStatus().isTimedOut());
+      }
+    }
+  }
 }