You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/06 00:05:57 UTC

[1/8] kudu git commit: consensus: refactor tracking of received OpIds out of ReplicaState

Repository: kudu
Updated Branches:
  refs/heads/master 64f9ab34f -> bce1dd777


consensus: refactor tracking of received OpIds out of ReplicaState

The PeerMessageQueue class was already tracking the last appended OpId, so
also tracking it in ReplicaState was redundant and confusing. This patch
removes that duplicated state (and the code maintaining it) from ReplicaState
and adds a small amount of new functionality to PeerMessageQueue:

- TruncateOpsAfter() now takes an index instead of an OpId. The queue
  can already map the index to an OpId by looking it up in the log cache.
- Added a getter, GetLastOpIdInLog(), to expose the last OpId in the log
  back to RaftConsensus.
- Moved OpId generation into PeerMessageQueue (GetNextOpId()). This was
  easy because the queue already knows the previous OpId and the current term.

The 'last_received_cur_leader' tracking was moved into RaftConsensus
itself, since it is only transient state that tracks the RPC exchanges
between a leader and a follower.

This patch also removes raft_consensus-test, the mock-based test suite for
RaftConsensus. I found that maintaining this test was very difficult, in
particular because RaftConsensus now relies on AppendOperations() being
reflected in GetLastOpIdInLog(). With a mock PeerMessageQueue, this
state update did not happen, and reproducing that behavior in the mocks
themselves amounted to re-implementing the production code for the queue.
I looked over the tests in this suite and I believe all of their cases are
covered by various other tests (randomized and otherwise).

I looped raft_consensus-itest 100 times[1], the Churny test case 1000
times[2], and exactly_once_writes-itest 1000 times[3]. Lastly, I was
able to re-enable TestChurnyElections_WithNotificationLatency and loop
it 500 times[4].

[1] http://dist-test.cloudera.org/job?job_id=todd.1474357631.30024
[2] http://dist-test.cloudera.org/job?job_id=todd.1474359004.2328
[3] http://dist-test.cloudera.org/job?job_id=todd.1474358436.31536
[4] http://dist-test.cloudera.org/job?job_id=todd.1474359250.4834

Change-Id: I81614d26328b0fbba37bf279f59717e05a07b816
Reviewed-on: http://gerrit.cloudera.org:8080/4476
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4bcbb4a4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4bcbb4a4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4bcbb4a4

Branch: refs/heads/master
Commit: 4bcbb4a405c037162954cba218c2316e517cdbf3
Parents: 64f9ab3
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Sep 20 00:33:45 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 21:41:26 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   1 -
 src/kudu/consensus/consensus_queue.cc           |  20 +-
 src/kudu/consensus/consensus_queue.h            |  14 +-
 src/kudu/consensus/raft_consensus-test.cc       | 663 -------------------
 src/kudu/consensus/raft_consensus.cc            |  48 +-
 src/kudu/consensus/raft_consensus.h             |   7 +-
 .../consensus/raft_consensus_quorum-test.cc     |   9 +-
 src/kudu/consensus/raft_consensus_state.cc      |  58 +-
 src/kudu/consensus/raft_consensus_state.h       |  35 -
 .../integration-tests/raft_consensus-itest.cc   |   7 +-
 10 files changed, 61 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 88f0c61..da4cc65 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -136,7 +136,6 @@ ADD_KUDU_TEST(mt-log-test)
 ADD_KUDU_TEST(quorum_util-test)
 ADD_KUDU_TEST(raft_consensus_quorum-test)
 ADD_KUDU_TEST(raft_consensus_state-test)
-ADD_KUDU_TEST(raft_consensus-test)
 
 # Our current version of gmock overrides virtual functions without adding
 # the 'override' keyword which, since our move to c++11, make the compiler

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b153bdd..6598662 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -291,9 +291,13 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
   return Status::OK();
 }
 
-void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
+void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
   DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append.
-
+  OpId op;
+  CHECK_OK_PREPEND(log_cache_.LookupOpId(index, &op),
+                   Substitute("$0: cannot truncate ops after bad index $1",
+                              LogPrefixUnlocked(),
+                              index));
   {
     std::unique_lock<simple_spinlock> lock(queue_lock_);
     queue_state_.last_appended = op;
@@ -301,6 +305,18 @@ void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
   log_cache_.TruncateOpsAfter(op.index());
 }
 
+OpId PeerMessageQueue::GetLastOpIdInLog() const {
+  std::unique_lock<simple_spinlock> lock(queue_lock_);
+  return queue_state_.last_appended;
+}
+
+OpId PeerMessageQueue::GetNextOpId() const {
+  std::unique_lock<simple_spinlock> lock(queue_lock_);
+  return MakeOpId(queue_state_.current_term,
+                  queue_state_.last_appended.index() + 1);
+}
+
+
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index abcc089..c5fc98c 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -181,9 +181,17 @@ class PeerMessageQueue {
   virtual Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
                                   const StatusCallback& log_append_callback);
 
-  // Truncate all operations coming after 'op'. Following this, the 'last_appended'
-  // operation is reset to 'op', and the log cache will be truncated accordingly.
-  virtual void TruncateOpsAfter(const OpId& op);
+  // Truncate all operations coming after 'index'. Following this, the 'last_appended'
+  // operation is reset to the OpId with this index, and the log cache will be truncated
+  // accordingly.
+  virtual void TruncateOpsAfter(int64_t index);
+
+  // Return the last OpId in the log.
+  // Note that this can move backwards after a truncation (TruncateOpsAfter).
+  virtual OpId GetLastOpIdInLog() const;
+
+  // Return the next OpId to be appended to the queue in the current term.
+  virtual OpId GetNextOpId() const;
 
   // Assembles a request for a peer, adding entries past 'op_id' up to
   // 'consensus_max_batch_size_bytes'.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
deleted file mode 100644
index 147d626..0000000
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ /dev/null
@@ -1,663 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-#include <memory>
-
-#include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/consensus/consensus_peers.h"
-#include "kudu/consensus/consensus-test-util.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/peer_manager.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/server/logical_clock.h"
-#include "kudu/util/async_util.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-DECLARE_bool(enable_leader_failure_detection);
-
-METRIC_DECLARE_entity(tablet);
-
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-
-namespace kudu {
-namespace consensus {
-
-using log::Log;
-using log::LogOptions;
-using ::testing::_;
-using ::testing::AnyNumber;
-using ::testing::AtLeast;
-using ::testing::InSequence;
-using ::testing::Invoke;
-using ::testing::Mock;
-using ::testing::Return;
-
-const char* kTestTablet = "TestTablet";
-const char* kLocalPeerUuid = "peer-0";
-
-// A simple map to collect the results of a sequence of transactions.
-typedef std::map<OpId, Status, OpIdCompareFunctor> StatusesMap;
-
-class MockQueue : public PeerMessageQueue {
- public:
-  explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log)
-    : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {}
-  MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
-  MOCK_METHOD3(SetLeaderMode, void(int64_t committed_opid,
-                                   int64_t current_term,
-                                   const RaftConfigPB& active_config));
-  MOCK_METHOD0(SetNonLeaderMode, void());
-  virtual Status AppendOperations(const vector<ReplicateRefPtr>& msgs,
-                                  const StatusCallback& callback) OVERRIDE {
-    return AppendOperationsMock(msgs, callback);
-  }
-  MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs,
-                                            const StatusCallback& callback));
-  MOCK_METHOD1(TruncateOpsAfter, void(const OpId& op_id));
-  MOCK_METHOD1(TrackPeer, void(const string&));
-  MOCK_METHOD1(UntrackPeer, void(const string&));
-  MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid,
-                                      ConsensusRequestPB* request,
-                                      std::vector<ReplicateRefPtr>* msg_refs,
-                                      bool* needs_tablet_copy));
-  MOCK_METHOD3(ResponseFromPeer, void(const std::string& peer_uuid,
-                                      const ConsensusResponsePB& response,
-                                      bool* more_pending));
-  MOCK_METHOD0(Close, void());
-};
-
-class MockPeerManager : public PeerManager {
- public:
-  MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {}
-  MOCK_METHOD1(UpdateRaftConfig, Status(const consensus::RaftConfigPB& config));
-  MOCK_METHOD1(SignalRequest, void(bool force_if_queue_empty));
-  MOCK_METHOD0(Close, void());
-};
-
-class RaftConsensusSpy : public RaftConsensus {
- public:
-  typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback;
-
-  RaftConsensusSpy(const ConsensusOptions& options,
-                   unique_ptr<ConsensusMetadata> cmeta,
-                   gscoped_ptr<PeerProxyFactory> proxy_factory,
-                   gscoped_ptr<PeerMessageQueue> queue,
-                   gscoped_ptr<PeerManager> peer_manager,
-                   gscoped_ptr<ThreadPool> thread_pool,
-                   const scoped_refptr<MetricEntity>& metric_entity,
-                   const std::string& peer_uuid,
-                   const scoped_refptr<server::Clock>& clock,
-                   ReplicaTransactionFactory* txn_factory,
-                   const scoped_refptr<log::Log>& log,
-                   const shared_ptr<MemTracker>& parent_mem_tracker,
-                   const Callback<void(const std::string& reason)>& mark_dirty_clbk)
-    : RaftConsensus(options,
-                    std::move(cmeta),
-                    std::move(proxy_factory),
-                    std::move(queue),
-                    std::move(peer_manager),
-                    std::move(thread_pool),
-                    metric_entity,
-                    peer_uuid,
-                    clock,
-                    txn_factory,
-                    log,
-                    parent_mem_tracker,
-                    mark_dirty_clbk) {
-    // These "aliases" allow us to count invocations and assert on them.
-    ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_))
-        .WillByDefault(Invoke(this,
-              &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete));
-    ON_CALL(*this, NonTxRoundReplicationFinished(_, _, _))
-        .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTxRoundReplicationFinishedConcrete));
-  }
-
-  MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round));
-  Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) {
-    return RaftConsensus::AppendNewRoundToQueueUnlocked(round);
-  }
-
-  MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateRefPtr& msg));
-  Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateRefPtr& msg) {
-    return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg);
-  }
-
-  MOCK_METHOD3(NonTxRoundReplicationFinished, void(ConsensusRound* round,
-                                                   const StatusCallback& client_cb,
-                                                   const Status& status));
-  void NonTxRoundReplicationFinishedConcrete(ConsensusRound* round,
-                                             const StatusCallback& client_cb,
-                                             const Status& status) {
-    LOG(INFO) << "Committing round with opid " << round->id()
-              << " given Status " << status.ToString();
-    RaftConsensus::NonTxRoundReplicationFinished(round, client_cb, status);
-  }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy);
-};
-
-void DoNothing(const string& s) {
-}
-
-class RaftConsensusTest : public KuduTest {
- public:
-  RaftConsensusTest()
-      : clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))),
-        metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test")),
-        schema_(GetSimpleTestSchema()) {
-    FLAGS_enable_leader_failure_detection = false;
-    options_.tablet_id = kTestTablet;
-  }
-
-  virtual void SetUp() OVERRIDE {
-    LogOptions options;
-    string test_path = GetTestPath("test-peer-root");
-
-    // TODO mock the Log too, since we're gonna mock the queue
-    // monitors and pretty much everything else.
-    fs_manager_.reset(new FsManager(env_.get(), test_path));
-    CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
-    CHECK_OK(fs_manager_->Open());
-    CHECK_OK(Log::Open(LogOptions(),
-                       fs_manager_.get(),
-                       kTestTablet,
-                       schema_,
-                       0, // schema_version
-                       nullptr,
-                       &log_));
-
-    queue_ = new MockQueue(metric_entity_, log_.get());
-    peer_manager_ = new MockPeerManager;
-    txn_factory_.reset(new MockTransactionFactory);
-
-    ON_CALL(*queue_, AppendOperationsMock(_, _))
-        .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog));
-  }
-
-  void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) {
-    config_ = BuildRaftConfigPBForTests(num_peers);
-    config_.set_opid_index(kInvalidOpIdIndex);
-
-    gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(nullptr));
-
-    string peer_uuid = config_.peers(num_peers - 1).permanent_uuid();
-
-    unique_ptr<ConsensusMetadata> cmeta;
-    CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid,
-                                       config_, initial_term, &cmeta));
-
-    gscoped_ptr<ThreadPool> thread_pool;
-    CHECK_OK(ThreadPoolBuilder("raft-pool") .Build(&thread_pool));
-
-    consensus_.reset(new RaftConsensusSpy(options_,
-                                          std::move(cmeta),
-                                          std::move(proxy_factory),
-                                          gscoped_ptr<PeerMessageQueue>(queue_),
-                                          gscoped_ptr<PeerManager>(peer_manager_),
-                                          std::move(thread_pool),
-                                          metric_entity_,
-                                          peer_uuid,
-                                          clock_,
-                                          txn_factory_.get(),
-                                          log_.get(),
-                                          MemTracker::GetRootTracker(),
-                                          Bind(&DoNothing)));
-
-    ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound));
-  }
-
-  Status AppendToLog(const vector<ReplicateRefPtr>& msgs,
-                     const StatusCallback& callback) {
-    return log_->AsyncAppendReplicates(msgs,
-                                       Bind(LogAppendCallback, callback));
-  }
-
-  static void LogAppendCallback(const StatusCallback& callback,
-                                const Status& s) {
-    CHECK_OK(s);
-    callback.Run(s);
-  }
-
-  Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) {
-    rounds_.push_back(round);
-    RETURN_NOT_OK(consensus_->AppendNewRoundToQueueUnlockedConcrete(round));
-    LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: "
-              << round->replicate_msg()->ShortDebugString();
-    return Status::OK();
-  }
-
-  void SetUpGeneralExpectations() {
-    EXPECT_CALL(*peer_manager_, SignalRequest(_))
-        .Times(AnyNumber());
-    EXPECT_CALL(*peer_manager_, Close())
-        .Times(AtLeast(1));
-    EXPECT_CALL(*queue_, Close())
-        .Times(1);
-    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .Times(AnyNumber());
-  }
-
-  // Create a ConsensusRequestPB suitable to send to a peer.
-  ConsensusRequestPB MakeConsensusRequest(int64_t caller_term,
-                                          const string& caller_uuid,
-                                          const OpId& preceding_opid);
-
-  // Add a single no-op with the given OpId to a ConsensusRequestPB.
-  void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpId& noop_opid);
-
-  scoped_refptr<ConsensusRound> AppendNoOpRound() {
-    ReplicateRefPtr replicate_ptr(make_scoped_refptr_replicate(new ReplicateMsg));
-    replicate_ptr->get()->set_op_type(NO_OP);
-    replicate_ptr->get()->set_timestamp(clock_->Now().ToUint64());
-    scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), replicate_ptr));
-    round->SetConsensusReplicatedCallback(
-        Bind(&RaftConsensusSpy::NonTxRoundReplicationFinished,
-             Unretained(consensus_.get()), Unretained(round.get()), Bind(&DoNothingStatusCB)));
-
-    CHECK_OK(consensus_->Replicate(round));
-    LOG(INFO) << "Appended NO_OP round with opid " << round->id();
-    return round;
-  }
-
-  void DumpRounds() {
-    LOG(INFO) << "Dumping rounds...";
-    for (const scoped_refptr<ConsensusRound>& round : rounds_) {
-      LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: "
-                << round->replicate_msg()->ShortDebugString();
-    }
-  }
-
- protected:
-  ConsensusOptions options_;
-  RaftConfigPB config_;
-  OpId initial_id_;
-  gscoped_ptr<FsManager> fs_manager_;
-  scoped_refptr<Log> log_;
-  gscoped_ptr<PeerProxyFactory> proxy_factory_;
-  scoped_refptr<server::Clock> clock_;
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-  const Schema schema_;
-  scoped_refptr<RaftConsensusSpy> consensus_;
-
-  vector<scoped_refptr<ConsensusRound> > rounds_;
-
-  // Mocks.
-  // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before
-  // the test is.
-  MockQueue* queue_;
-  MockPeerManager* peer_manager_;
-  gscoped_ptr<MockTransactionFactory> txn_factory_;
-};
-
-ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term,
-                                                           const string& caller_uuid,
-                                                           const OpId& preceding_opid) {
-  ConsensusRequestPB request;
-  request.set_caller_term(caller_term);
-  request.set_caller_uuid(caller_uuid);
-  request.set_tablet_id(kTestTablet);
-  request.set_all_replicated_index(0);
-  *request.mutable_preceding_id() = preceding_opid;
-  return request;
-}
-
-void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
-                                                  const OpId& noop_opid) {
-  ReplicateMsg* noop_msg = request->add_ops();
-  *noop_msg->mutable_id() = noop_opid;
-  noop_msg->set_op_type(NO_OP);
-  noop_msg->set_timestamp(clock_->Now().ToUint64());
-  noop_msg->mutable_noop_request();
-}
-
-// Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
-MATCHER(HasOpId, "") { return arg->id().IsInitialized(); }
-
-// These matchers assert that a Status object is of a certain type.
-MATCHER(IsOk, "") { return arg.ok(); }
-MATCHER(IsAborted, "") { return arg.IsAborted(); }
-
-// Tests that consensus is able to handle pending operations. It tests this in two ways:
-// - It tests that consensus does the right thing with pending transactions from the the WAL.
-// - It tests that when a follower gets promoted to leader it does the right thing
-//   with the pending operations.
-TEST_F(RaftConsensusTest, TestPendingTransactions) {
-  SetUpConsensus(10);
-
-  // Emulate a stateful system by having a bunch of operations in flight when consensus starts.
-  // Specifically we emulate we're on term 10, with 5 operations before the last known
-  // committed operation, 10.104, which should be committed immediately, and 5 operations after the
-  // last known committed operation, which should be pending but not yet committed.
-  ConsensusBootstrapInfo info;
-  info.last_id.set_term(10);
-  for (int i = 0; i < 10; i++) {
-    auto replicate = new ReplicateMsg();
-    replicate->set_op_type(NO_OP);
-    info.last_id.set_index(100L + i);
-    replicate->mutable_id()->CopyFrom(info.last_id);
-    info.orphaned_replicates.push_back(replicate);
-  }
-
-  info.last_committed_id.set_term(10);
-  info.last_committed_id.set_index(104);
-
-  {
-    InSequence dummy;
-    // On start we expect 10 NO_OPs to be enqueued, with 5 of those having
-    // their commit continuation called immediately.
-    EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
-        .Times(10);
-
-    // Queue gets initted when the peer starts.
-    EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  }
-
-  ASSERT_OK(consensus_->Start(info));
-
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));
-  // Now we test what this peer does with the pending operations once it's elected leader.
-  {
-    InSequence dummy;
-    // Peer manager gets updated with the new set of peers to send stuff to.
-    EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-        .Times(1).WillOnce(Return(Status::OK()));
-    // The no-op should be appended to the queue.
-    // One more op will be appended for the election.
-    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .Times(1);
-    EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-        .Times(1).WillRepeatedly(Return(Status::OK()));;
-  }
-
-  // Emulate an election, this will make this peer become leader and trigger the
-  // above set expectations.
-  ASSERT_OK(consensus_->EmulateElection());
-
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
-
-  // Commit the 5 no-ops from the previous term, along with the one pushed to
-  // assert leadership.
-  EXPECT_CALL(*consensus_.get(), NonTxRoundReplicationFinished(HasOpId(), _, IsOk()))
-      .Times(6);
-  EXPECT_CALL(*peer_manager_, SignalRequest(_))
-      .Times(AnyNumber());
-  // In the end peer manager and the queue get closed.
-  EXPECT_CALL(*peer_manager_, Close())
-      .Times(AtLeast(1));
-  EXPECT_CALL(*queue_, Close())
-      .Times(1);
-
-  // Now mark the last operation (the no-op round) as committed.
-  // This should advance the committed index, since that round in on our current term,
-  // and we should be able to commit all previous rounds.
-  int64_t cc_round_index = info.orphaned_replicates.back()->id().index() + 1;
-  consensus_->NotifyCommitIndex(cc_round_index);
-}
-
-MATCHER_P2(RoundHasOpId, term, index, "") {
-  LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << arg->id();
-  return arg->id().term() == term && arg->id().index() == index;
-}
-
-MATCHER_P2(EqOpId, term, index, "") {
-  return arg.term() == term && arg.index() == index;
-}
-
-// Tests the case where a a leader is elected and pushed a sequence of
-// operations of which some never get committed. Eventually a new leader in a higher
-// term pushes operations that overwrite some of the original indexes.
-TEST_F(RaftConsensusTest, TestAbortOperations) {
-  SetUpConsensus(1, 2);
-
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(AnyNumber());
-
-  EXPECT_CALL(*peer_manager_, SignalRequest(_))
-      .Times(AnyNumber());
-  EXPECT_CALL(*peer_manager_, Close())
-      .Times(AtLeast(1));
-  EXPECT_CALL(*queue_, Close())
-      .Times(1);
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(1)
-      .WillRepeatedly(Return(Status::OK()));
-
-  // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader
-  // and the new leader's update, when we're overwriting operations.
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(12);
-
-  // .. but those will be overwritten later by another
-  // leader, which will push and commit 5 ops.
-  // Only these five should start as replica rounds.
-  EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
-      .Times(4);
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Append 10 rounds: 2.2 - 2.11
-  for (int i = 0; i < 10; i++) {
-    AppendNoOpRound();
-  }
-
-  // Expectations for what gets committed and what gets aborted:
-  // (note: the aborts may be triggered before the commits)
-  // 5 OK's for the 2.1-2.5 ops.
-  // 6 Aborts for the 2.6-2.11 ops.
-  // 1 OK for the 3.6 op.
-  for (int index = 1; index < 6; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1);
-  }
-  for (int index = 6; index < 12; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsAborted())).Times(1);
-  }
-  EXPECT_CALL(*consensus_.get(),
-              NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);
-  EXPECT_CALL(*queue_, TruncateOpsAfter(EqOpId(2, 5))).Times(1);
-
-  // Nothing's committed so far, so now just send an Update() message
-  // emulating another guy got elected leader and is overwriting a suffix
-  // of the previous messages.
-  // In particular this request has:
-  // - Op 2.5 from the previous leader's term
-  // - Ops 3.6-3.9 from the new leader's term
-  // - A new committed index of 3.6
-  ConsensusRequestPB request;
-  request.set_caller_term(3);
-  const string PEER_0_UUID = "peer-0";
-  request.set_caller_uuid(PEER_0_UUID);
-  request.set_tablet_id(kTestTablet);
-  request.set_all_replicated_index(0);
-  request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-
-  ReplicateMsg* replicate = request.add_ops();
-  replicate->mutable_id()->CopyFrom(MakeOpId(2, 5));
-  replicate->set_op_type(NO_OP);
-
-  ReplicateMsg* noop_msg = request.add_ops();
-  noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6));
-  noop_msg->set_op_type(NO_OP);
-  noop_msg->set_timestamp(clock_->Now().ToUint64());
-  noop_msg->mutable_noop_request();
-
-  // Overwrite another 3 of the original rounds for a total of 4 overwrites.
-  for (int i = 7; i < 10; i++) {
-    ReplicateMsg* replicate = request.add_ops();
-    replicate->mutable_id()->CopyFrom(MakeOpId(3, i));
-    replicate->set_op_type(NO_OP);
-    replicate->set_timestamp(clock_->Now().ToUint64());
-  }
-
-  request.set_committed_index(6);
-
-  ConsensusResponsePB response;
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.has_error());
-
-  ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get()));
-
-  // Now we expect to commit ops 3.7 - 3.9.
-  for (int index = 7; index < 10; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1);
-  }
-
-  request.mutable_ops()->Clear();
-  request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
-  request.set_committed_index(9);
-
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.has_error());
-}
-
-TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) {
-  SetUpConsensus();
-  OpId opid;
-  ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &opid));
-  ASSERT_TRUE(opid.IsInitialized());
-  ASSERT_OPID_EQ(opid, MinimumOpId());
-}
-
-// Ensure that followers reset their "last_received_current_leader"
-// ConsensusStatusPB field when a new term is encountered. This is a
-// correctness test for the logic on the follower side that allows the
-// leader-side queue to determine which op to send next in various scenarios.
-TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
-  SetUpConsensus(kMinimumTerm, 3);
-  SetUpGeneralExpectations();
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-
-  ConsensusRequestPB request;
-  ConsensusResponsePB response;
-  int64_t caller_term = 0;
-  int64_t log_index = 0;
-
-  caller_term = 1;
-  string caller_uuid = config_.peers(0).permanent_uuid();
-  OpId preceding_opid = MinimumOpId();
-
-  // Heartbeat. This will cause the term to increment on the follower.
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId());
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
-
-  // Replicate a no-op.
-  OpId noop_opid = MakeOpId(caller_term, ++log_index);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(),  noop_opid);
-
-  // New leader heartbeat. Term increase to 2.
-  // The preceding_opid is the no-op replicated above. This will match on the
-  // follower side, so it can update its last_received_current_leader to
-  // the same operation (indicating to the queue that it doesn't need to re-replicate
-  // this operation).
-  caller_term = 2;
-  caller_uuid = config_.peers(1).permanent_uuid();
-  preceding_opid = noop_opid;
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), preceding_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), preceding_opid);
-
-  // Append a no-op.
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
-
-  // New leader heartbeat. The term should rev but we should get an LMP mismatch.
-  caller_term = 3;
-  caller_uuid = config_.peers(0).permanent_uuid();
-  preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet.
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time.
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
-  ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code());
-
-  // Decrement preceding and append a no-op.
-  preceding_opid = MakeOpId(2, log_index);
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid)
-      << response.ShortDebugString();
-
-  // Happy case. New leader with new no-op to append right off the bat.
-  // Response should be OK with all last_received* fields equal to the new no-op.
-  caller_term = 4;
-  caller_uuid = config_.peers(1).permanent_uuid();
-  preceding_opid = noop_opid;
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
-}
-
-}  // namespace consensus
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index aaaf228..0cb724e 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -231,6 +231,7 @@ RaftConsensus::RaftConsensus(
           FLAGS_raft_heartbeat_interval_ms *
           FLAGS_leader_failure_max_missed_heartbeat_periods))),
       withhold_votes_until_(MonoTime::Min()),
+      last_received_cur_leader_(MinimumOpId()),
       mark_dirty_clbk_(std::move(mark_dirty_clbk)),
       shutdown_(false),
       follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
@@ -283,7 +284,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
 
     state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id);
 
-    queue_->Init(state_->GetLastReceivedOpIdUnlocked());
+    queue_->Init(info.last_id);
   }
 
   {
@@ -413,7 +414,7 @@ Status RaftConsensus::StartElection(ElectionMode mode) {
     request.set_candidate_term(state_->GetCurrentTermUnlocked());
     request.set_tablet_id(state_->GetOptions().tablet_id);
     *request.mutable_candidate_status()->mutable_last_received() =
-        state_->GetLastReceivedOpIdUnlocked();
+        queue_->GetLastOpIdInLog();
 
     election.reset(new LeaderElection(active_config,
                                       peer_proxy_factory_.get(),
@@ -548,7 +549,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
 }
 
 Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
-  state_->NewIdUnlocked(round->replicate_msg()->mutable_id());
+  *round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
   RETURN_NOT_OK(state_->AddPendingOperation(round));
 
   Status s = queue_->AppendOperation(round->replicate_scoped_refptr());
@@ -556,23 +557,17 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
   // Handle Status::ServiceUnavailable(), which means the queue is full.
   if (PREDICT_FALSE(s.IsServiceUnavailable())) {
     gscoped_ptr<OpId> id(round->replicate_msg()->release_id());
-    // Rollback the id gen. so that we reuse this id later, when we can
-    // actually append to the state machine, i.e. this makes the state
-    // machine have continuous ids, for the same term, even if the queue
-    // refused to add any more operations.
+    // Cancel the operation that we started.
     state_->CancelPendingOperation(*id);
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << ": Could not append replicate request "
                  << "to the queue. Queue is Full. "
                  << "Queue metrics: " << queue_->ToString();
-
-    // TODO Possibly evict a dangling peer from the configuration here.
-    // TODO count of number of ops failed due to consensus queue overflow.
+    // TODO(todd) count of number of ops failed due to consensus queue overflow.
   } else if (PREDICT_FALSE(s.IsIOError())) {
     // This likely came from the log.
     LOG(FATAL) << "IO error appending to the queue: " << s.ToString();
   }
   RETURN_NOT_OK_PREPEND(s, "Unable to append operation to consensus queue");
-  state_->UpdateLastReceivedOpIdUnlocked(round->id());
   return Status::OK();
 }
 
@@ -754,7 +749,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
 
-  int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index();
+  int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index();
 
   deduplicated_req->first_message_idx = -1;
 
@@ -844,7 +839,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
   string error_msg = Substitute(
     "Log matching property violated."
     " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
-    state_->GetLastReceivedOpIdUnlocked().ShortDebugString(),
+    queue_->GetLastOpIdInLog().ShortDebugString(),
     req.preceding_opid->ShortDebugString(),
     term_mismatch ? "term" : "index");
 
@@ -875,10 +870,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
 
 void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
   state_->AbortOpsAfterUnlocked(truncate_after_index);
-  // Above resets the 'last received' to the operation with index 'truncate_after_index'.
-  OpId new_last_received = state_->GetLastReceivedOpIdUnlocked();
-  DCHECK_EQ(truncate_after_index, new_last_received.index());
-  queue_->TruncateOpsAfter(new_last_received);
+  queue_->TruncateOpsAfter(truncate_after_index);
 }
 
 Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
@@ -1215,16 +1207,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to));
     queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());
 
-    // We can now update the last received watermark.
-    //
-    // We do it here (and before we actually hear back from the wal whether things
-    // are durable) so that, if we receive another, possible duplicate, message
-    // that exercises this path we don't handle these messages twice.
-    //
     // If any messages failed to be started locally, then we already have removed them
-    // from 'deduped_req' at this point. So, we can simply update our last-received
-    // watermark to the last message that remains in 'deduped_req'.
-    state_->UpdateLastReceivedOpIdUnlocked(last_from_leader);
+    // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
+    // we might apply.
+    last_received_cur_leader_ = last_from_leader;
 
     // Fill the response with the current state. We will not mutate anymore state until
     // we actually reply to the leader, we'll just wait for the messages to be durable.
@@ -1269,12 +1255,11 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons
   TRACE("Filling consensus response to leader.");
   response->set_responder_term(state_->GetCurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
-      state_->GetLastReceivedOpIdUnlocked());
+      queue_->GetLastOpIdInLog());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
-      state_->GetLastReceivedOpIdCurLeaderUnlocked());
-  // TODO: interrogate queue rather than state?
+      last_received_cur_leader_);
   response->mutable_status()->set_last_committed_idx(
-      state_->GetCommittedIndexUnlocked());
+      queue_->GetCommittedIndex());
 }
 
 void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
@@ -1879,7 +1864,7 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   ReplicaState::UniqueLock lock;
   RETURN_NOT_OK(state_->LockForRead(&lock));
   if (type == RECEIVED_OPID) {
-    *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked();
+    *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
     id->set_term(state_->GetTermWithLastCommittedOpUnlocked());
     id->set_index(state_->GetCommittedIndexUnlocked());
@@ -2038,6 +2023,7 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
   RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush));
   term_metric_->set_value(new_term);
+  last_received_cur_leader_ = MinimumOpId();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index ec48edb..887201b 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -459,9 +459,14 @@ class RaftConsensus : public Consensus,
   // nodes from disturbing the healthy leader.
   MonoTime withhold_votes_until_;
 
+  // The last OpId received from the current leader. This is updated whenever the follower
+  // accepts operations from a leader, and passed back so that the leader knows from what
+  // point to continue sending operations.
+  OpId last_received_cur_leader_;
+
   const Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
-  // TODO hack to serialize updates due to repeated/out-of-order messages
+  // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
   // should probably be refactored out.
   //
   // Lock ordering note: If both this lock and the ReplicaState lock are to be

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 04e894a..9d63266 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -285,14 +285,9 @@ class RaftConsensusQuorumTest : public KuduTest {
   void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) {
     scoped_refptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
-    ReplicaState* state = peer->GetReplicaStateForTests();
     while (true) {
-      {
-        ReplicaState::UniqueLock lock;
-        CHECK_OK(state->LockForRead(&lock));
-        if (OpIdCompare(state->GetLastReceivedOpIdUnlocked(), to_wait_for) >= 0) {
-          return;
-        }
+      if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) {
+        return;
       }
       SleepFor(MonoDelta::FromMilliseconds(1));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index a212fad..f99fb28 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -48,10 +48,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
     : options_(std::move(options)),
       peer_uuid_(std::move(peer_uuid)),
       cmeta_(std::move(cmeta)),
-      next_index_(0),
       txn_factory_(txn_factory),
-      last_received_op_id_(MinimumOpId()),
-      last_received_op_id_current_leader_(MinimumOpId()),
       last_committed_op_id_(MinimumOpId()),
       state_(kInitialized) {
   CHECK(cmeta_) << "ConsensusMeta passed as NULL";
@@ -71,9 +68,6 @@ Status ReplicaState::StartUnlocked(const OpId& last_id_in_wal) {
         GetCurrentTermUnlocked()));
   }
 
-  next_index_ = last_id_in_wal.index() + 1;
-  last_received_op_id_.CopyFrom(last_id_in_wal);
-
   state_ = kRunning;
   return Status::OK();
 }
@@ -266,13 +260,11 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch
     return true;
   }
 
-  if (op_id.index() > GetLastReceivedOpIdUnlocked().index()) {
+  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
+  if (!round) {
     return false;
   }
 
-  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
-  DCHECK(round);
-
   if (round->id().term() != op_id.term()) {
     *term_mismatch = true;
     return false;
@@ -296,7 +288,6 @@ Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term,
     CHECK_OK(cmeta_->Flush());
   }
   ClearLeaderUnlocked();
-  last_received_op_id_current_leader_ = MinimumOpId();
   return Status::OK();
 }
 
@@ -405,12 +396,6 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
     new_preceding = last_committed_op_id_;
   }
 
-  // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
-  // here to avoid the bounds check, since we're breaking monotonicity.
-  last_received_op_id_ = new_preceding;
-  last_received_op_id_current_leader_ = last_received_op_id_;
-  next_index_ = new_preceding.index() + 1;
-
   for (; iter != pending_txns_.end();) {
     const scoped_refptr<ConsensusRound>& round = (*iter).second;
     auto op_type = round->replicate_msg()->op_type();
@@ -611,53 +596,19 @@ Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
   return Status::OK();
 }
 
-void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpId& op_id) {
-  DCHECK(update_lock_.is_locked());
-  if (OpIdCompare(op_id, last_received_op_id_) > 0) {
-    TRACE("Updating last received op as $0", OpIdToString(op_id));
-    last_received_op_id_ = op_id;
-    next_index_ = op_id.index() + 1;
-  }
-  last_received_op_id_current_leader_ = op_id;
-}
-
-const OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_received_op_id_;
-}
-
-const OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_received_op_id_current_leader_;
-}
-
 OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const {
   DCHECK(update_lock_.is_locked());
   return pending_txns_.empty()
       ? MinimumOpId() : (--pending_txns_.end())->second->id();
 }
 
-void ReplicaState::NewIdUnlocked(OpId* id) {
-  DCHECK(update_lock_.is_locked());
-  id->set_term(GetCurrentTermUnlocked());
-  id->set_index(next_index_++);
-}
 
 void ReplicaState::CancelPendingOperation(const OpId& id) {
   OpId previous = id;
   previous.set_index(previous.index() - 1);
   DCHECK(update_lock_.is_locked());
   CHECK_EQ(GetCurrentTermUnlocked(), id.term());
-  CHECK_EQ(next_index_, id.index() + 1);
-  next_index_ = id.index();
-
-  // We don't use UpdateLastReceivedOpIdUnlocked because we're actually
-  // updating it back to a lower value and we need to avoid the checks
-  // that method has.
 
-  // This is only ok if we do _not_ release the lock after calling
-  // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()).
-  last_received_op_id_ = previous;
   scoped_refptr<ConsensusRound> round = EraseKeyReturnValuePtr(&pending_txns_, id.index());
   DCHECK(round);
 }
@@ -696,10 +647,9 @@ string ReplicaState::ToString() const {
 
 string ReplicaState::ToStringUnlocked() const {
   DCHECK(update_lock_.is_locked());
-  return Substitute("Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3, Committed: $4}",
+  return Substitute("Replica: $0, State: $1, Role: $2, Last Committed: $3",
                     peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()),
-                    last_received_op_id_.ShortDebugString(),
-                    last_committed_op_id_.ShortDebugString());
+                    OpIdToString(last_committed_op_id_));
 }
 
 Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index e27b7b3..d5d6fe2 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -269,19 +269,6 @@ class ReplicaState {
   // Returns OK iff an op from the current term has been committed.
   Status CheckHasCommittedOpInCurrentTermUnlocked() const;
 
-  // Updates the last received operation, if 'op_id''s index is higher than
-  // the previous last received. Also updates 'last_received_from_current_leader_'
-  // regardless of whether it is higher or lower than the prior value.
-  //
-  // This must be called under a lock.
-  void UpdateLastReceivedOpIdUnlocked(const OpId& op_id);
-
-  // Returns the last received op id. This must be called under the lock.
-  const OpId& GetLastReceivedOpIdUnlocked() const;
-
-  // Returns the id of the last op received from the current leader.
-  const OpId& GetLastReceivedOpIdCurLeaderUnlocked() const;
-
   // Returns the id of the latest pending transaction (i.e. the one with the
   // latest index). This must be called under the lock.
   OpId GetLastPendingTransactionOpIdUnlocked() const;
@@ -291,8 +278,6 @@ class ReplicaState {
   // to complete. This does not cancel transactions being applied.
   Status CancelPendingTransactions();
 
-  void NewIdUnlocked(OpId* id);
-
   // Used when, for some reason, an operation that failed before it could be considered
   // a part of the state machine. Basically restores the id gen to the state it was before
   // generating 'id'.
@@ -338,10 +323,6 @@ class ReplicaState {
   // Consensus metadata persistence object.
   std::unique_ptr<ConsensusMetadata> cmeta_;
 
-  // Used by the LEADER. This is the index of the next operation generated
-  // by this LEADER.
-  int64_t next_index_;
-
   // Index=>Round map that manages pending ops, i.e. operations for which we've
   // received a replicate message from the leader but have yet to be committed.
   // The key is the index of the replicate operation.
@@ -351,22 +332,6 @@ class ReplicaState {
   // this factory to start it.
   ReplicaTransactionFactory* txn_factory_;
 
-  // The id of the last received operation, which corresponds to the last entry
-  // written to the local log. Operations whose id is lower than or equal to
-  // this id do not need to be resent by the leader. This is not guaranteed to
-  // be monotonically increasing due to the possibility for log truncation and
-  // aborted operations when a leader change occurs.
-  OpId last_received_op_id_;
-
-  // Same as last_received_op_id_ but only includes operations sent by the
-  // current leader. The "term" in this op may not actually match the current
-  // term, since leaders may replicate ops from prior terms.
-  //
-  // As an implementation detail, this field is reset to MinumumOpId() every
-  // time there is a term advancement on the local node, to simplify the logic
-  // involved in resetting this every time a new node becomes leader.
-  OpId last_received_op_id_current_leader_;
-
   // The OpId of the Apply that was last triggered when the last message from the leader
   // was received. Initialized to MinimumOpId().
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 1bbef3c..5d94a9b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -897,10 +897,9 @@ TEST_F(RaftConsensusITest, TestChurnyElections) {
 }
 
 // The same test, except inject artificial latency when propagating notifications
-// from the queue back to consensus. This can reproduce bugs like KUDU-1078 which
-// normally only appear under high load. TODO: Re-enable once we get to the
-// bottom of KUDU-1078.
-TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) {
+// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
+// normally only appear under high load.
+TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
   DoTestChurnyElections(WITH_NOTIFICATION_LATENCY);
 }
 


[4/8] kudu git commit: maintenance_manager-test: fix flaky test

Posted by to...@apache.org.
maintenance_manager-test: fix flaky test

TestCompletedOps failed relatively often because of the following racy
interleaving:

- Multiple MM threads called Prepare() while remaining_runs_ was positive
- These threads then called Perform(), which decremented remaining_runs_
  once per thread. This resulted in the op being called more times than
  'remaining_runs_', causing an assertion failure.

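The fix is to decrement remaining_runs_ in Prepare() itself, and to separately
count the number of ops that have been prepared but not yet performed
('prepared_runs_'), so that Perform() asserts it is never called more times
than Prepare() returned true.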

Change-Id: I003e72339f69228195130c89572a20be3009f22c
Reviewed-on: http://gerrit.cloudera.org:8080/4634
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/07ce9fbc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/07ce9fbc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/07ce9fbc

Branch: refs/heads/master
Commit: 07ce9fbce4fcdcfafe01db0347b69c0283df5e2c
Parents: 4db8851
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Oct 5 14:37:50 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 22:26:33 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/maintenance_manager-test.cc | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/07ce9fbc/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 2448766..cb124af 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -86,6 +86,7 @@ class TestMaintenanceOp : public MaintenanceOp {
       maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
       maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
       remaining_runs_(1),
+      prepared_runs_(0),
       sleep_time_(MonoDelta::FromSeconds(0)) {
   }
 
@@ -96,6 +97,8 @@ class TestMaintenanceOp : public MaintenanceOp {
     if (remaining_runs_ == 0) {
       return false;
     }
+    remaining_runs_--;
+    prepared_runs_++;
     DLOG(INFO) << "Prepared op " << name();
     return true;
   }
@@ -104,8 +107,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     {
       std::lock_guard<Mutex> guard(lock_);
       DLOG(INFO) << "Performing op " << name();
-      CHECK_GE(remaining_runs_, 1);
-      remaining_runs_--;
+
+      // Ensure that we don't call Perform() more times than we returned
+      // success from Prepare().
+      CHECK_GE(prepared_runs_, 1);
+      prepared_runs_--;
     }
 
     SleepFor(sleep_time_);
@@ -166,6 +172,8 @@ class TestMaintenanceOp : public MaintenanceOp {
   // The number of remaining times this operation will run before disabling
   // itself.
   int remaining_runs_;
+  // The number of Prepared() operations which have not yet been Perform()ed.
+  int prepared_runs_;
 
   // The amount of time each op invocation will sleep.
   MonoDelta sleep_time_;


[6/8] kudu git commit: [java client] Retry regressing counts in ITClient

Posted by to...@apache.org.
[java client] Retry regressing counts in ITClient

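There's currently no reliable way to read your writes, even with snapshot scans, so
when the row count regresses we can either retry counting rows or ignore the regression.
This patch does the former: it keeps re-counting until the count stops regressing,
unless an artificial timeout is hit first.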

Change-Id: I1e79a6c7aaf069294a6ca40e487947d14d9f2aa7
Reviewed-on: http://gerrit.cloudera.org:8080/4597
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/98130e3c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/98130e3c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/98130e3c

Branch: refs/heads/master
Commit: 98130e3c40780e80dd46e9c729424554f2c77a4f
Parents: f26ab7d
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Mon Oct 3 09:16:18 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Oct 5 23:30:18 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/ITClient.java   | 38 +++++++++++++-------
 1 file changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/98130e3c/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index d4ffec0..1ff242d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -380,21 +380,35 @@ public class ITClient extends BaseKuduTest {
      * @return true if the full scan was successful, false if there was an error
      */
     private boolean fullScan() {
-      KuduScanner scanner = getScannerBuilder().build();
-      try {
-        int rowCount = countRowsInScan(scanner);
-        if (rowCount < lastRowCount) {
-          reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null);
-          return false;
+      int rowCount;
+      DeadlineTracker deadlineTracker = new DeadlineTracker();
+      deadlineTracker.setDeadline(DEFAULT_SLEEP);
+
+      while (KEEP_RUNNING_LATCH.getCount() > 0 && !deadlineTracker.timedOut()) {
+        KuduScanner scanner = getScannerBuilder().build();
+
+        try {
+          rowCount = countRowsInScan(scanner);
+        } catch (KuduException e) {
+          return checkAndReportError("Got error while row counting", e);
+        }
+
+        if (rowCount >= lastRowCount) {
+          if (rowCount > lastRowCount) {
+            lastRowCount = rowCount;
+            LOG.info("New row count {}", lastRowCount);
+          }
+          return true;
         }
-        if (rowCount > lastRowCount) {
-          lastRowCount = rowCount;
-          LOG.info("New row count {}", lastRowCount);
+
+        // Due to the lack of KUDU-430, we need to loop until the row count stops regressing.
+        try {
+          KEEP_RUNNING_LATCH.await(50, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          // No need to do anything, we'll exit the loop once we test getCount() in the condition.
         }
-      } catch (KuduException e) {
-        return checkAndReportError("Got error while row counting", e);
       }
-      return true;
+      return !deadlineTracker.timedOut();
     }
 
     private KuduScanner.KuduScannerBuilder getScannerBuilder() {


[7/8] kudu git commit: KUDU-1669. Java ITClient test can orphan processes

Posted by to...@apache.org.
KUDU-1669. Java ITClient test can orphan processes

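This removes the interruption of threads, which was the reason we were orphaning
processes (in the ChaosThread). Instead, after the stop latch is counted down, each
thread is joined with a timeout of DEFAULT_SLEEP so it has time to stop on its own.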

Change-Id: I0085323ba9ea544488c6ce896d8627f24ea69f4b
Reviewed-on: http://gerrit.cloudera.org:8080/4598
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3c33847f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3c33847f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3c33847f

Branch: refs/heads/master
Commit: 3c33847fc5f70ab6d65fa2fe93d5bde185e4365e
Parents: 98130e3
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 5 14:40:39 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Oct 5 23:36:45 2016 +0000

----------------------------------------------------------------------
 .../src/test/java/org/apache/kudu/client/ITClient.java          | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3c33847f/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 1ff242d..13fae25 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -116,8 +116,8 @@ public class ITClient extends BaseKuduTest {
     KEEP_RUNNING_LATCH.countDown();
 
     for (Thread thread : threads) {
-      thread.interrupt();
-      thread.join();
+      // Give plenty of time for threads to stop.
+      thread.join(DEFAULT_SLEEP);
     }
 
     AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
@@ -166,7 +166,6 @@ public class ITClient extends BaseKuduTest {
           }
           KEEP_RUNNING_LATCH.await(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-          e.printStackTrace();
           return;
         }
 


[3/8] kudu git commit: KUDU-1612 - [python] Enable setting of read mode for scanning

Posted by to...@apache.org.
KUDU-1612 - [python] Enable setting of read mode for scanning

Currently the Python client is unable to set the read mode for
scanning, so all scans are done as READ_LATEST. This patch adds
the ability to set the read mode so that the Python client can read
at a snapshot. It also includes multiple tests.

Change-Id: I2c61ef09f6e15bad2c44d9caf85b2cc2582b8a49
Reviewed-on: http://gerrit.cloudera.org:8080/4520
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4db8851c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4db8851c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4db8851c

Branch: refs/heads/master
Commit: 4db8851cf31f68f2db8dafc41a4609017371501b
Parents: bda9b94
Author: Jordan Birdsell <jo...@gmail.com>
Authored: Thu Sep 22 19:11:15 2016 -0500
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Wed Oct 5 21:53:07 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |   4 +-
 python/kudu/client.pyx              | 168 ++++++++++++++++++++++++++++++-
 python/kudu/libkudu_client.pxd      |  11 +-
 python/kudu/tests/test_scanner.py   |  32 ++++++
 python/kudu/tests/test_scantoken.py |  33 ++++++
 python/kudu/tests/util.py           |  33 ++++++
 python/kudu/util.py                 |  15 +++
 7 files changed, 289 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 1a1ff39..99ca0d9 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -21,7 +21,9 @@ from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          ScanToken,
                          FLUSH_AUTO_BACKGROUND,
                          FLUSH_AUTO_SYNC,
-                         FLUSH_MANUAL)
+                         FLUSH_MANUAL,
+                         READ_LATEST,
+                         READ_AT_SNAPSHOT)
 
 from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound,  # noqa
                          KuduNotSupported,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 29004bb..150997d 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -28,12 +28,21 @@ from libkudu_client cimport *
 from kudu.compat import tobytes, frombytes
 from kudu.schema cimport Schema, ColumnSchema
 from kudu.errors cimport check_status
-from kudu.util import to_unixtime_micros, from_unixtime_micros
+from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime
 from errors import KuduException
 
 import six
 
 
+# Read mode enums
+READ_LATEST = ReadMode_Latest
+READ_AT_SNAPSHOT = ReadMode_Snapshot
+
+cdef dict _read_modes = {
+    'latest': ReadMode_Latest,
+    'snapshot': ReadMode_Snapshot
+}
+
 cdef dict _type_names = {
     KUDU_INT8 : "KUDU_INT8",
     KUDU_INT16 : "KUDU_INT16",
@@ -240,6 +249,25 @@ cdef class Client:
         # Nothing yet to clean up here
         pass
 
+    def latest_observed_timestamp(self):
+        """
+        Get the highest timestamp observed by the client in UTC. This
+        is intended to gain external consistency across clients.
+
+        Note: The latest observed timestamp can also be used to start a
+        snapshot scan on a table which is guaranteed to contain all data
+        written or previously read by this client. This should be treated
+        as experimental as it this method will change or disappear in a
+        future release. Additionally, note that 1 must be added to the
+        value to be used in snapshot reads (this is taken care of in the
+        from_hybridtime method).
+
+        Returns
+        -------
+        latest : datetime.datetime
+        """
+        return from_hybridtime(self.cp.GetLatestObservedTimestamp())
+
     def create_table(self, table_name, Schema schema, partitioning, n_replicas=None):
         """
         Creates a new Kudu table from the passed Schema and options.
@@ -1286,6 +1314,75 @@ cdef class Scanner:
         check_status(self.scanner.SetProjectedColumnIndexes(v_indexes))
         return self
 
+    def set_read_mode(self, read_mode):
+        """
+        Set the read mode for scanning.
+
+        Parameters
+        ----------
+        read_mode : {'latest', 'snapshot'}
+          You can also use the constants READ_LATEST, READ_AT_SNAPSHOT
+
+        Returns
+        -------
+        self : Scanner
+        """
+        cdef ReadMode rmode
+
+        def invalid_selection_policy():
+            raise ValueError('Invalid read mode: {0}'
+                             .format(read_mode))
+
+        if isinstance(read_mode, int):
+            if 0 <= read_mode < len(_read_modes):
+                check_status(self.scanner.SetReadMode(
+                             <ReadMode> read_mode))
+            else:
+                invalid_selection_policy()
+        else:
+            try:
+                check_status(self.scanner.SetReadMode(
+                    _read_modes[read_mode.lower()]))
+            except KeyError:
+                invalid_selection_policy()
+
+        return self
+
+    def set_snapshot(self, timestamp, format=None):
+        """
+        Set the snapshot timestamp for this scanner.
+
+        Parameters
+        ---------
+        timestamp : datetime.datetime or string
+          If a string is provided, a format must be provided as well.
+          NOTE: This should be in UTC. If a timezone aware datetime
+          object is provided, it will be converted to UTC, otherwise,
+          all other input is assumed to be UTC.
+        format : Required if a string timestamp is provided
+          Uses the C strftime() function, see strftime(3) documentation.
+
+        Returns
+        -------
+        self : Scanner
+        """
+        # Confirm that a format is provided if timestamp is a string
+        if isinstance(timestamp, six.string_types) and not format:
+            raise ValueError(
+                "To use a string timestamp you must provide a format. " +
+                "See the strftime(3) documentation.")
+
+        snapshot_micros = to_unixtime_micros(timestamp, format)
+
+        if snapshot_micros >= 0:
+            check_status(self.scanner.SetSnapshotMicros(
+                         <uint64_t> snapshot_micros))
+        else:
+            raise ValueError(
+                "Snapshot Timestamps be greater than the unix epoch.")
+
+        return self
+
     def set_fault_tolerant(self):
         """
         Makes the underlying KuduScanner fault tolerant.
@@ -1553,6 +1650,75 @@ cdef class ScanTokenBuilder:
         check_status(self._builder.SetBatchSizeBytes(batch_size))
         return self
 
+    def set_read_mode(self, read_mode):
+        """
+        Set the read mode for scanning.
+
+        Parameters
+        ----------
+        read_mode : {'latest', 'snapshot'}
+          You can also use the constants READ_LATEST, READ_AT_SNAPSHOT
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef ReadMode rmode
+
+        def invalid_selection_policy():
+            raise ValueError('Invalid read mode: {0}'
+                             .format(read_mode))
+
+        if isinstance(read_mode, int):
+            if 0 <= read_mode < len(_read_modes):
+                check_status(self._builder.SetReadMode(
+                             <ReadMode> read_mode))
+            else:
+                invalid_selection_policy()
+        else:
+            try:
+                check_status(self._builder.SetReadMode(
+                    _read_modes[read_mode.lower()]))
+            except KeyError:
+                invalid_selection_policy()
+
+        return self
+
+    def set_snapshot(self, timestamp, format=None):
+        """
+        Set the snapshot timestamp for this ScanTokenBuilder.
+
+        Parameters
+        ---------
+        timestamp : datetime.datetime or string
+          If a string is provided, a format must be provided as well.
+          NOTE: This should be in UTC. If a timezone aware datetime
+          object is provided, it will be converted to UTC, otherwise,
+          all other input is assumed to be UTC.
+        format : Required if a string timestamp is provided
+          Uses the C strftime() function, see strftime(3) documentation.
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        # Confirm that a format is provided if timestamp is a string
+        if isinstance(timestamp, six.string_types) and not format:
+            raise ValueError(
+                "To use a string timestamp you must provide a format. " +
+                "See the strftime(3) documentation.")
+
+        snapshot_micros = to_unixtime_micros(timestamp, format)
+
+        if snapshot_micros >= 0:
+            check_status(self._builder.SetSnapshotMicros(
+                         <uint64_t> snapshot_micros))
+        else:
+            raise ValueError(
+                "Snapshot Timestamps be greater than the unix epoch.")
+
+        return self
+
     def set_timout_millis(self, millis):
         """
         Sets the timeout in milliseconds.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index b9022e0..9c9899f 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -456,6 +456,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA"
         FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA"
 
+    enum ReadMode" kudu::client::KuduScanner::ReadMode":
+        ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST"
+        ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
+
     cdef cppclass KuduClient:
 
         Status DeleteTable(const string& table_name)
@@ -479,6 +483,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         KuduTableAlterer* NewTableAlterer()
         Status IsAlterTableInProgress(const string& table_name,
                                       c_bool* alter_in_progress)
+        uint64_t GetLatestObservedTimestamp()
 
         shared_ptr[KuduSession] NewSession()
 
@@ -601,9 +606,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
         KuduClient* client()
 
-    enum ReadMode" kudu::client::KuduScanner::ReadMode":
-        READ_LATEST " kudu::client::KuduScanner::READ_LATEST"
-        READ_AT_SNAPSHOT " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
 
     cdef cppclass KuduScanner:
         KuduScanner(KuduTable* table)
@@ -620,7 +622,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status SetSelection(ReplicaSelection selection)
 
         Status SetReadMode(ReadMode read_mode)
-        Status SetSnapshot(uint64_t snapshot_timestamp_micros)
+        Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
         Status SetTimeoutMillis(int millis)
         Status SetProjectedColumnNames(const vector[string]& col_names)
         Status SetProjectedColumnIndexes(const vector[int]& col_indexes)
@@ -651,7 +653,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status SetReadMode(ReadMode read_mode)
         Status SetFaultTolerant()
         Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
-        Status SetSnapshotRaw(uint64_t snapshot_timestamp)
         Status SetSelection(ReplicaSelection selection)
         Status SetTimeoutMillis(int millis)
         Status AddConjunctPredicate(KuduPredicate* pred)

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index e0fcd37..950f0dd 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -23,6 +23,7 @@ from kudu.tests.util import TestScanBase
 from kudu.tests.common import KuduTestBase
 import kudu
 import datetime
+import time
 
 
 class TestScanner(TestScanBase):
@@ -182,3 +183,34 @@ class TestScanner(TestScanBase):
         scanner = self.table.scanner()
         scanner.set_fault_tolerant().open()
         self.assertEqual(sorted(self.tuples), scanner.read_all_tuples())
+
+    def test_read_mode(self):
+        """
+        Test setting the read mode and scanning against a
+        snapshot and latest
+        """
+        # Delete row
+        self.delete_insert_row_for_read_test()
+
+        # Check scanner results prior to delete
+        scanner = self.table.scanner()
+        scanner.set_read_mode('snapshot')\
+            .set_snapshot(self.snapshot_timestamp)\
+            .open()
+
+        self.assertEqual(sorted(self.tuples[1:]), sorted(scanner.read_all_tuples()))
+
+        #Check scanner results after delete
+        timeout = time.time() + 10
+        check_tuples = []
+        while check_tuples != sorted(self.tuples):
+            if time.time() > timeout:
+                raise TimeoutError("Could not validate results in allocated" +
+                                   "time.")
+
+            scanner = self.table.scanner()
+            scanner.set_read_mode(kudu.READ_LATEST)\
+                .open()
+            check_tuples = sorted(scanner.read_all_tuples())
+            # Avoid tight looping
+            time.sleep(0.05)

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index a5ae256..5a37486 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -176,3 +176,36 @@ class TestScanToken(TestScanBase):
         # Serialize execute and verify
         self._subtest_serialize_thread_and_verify(builder.build(),
                                                   [self.tuples[98]])
+
+    def test_read_mode(self):
+        """
+        Test setting the read mode and scanning against a
+        snapshot and latest
+        """
+        # Delete row
+        self.delete_insert_row_for_read_test()
+
+        # Check scanner results prior to delete
+        builder = self.table.scan_token_builder()
+        tokens = builder.set_read_mode('snapshot') \
+            .set_snapshot(self.snapshot_timestamp) \
+            .build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner().open()
+            tuples.extend(scanner.read_all_tuples())
+
+        self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))
+
+        #Check scanner results after insterts
+        builder = self.table.scan_token_builder()
+        tokens = builder.set_read_mode(kudu.READ_LATEST) \
+            .build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner().open()
+            tuples.extend(scanner.read_all_tuples())
+
+        self.assertEqual(sorted(self.tuples), sorted(tuples))

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
index 39520e4..6a72a63 100644
--- a/python/kudu/tests/util.py
+++ b/python/kudu/tests/util.py
@@ -62,6 +62,12 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         pass
 
     def insert_new_unixtime_micros_rows(self):
+        # Get current UTC datetime to be used for read at snapshot test
+        # Not using the easter time value below as that may not be the
+        # actual local timezone of the host executing this test and as
+        # such does would not accurately be offset to UTC
+        self.snapshot_timestamp = datetime.datetime.utcnow()
+
         # Insert new rows
         # Also test a timezone other than UTC to confirm that
         # conversion to UTC is properly applied
@@ -94,4 +100,31 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
             # Apply timezone
             list[3] = list[3].replace(tzinfo=pytz.utc)
             self.tuples.append(tuple(list))
+        session.flush()
+
+    def delete_insert_row_for_read_test(self):
+
+        # Retrive row to delete so it can be reinserted into the table so
+        # that other tests do not fail
+        row = self.table.scanner()\
+                    .set_fault_tolerant()\
+                    .open()\
+                    .read_all_tuples()[0]
+
+        # Delete row from table
+        session = self.client.new_session()
+        op = self.table.new_delete()
+        op['key'] = row[0]
+        session.apply(op)
+        session.flush()
+
+        # Get latest observed timestamp for snapshot
+        self.snapshot_timestamp = self.client.latest_observed_timestamp()
+
+        # Insert row back into table so that other tests don't fail.
+        session = self.client.new_session()
+        op = self.table.new_insert()
+        for idx, val in enumerate(row):
+            op[idx] = val
+        session.apply(op)
         session.flush()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/util.py b/python/kudu/util.py
index 603e0e0..8533b04 100644
--- a/python/kudu/util.py
+++ b/python/kudu/util.py
@@ -92,3 +92,18 @@ def from_unixtime_micros(unixtime_micros):
     else:
         raise ValueError("Invalid unixtime_micros value." +
                          "You must provide an integer value.")
+
+def from_hybridtime(hybridtime):
+    """
+    Convert a raw HybridTime value to a datetime in UTC.
+
+    Parameters
+    ----------
+    hybridtime : long
+
+    Returns
+    -------
+    timestamp : datetime.datetime in UTC
+    """
+    # Add 1 so the value is usable for snapshot scans
+    return from_unixtime_micros(int(hybridtime >> 12) + 1)


[5/8] kudu git commit: [java client] Rename NoLeaderMasterFoundException and reuse

Posted by to...@apache.org.
[java client] Rename NoLeaderMasterFoundException and reuse

I pushed d87486c too fast; Dan convinced me to reuse the exception.

Change-Id: I51ad20943fe8b4b808123dc8b7795215f3b12308
Reviewed-on: http://gerrit.cloudera.org:8080/4624
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f26ab7d5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f26ab7d5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f26ab7d5

Branch: refs/heads/master
Commit: f26ab7d5c28979b2cda9860faac1039920fd7a15
Parents: 07ce9fb
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Oct 4 16:30:54 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Oct 5 22:34:42 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  2 +-
 .../client/GetMasterRegistrationReceived.java   | 11 +++---
 .../kudu/client/NoLeaderFoundException.java     | 38 ++++++++++++++++++++
 .../client/NoLeaderMasterFoundException.java    | 37 -------------------
 .../kudu/client/NoSuitableReplicaException.java | 32 -----------------
 .../apache/kudu/client/TestAsyncKuduClient.java |  2 +-
 .../TestGetMasterRegistrationReceived.java      |  4 +--
 7 files changed, 47 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0c0d9bb..cedd111 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1410,7 +1410,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // sleep before retrying.
     TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
     if (!entry.isNonCoveredRange() && clientFor(entry.getTablet()) == null) {
-      throw new NoSuitableReplicaException(
+      throw new NoLeaderFoundException(
           Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
index 3f2c745..02410a8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
@@ -59,7 +59,6 @@ final class GetMasterRegistrationReceived {
   private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
 
   // Exceptions received so far: kept for debugging purposes.
-  // (see: NoLeaderMasterFoundException#create() for how this is used).
   private final List<Exception> exceptionsReceived =
       Collections.synchronizedList(new ArrayList<Exception>());
 
@@ -102,7 +101,7 @@ final class GetMasterRegistrationReceived {
   /**
    * Checks if we've already received a response or an exception from every master that
    * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
-   * (that is, 'responseD' was never called) -- pass a {@link NoLeaderMasterFoundException}
+   * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException}
    * to responseD.
    */
   private void incrementCountAndCheckExhausted() {
@@ -142,7 +141,7 @@ final class GetMasterRegistrationReceived {
             LOG.warn(String.format(
                 "None of the provided masters (%s) is a leader, will retry.",
                 allHosts));
-            ex = new NoLeaderMasterFoundException(Status.ServiceUnavailable(message));
+            ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
           } else {
             LOG.warn(String.format(
                 "Unable to find the leader master (%s), will retry",
@@ -151,7 +150,7 @@ final class GetMasterRegistrationReceived {
                 Joiner.on(",").join(Lists.transform(
                     exceptionsReceived, Functions.toStringFunction()));
             Status s = Status.ServiceUnavailable(joinedMsg);
-            ex = new NoLeaderMasterFoundException(s,
+            ex = new NoLeaderFoundException(s,
                 exceptionsReceived.get(exceptionsReceived.size() - 1));
           }
           responseD.callback(ex);
@@ -166,7 +165,7 @@ final class GetMasterRegistrationReceived {
    * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
    * object containing the leader's RPC address.
    * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
-   * the number of masters, pass {@link NoLeaderMasterFoundException} into
+   * the number of masters, pass {@link NoLeaderFoundException} into
    * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
    */
   final class GetMasterRegistrationCB implements Callback<Void, GetMasterRegistrationResponse> {
@@ -220,7 +219,7 @@ final class GetMasterRegistrationReceived {
    * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
    * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
    * the count is equal to the number of masters and no one else had called 'responseD' before,
-   * pass a {@link NoLeaderMasterFoundException} into 'responseD'; otherwise, do
+   * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
    * nothing.
    */
   final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderFoundException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderFoundException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderFoundException.java
new file mode 100644
index 0000000..86aeccf
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderFoundException.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * Indicates that the request failed because we couldn't find a leader. It is retried as long
+ * as the original call hasn't timed out.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class NoLeaderFoundException extends RecoverableException {
+
+  NoLeaderFoundException(Status status) {
+    super(status);
+  }
+  NoLeaderFoundException(Status status, Exception cause) {
+    super(status, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderMasterFoundException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderMasterFoundException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderMasterFoundException.java
deleted file mode 100644
index 587741e..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/NoLeaderMasterFoundException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.client;
-
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.annotations.InterfaceStability;
-
-import java.util.List;
-
-/**
- * Indicates that the request failed because we couldn't find a leader master server.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-final class NoLeaderMasterFoundException extends RecoverableException {
-
-  NoLeaderMasterFoundException(Status status) {
-    super(status);
-  }
-  NoLeaderMasterFoundException(Status status, Exception cause) {
-    super(status, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java
deleted file mode 100644
index e51a346..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.client;
-
-/**
- * Indicates that the master lookup failed because no suitable replicas were found for
- * the given RPC.
- */
-final class NoSuitableReplicaException extends RecoverableException {
-
-  NoSuitableReplicaException(Status status) {
-    super(status);
-  }
-
-  NoSuitableReplicaException(Status status, Exception cause) {
-    super(status, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 1511221..a91124d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -192,7 +192,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
     try {
       client.discoverTablets(table, new byte[0], tabletLocations, 1000);
       fail("discoverTablets should throw an exception if there's no leader");
-    } catch (NoSuitableReplicaException ex) {
+    } catch (NoLeaderFoundException ex) {
       // Expected.
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/f26ab7d5/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
index 8570668..3882acd 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
@@ -46,8 +46,8 @@ public class TestGetMasterRegistrationReceived {
         Status.RuntimeError(""));
     RecoverableException reusableRE = new RecoverableException(
         Status.RuntimeError(""));
-    NoLeaderMasterFoundException retryResponse =
-        new NoLeaderMasterFoundException(Status.RuntimeError(""));
+    NoLeaderFoundException retryResponse =
+        new NoLeaderFoundException(Status.RuntimeError(""));
     // We don't test for a particular good response, so as long as we pass something that's not an
     // exception to runTest() we're good.
     Object successResponse = new Object();


[2/8] kudu git commit: raft_consensus: clean up overrides, protected methods

Posted by to...@apache.org.
raft_consensus: clean up overrides, protected methods

RaftConsensus used to have a subclass mock in raft_consensus-test, but
that test is now removed. Given this, we can make some
previously-protected methods private and non-virtual.

I also took this opportunity to modernize the override declarations to
the C++11 'override' keyword (replacing 'virtual ... OVERRIDE'), and to
remove GetActiveRole(), which essentially duplicated role().

Change-Id: Ib52d3ac40ed68a4ff8b1738897f6ef62f94a843b
Reviewed-on: http://gerrit.cloudera.org:8080/4620
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bda9b94e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bda9b94e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bda9b94e

Branch: refs/heads/master
Commit: bda9b94e88b2602dcc029e763c8a9e0eecff8e08
Parents: 4bcbb4a
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Oct 4 14:34:10 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 21:41:36 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            |  9 +-
 src/kudu/consensus/raft_consensus.h             | 98 +++++++++-----------
 .../consensus/raft_consensus_quorum-test.cc     |  2 +-
 3 files changed, 45 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bda9b94e/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 0cb724e..3e4a89b 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1511,12 +1511,6 @@ void RaftConsensus::Shutdown() {
   shutdown_.Store(true, kMemOrderRelease);
 }
 
-RaftPeerPB::Role RaftConsensus::GetActiveRole() const {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForRead(&lock));
-  return state_->GetActiveRoleUnlocked();
-}
-
 OpId RaftConsensus::GetLatestOpIdFromLog() {
   OpId id;
   log_->GetLatestEntryOpId(&id);
@@ -1675,8 +1669,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
 RaftPeerPB::Role RaftConsensus::role() const {
   ReplicaState::UniqueLock lock;
   CHECK_OK(state_->LockForRead(&lock));
-  return GetConsensusRole(state_->GetPeerUuid(),
-                          state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE));
+  return state_->GetActiveRoleUnlocked();
 }
 
 std::string RaftConsensus::LogPrefixUnlocked() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/bda9b94e/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 887201b..07ef186 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -91,69 +91,64 @@ class RaftConsensus : public Consensus,
 
   virtual ~RaftConsensus();
 
-  virtual Status Start(const ConsensusBootstrapInfo& info) OVERRIDE;
+  Status Start(const ConsensusBootstrapInfo& info) override;
 
-  virtual bool IsRunning() const OVERRIDE;
+  bool IsRunning() const override;
 
   // Emulates an election by increasing the term number and asserting leadership
   // in the configuration by sending a NO_OP to other peers.
   // This is NOT safe to use in a distributed configuration with failure detection
   // enabled, as it could result in a split-brain scenario.
-  virtual Status EmulateElection() OVERRIDE;
+  Status EmulateElection() override;
 
-  virtual Status StartElection(ElectionMode mode) OVERRIDE;
+  Status StartElection(ElectionMode mode) override;
 
-  virtual Status WaitUntilLeaderForTests(const MonoDelta& timeout) OVERRIDE;
+  Status WaitUntilLeaderForTests(const MonoDelta& timeout) override;
 
-  virtual Status StepDown(LeaderStepDownResponsePB* resp) OVERRIDE;
+  Status StepDown(LeaderStepDownResponsePB* resp) override;
 
   // Call StartElection(), log a warning if the call fails (usually due to
   // being shut down).
   void ReportFailureDetected(const std::string& name, const Status& msg);
 
-  virtual Status Replicate(const scoped_refptr<ConsensusRound>& round) OVERRIDE;
+  Status Replicate(const scoped_refptr<ConsensusRound>& round) override;
 
-  virtual Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) OVERRIDE;
+  Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) override;
 
-  virtual Status Update(const ConsensusRequestPB* request,
-                        ConsensusResponsePB* response) OVERRIDE;
+  Status Update(const ConsensusRequestPB* request,
+                ConsensusResponsePB* response) override;
 
-  virtual Status RequestVote(const VoteRequestPB* request,
-                             VoteResponsePB* response) OVERRIDE;
+  Status RequestVote(const VoteRequestPB* request,
+                     VoteResponsePB* response) override;
 
-  virtual Status ChangeConfig(const ChangeConfigRequestPB& req,
-                              const StatusCallback& client_cb,
-                              boost::optional<tserver::TabletServerErrorPB::Code>* error_code)
-                              OVERRIDE;
+  Status ChangeConfig(const ChangeConfigRequestPB& req,
+                      const StatusCallback& client_cb,
+                      boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;
 
-  virtual RaftPeerPB::Role role() const OVERRIDE;
+  Status GetLastOpId(OpIdType type, OpId* id) override;
 
-  virtual std::string peer_uuid() const OVERRIDE;
+  RaftPeerPB::Role role() const override;
 
-  virtual std::string tablet_id() const OVERRIDE;
+  std::string peer_uuid() const override;
 
-  virtual ConsensusStatePB ConsensusState(ConsensusConfigType type) const OVERRIDE;
+  std::string tablet_id() const override;
 
-  virtual RaftConfigPB CommittedConfig() const OVERRIDE;
+  ConsensusStatePB ConsensusState(ConsensusConfigType type) const override;
 
-  virtual void DumpStatusHtml(std::ostream& out) const OVERRIDE;
+  RaftConfigPB CommittedConfig() const override;
 
-  virtual void Shutdown() OVERRIDE;
+  void DumpStatusHtml(std::ostream& out) const override;
 
-  // Makes this peer advance it's term (and step down if leader), for tests.
-  virtual Status AdvanceTermForTests(int64_t new_term);
+  void Shutdown() override;
 
-  // Return the active (as opposed to committed) role.
-  RaftPeerPB::Role GetActiveRole() const;
+  // Makes this peer advance its term (and step down if leader), for tests.
+  Status AdvanceTermForTests(int64_t new_term);
 
   // Returns the replica state for tests. This should never be used outside of
   // tests, in particular calling the LockFor* methods on the returned object
   // can cause consensus to deadlock.
   ReplicaState* GetReplicaStateForTests();
 
-  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
-
-
   //------------------------------------------------------------
   // PeerMessageQueueObserver implementation
   //------------------------------------------------------------
@@ -171,25 +166,6 @@ class RaftConsensus : public Consensus,
 
   log::RetentionIndexes GetRetentionIndexes() override;
 
- protected:
-  // Trigger that a non-Transaction ConsensusRound has finished replication.
-  // If the replication was successful, an status will be OK. Otherwise, it
-  // may be Aborted or some other error status.
-  // If 'status' is OK, write a Commit message to the local WAL based on the
-  // type of message it is.
-  // The 'client_cb' will be invoked at the end of this execution.
-  virtual void NonTxRoundReplicationFinished(ConsensusRound* round,
-                                             const StatusCallback& client_cb,
-                                             const Status& status);
-
-  // As a leader, append a new ConsensusRound to the queue.
-  // Only virtual and protected for mocking purposes.
-  virtual Status AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round);
-
-  // As a follower, start a consensus round not associated with a Transaction.
-  // Only virtual and protected for mocking purposes.
-  virtual Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg);
-
  private:
   friend class ReplicaState;
   friend class RaftConsensusQuorumTest;
@@ -288,12 +264,6 @@ class RaftConsensus : public Consensus,
   // and also truncate the LogCache accordingly.
   void TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index);
 
-  // Pushes a new Raft configuration to a majority of peers. Contrary to write operations,
-  // this actually waits for the commit round to reach a majority of peers, so that we know
-  // we can proceed. If this returns Status::OK(), a majority of peers have accepted the new
-  // configuration. The peer cannot perform any additional operations until this succeeds.
-  Status PushConfigurationToPeersUnlocked(const RaftConfigPB& new_config);
-
   // Returns the most recent OpId written to the Log.
   OpId GetLatestOpIdFromLog();
 
@@ -432,6 +402,24 @@ class RaftConsensus : public Consensus,
                              const RaftConfigPB& committed_config,
                              const std::string& reason);
 
+  // Trigger that a non-Transaction ConsensusRound has finished replication.
+  // If the replication was successful, the status will be OK. Otherwise, it
+  // may be Aborted or some other error status.
+  // If 'status' is OK, write a Commit message to the local WAL based on the
+  // type of message it is.
+  // The 'client_cb' will be invoked at the end of this execution.
+  void NonTxRoundReplicationFinished(ConsensusRound* round,
+                                     const StatusCallback& client_cb,
+                                     const Status& status);
+
+  // As a leader, append a new ConsensusRound to the queue.
+  // Private and non-virtual; it is no longer mocked in tests.
+  Status AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round);
+
+  // As a follower, start a consensus round not associated with a Transaction.
+  // Private and non-virtual; it is no longer mocked in tests.
+  Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg);
+
   // Threadpool for constructing requests to peers, handling RPC callbacks,
   // etc.
   gscoped_ptr<ThreadPool> thread_pool_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bda9b94e/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 9d63266..362a230 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -78,7 +78,7 @@ void DoNothing(const string& s) {
 Status WaitUntilLeaderForTests(RaftConsensus* raft) {
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(15);
   while (MonoTime::Now() < deadline) {
-    if (raft->GetActiveRole() == RaftPeerPB::LEADER) {
+    if (raft->role() == RaftPeerPB::LEADER) {
       return Status::OK();
     }
     SleepFor(MonoDelta::FromMilliseconds(10));


[8/8] kudu git commit: KUDU-1669. Java client tests leak orphan processes (part 2)

Posted by to...@apache.org.
KUDU-1669. Java client tests leak orphan processes (part 2)

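In some cases we weren't waiting for the processes to actually finish:
shutdown() destroyed the master and tablet server processes without
waiting for them to exit, and interrupted the input printer threads
instead of joining them.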

Change-Id: I61d33ca2339048a51acfbb35f5b71e827d3a47f7
Reviewed-on: http://gerrit.cloudera.org:8080/4636
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bce1dd77
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bce1dd77
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bce1dd77

Branch: refs/heads/master
Commit: bce1dd7772c28a5521d97ffbcf02ee17216174a3
Parents: 3c33847
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 5 14:55:13 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Oct 5 23:46:16 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/MiniKuduCluster.java | 36 +++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bce1dd77/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 386ce9e..22e4bfc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -294,8 +294,7 @@ public class MiniKuduCluster implements AutoCloseable {
       return;
     }
     LOG.info("Killing server at port " + port);
-    ts.destroy();
-    ts.waitFor();
+    destroyAndWaitForProcess(ts);
   }
 
   /**
@@ -304,8 +303,7 @@ public class MiniKuduCluster implements AutoCloseable {
    */
   public void killTabletServers() throws InterruptedException {
     for (Process tserver : tserverProcesses.values()) {
-      tserver.destroy();
-      tserver.waitFor();
+      destroyAndWaitForProcess(tserver);
     }
     tserverProcesses.clear();
   }
@@ -333,8 +331,7 @@ public class MiniKuduCluster implements AutoCloseable {
       return;
     }
     LOG.info("Killing master at port " + port);
-    master.destroy();
-    master.waitFor();
+    destroyAndWaitForProcess(master);
   }
 
   /**
@@ -351,15 +348,31 @@ public class MiniKuduCluster implements AutoCloseable {
    */
   public void shutdown() {
     for (Iterator<Process> masterIter = masterProcesses.values().iterator(); masterIter.hasNext(); ) {
-      masterIter.next().destroy();
+      try {
+        destroyAndWaitForProcess(masterIter.next());
+      } catch (InterruptedException e) {
+        // Need to continue cleaning up.
+      }
       masterIter.remove();
     }
+
     for (Iterator<Process> tsIter = tserverProcesses.values().iterator(); tsIter.hasNext(); ) {
-      tsIter.next().destroy();
+      try {
+        destroyAndWaitForProcess(tsIter.next());
+      } catch (InterruptedException e) {
+        // Need to continue cleaning up.
+      }
       tsIter.remove();
     }
+
+    // Whether or not we were interrupted above, we still destroyed all the processes, so the
+    // input printers will hit EOF and stop.
     for (Thread thread : PROCESS_INPUT_PRINTERS) {
-      thread.interrupt();
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        // Need to continue cleaning up.
+      }
     }
 
     for (String path : pathsToDelete) {
@@ -376,6 +389,11 @@ public class MiniKuduCluster implements AutoCloseable {
     }
   }
 
+  private void destroyAndWaitForProcess(Process process) throws InterruptedException {
+    process.destroy();
+    process.waitFor();
+  }
+
   /**
    * Returns the comma-separated list of master addresses.
    * @return master addresses