You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/01/10 16:51:49 UTC

[1/2] kudu git commit: Fixup DeltaStats::ToString

Repository: kudu
Updated Branches:
  refs/heads/master 8bcc80eec -> f2adee004


Fixup DeltaStats::ToString

Simplifies the implementation and fixes the mismatched delimiters.

Change-Id: I75f578fa5a858609107e51779fb4e272f6c57a13
Reviewed-on: http://gerrit.cloudera.org:8080/8988
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/80eca49d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/80eca49d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/80eca49d

Branch: refs/heads/master
Commit: 80eca49d5e03c4515ff58ea0e92376193482739f
Parents: 8bcc80e
Author: Dan Burkert <da...@apache.org>
Authored: Tue Jan 9 15:54:58 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 10 01:56:27 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_stats.cc | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/80eca49d/src/kudu/tablet/delta_stats.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_stats.cc b/src/kudu/tablet/delta_stats.cc
index 0db1343..b0e83fe 100644
--- a/src/kudu/tablet/delta_stats.cc
+++ b/src/kudu/tablet/delta_stats.cc
@@ -95,21 +95,17 @@ Status DeltaStats::UpdateStats(const Timestamp& timestamp,
 }
 
 string DeltaStats::ToString() const {
-  string ret = strings::Substitute(
-      "ts range=[$0, $1]",
+  return strings::Substitute(
+      "ts range=[$0, $1], delete_count=[$2], reinsert_count=[$3], update_counts_by_col_id=[$4]",
       min_timestamp_.ToString(),
-      max_timestamp_.ToString());
-  ret.append(Substitute(", delete_count=[$0]", delete_count_));
-  ret.append(Substitute(", reinsert_count=[$0]", reinsert_count_));
-  ret.append(", update_counts_by_col_id=[");
-  ret.append(JoinKeysAndValuesIterator(update_counts_by_col_id_.begin(),
-                                       update_counts_by_col_id_.end(),
-                                       ":", ","));
-  ret.append(")");
-  return ret;
+      max_timestamp_.ToString(),
+      delete_count_,
+      reinsert_count_,
+      JoinKeysAndValuesIterator(update_counts_by_col_id_.begin(),
+                                update_counts_by_col_id_.end(),
+                                ":", ","));
 }
 
-
 void DeltaStats::ToPB(DeltaStatsPB* pb) const {
   pb->Clear();
   pb->set_delete_count(delete_count_);


[2/2] kudu git commit: KUDU-2115: avoid compacting already-compacted rowsets

Posted by da...@apache.org.
KUDU-2115: avoid compacting already-compacted rowsets

Tablets will perform compaction selection on a copy of the currently
available rowsets in order to avoid holding the component_lock_ for the
duration of rowset selection.

It is then verified that nothing else compacted the selected rowsets by
iterating over the selected rowsets and checking that they still exist
to be compacted.

However, the initial selection of rowsets is performed on a snapshotted
copy of the available rowsets, and in between the snapshot of the
rowsets and the verification, the component_lock_ is dropped, allowing
the following race:

T1: runs PickRowSetsToCompact, picks {A, B}, begins compaction
T2: runs PickRowSetsToCompact:
T2:   snapshots the rowset tree (which still contains {A, B})
T2:   drops component_lock_ after taking the snapshot
T1: finishes compaction and removes {A, B} from the rowset tree,
    unlocking each rowset's compact_flush_lock_
T2: iterates over the rowset tree and sees {A, B} as
    IsAvailableForCompaction(), since they have been unlocked
T2: selects {A, B} as a part of its compaction
T2: grabs the component_lock_ to verify that the selected rowsets are
    still available
T2: sees that some of the selected rowsets would not be returned because
    they are missing from the currently available rowsets (DFATALs and
    aborts the compaction)

I verified the diagnosis by placing a random sleep just after making the
copy in PickRowSetsToCompact() and seeing
TabletServerDiskFailureTest.TestRandomOpSequence, which schedules many
compactions, fail consistently with the failed verification. Upon making
the fix, this passes.

This can lead to scheduling a fairly useless compaction. This patch
adds an API to RowSets to mark itself as compacted, and ensure that when
selecting rowsets to compact, check that rowset candidates have not
already been compacted.

To verify the solution, I've added a small test that schedules several
concurrent compactions. Upon adding the aforementioned randomized
delay and removing a couple sanity D/CHECKs (including the above
verification), I saw the test fail, leading to an unexpected number of
rows left in the tablet.

Change-Id: I4dab330d61facb18717f6faf179f9b94a9e55236
Reviewed-on: http://gerrit.cloudera.org:8080/8859
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f2adee00
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f2adee00
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f2adee00

Branch: refs/heads/master
Commit: f2adee0046c553ca2a697b661da11846d34559fd
Parents: 80eca49
Author: Andrew Wong <aw...@cloudera.com>
Authored: Fri Dec 15 18:31:52 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 10 01:58:43 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/compaction-test.cc | 47 +++++++++++++++++++
 src/kudu/tablet/diskrowset.cc      |  4 +-
 src/kudu/tablet/diskrowset.h       | 64 ++++++++++++++-----------
 src/kudu/tablet/memrowset-test.cc  |  4 +-
 src/kudu/tablet/memrowset.cc       |  4 +-
 src/kudu/tablet/memrowset.h        | 82 ++++++++++++++++++---------------
 src/kudu/tablet/mock-rowsets.h     |  7 +++
 src/kudu/tablet/rowset.h           | 16 ++++++-
 src/kudu/tablet/tablet.cc          | 19 ++++----
 9 files changed, 170 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/compaction-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 50a4786..e18d966 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -23,6 +23,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
@@ -90,6 +91,8 @@ DECLARE_string(block_manager);
 
 using std::shared_ptr;
 using std::string;
+using std::thread;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -1060,6 +1063,50 @@ TEST_F(TestCompaction, BenchmarkMergeWithOverlap) {
 }
 #endif
 
+// Test for KUDU-2115 to ensure that compaction selection will correctly pick
+// rowsets that exist in the rowset tree (i.e. rowsets that are removed by
+// concurrent compactions are not considered).
+//
+// Failure of this test may not necessarily mean that a compaction of the
+// single rowset will occur, but rather that a potentially sub-optimal
+// compaction may be scheduled.
+TEST_F(TestCompaction, TestConcurrentCompactionRowSetPicking) {
+  LocalTabletWriter writer(tablet().get(), &client_schema());
+  KuduPartialRow row(&client_schema());
+  const int kNumRowSets = 3;
+  const int kNumRowsPerRowSet = 2;
+  const int kExpectedRows = kNumRowSets * kNumRowsPerRowSet;
+
+  // Flush a few overlapping rowsets.
+  for (int i = 0; i < kNumRowSets; i++) {
+    for (int j = 0; j < kNumRowsPerRowSet; j++) {
+      const int val = i + j * 10;
+      ASSERT_OK(row.SetStringCopy("key", Substitute("hello $0", val)));
+      ASSERT_OK(row.SetInt32("val", val));
+      ASSERT_OK(writer.Insert(row));
+    }
+    ASSERT_OK(tablet()->Flush());
+  }
+  uint64_t num_rows;
+  ASSERT_OK(tablet()->CountRows(&num_rows));
+  ASSERT_EQ(kExpectedRows, num_rows);
+
+  // Schedule multiple compactions on the tablet at once. Concurrent
+  // compactions should not schedule the same rowsets for compaction, and we
+  // should end up with the same number of rows.
+  vector<unique_ptr<thread>> threads;
+  for (int i = 0; i < 10; i++) {
+    threads.emplace_back(new thread([&] {
+      ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+    }));
+  }
+  for (int i = 0; i < 10; i++) {
+    threads[i]->join();
+  }
+  ASSERT_OK(tablet()->CountRows(&num_rows));
+  ASSERT_EQ(kExpectedRows, num_rows);
+}
+
 TEST_F(TestCompaction, TestCompactionFreesDiskSpace) {
   {
     // We must force the LocalTabletWriter out of scope before measuring

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index adabb79..63909a2 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -39,6 +39,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
 #include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/compaction.h"
 #include "kudu/tablet/delta_compaction.h"
@@ -517,7 +518,8 @@ DiskRowSet::DiskRowSet(shared_ptr<RowSetMetadata> rowset_metadata,
     : rowset_metadata_(std::move(rowset_metadata)),
       open_(false),
       log_anchor_registry_(log_anchor_registry),
-      mem_trackers_(std::move(mem_trackers)) {}
+      mem_trackers_(std::move(mem_trackers)),
+      has_been_compacted_(false) {}
 
 Status DiskRowSet::Open() {
   TRACE_EVENT0("tablet", "DiskRowSet::Open");

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index eb64dd5..f5739d5 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -18,10 +18,9 @@
 // A DiskRowSet is a horizontal slice of a Kudu tablet.
 // Each DiskRowSet contains data for a a disjoint set of keys.
 // See src/kudu/tablet/README for a detailed description.
+#pragma once
 
-#ifndef KUDU_TABLET_DISKROWSET_H_
-#define KUDU_TABLET_DISKROWSET_H_
-
+#include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -39,7 +38,6 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_tracker.h"
@@ -322,12 +320,12 @@ class DiskRowSet : public RowSet {
   ////////////////////////////////////////////////////////////
 
   // Flush all accumulated delta data to disk.
-  Status FlushDeltas() OVERRIDE;
+  Status FlushDeltas() override;
 
   // Perform delta store minor compaction.
   // This compacts the delta files down to a single one.
   // If there is already only a single delta file, this does nothing.
-  Status MinorCompactDeltaStores() OVERRIDE;
+  Status MinorCompactDeltaStores() override;
 
   ////////////////////////////////////////////////////////////
   // RowSet implementation
@@ -345,11 +343,11 @@ class DiskRowSet : public RowSet {
                    const RowChangeList &update,
                    const consensus::OpId& op_id,
                    ProbeStats* stats,
-                   OperationResultPB* result) OVERRIDE;
+                   OperationResultPB* result) override;
 
   Status CheckRowPresent(const RowSetKeyProbe &probe,
                          bool *present,
-                         ProbeStats* stats) const OVERRIDE;
+                         ProbeStats* stats) const override;
 
   ////////////////////
   // Read functions.
@@ -357,64 +355,72 @@ class DiskRowSet : public RowSet {
   virtual Status NewRowIterator(const Schema *projection,
                                 const MvccSnapshot &mvcc_snap,
                                 OrderMode order,
-                                gscoped_ptr<RowwiseIterator>* out) const OVERRIDE;
+                                gscoped_ptr<RowwiseIterator>* out) const override;
 
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
-                                    gscoped_ptr<CompactionInput>* out) const OVERRIDE;
+                                    gscoped_ptr<CompactionInput>* out) const override;
 
   // Count the number of rows in this rowset.
-  Status CountRows(rowid_t *count) const OVERRIDE;
+  Status CountRows(rowid_t *count) const override;
 
   // See RowSet::GetBounds(...)
   virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE;
+                           std::string* max_encoded_key) const override;
 
   void GetDiskRowSetSpaceUsage(DiskRowSetSpace* drss) const;
 
-  uint64_t OnDiskSize() const OVERRIDE;
+  uint64_t OnDiskSize() const override;
 
-  uint64_t OnDiskBaseDataSize() const OVERRIDE;
+  uint64_t OnDiskBaseDataSize() const override;
 
-  uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE;
+  uint64_t OnDiskBaseDataSizeWithRedos() const override;
 
-  size_t DeltaMemStoreSize() const OVERRIDE;
+  size_t DeltaMemStoreSize() const override;
 
-  bool DeltaMemStoreEmpty() const OVERRIDE;
+  bool DeltaMemStoreEmpty() const override;
 
-  int64_t MinUnflushedLogIndex() const OVERRIDE;
+  int64_t MinUnflushedLogIndex() const override;
 
   size_t CountDeltaStores() const;
 
-  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const OVERRIDE;
+  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const override;
 
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
-                                                     int64_t* bytes) OVERRIDE;
+                                                     int64_t* bytes) override;
 
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
                         int64_t* delta_blocks_initialized,
-                        int64_t* bytes_in_ancient_undos) OVERRIDE;
+                        int64_t* bytes_in_ancient_undos) override;
 
   Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
-                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE;
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) override;
 
   // Major compacts all the delta files for all the columns.
   Status MajorCompactDeltaStores(HistoryGcOpts history_gc_opts);
 
-  std::mutex *compact_flush_lock() OVERRIDE {
+  std::mutex *compact_flush_lock() override {
     return &compact_flush_lock_;
   }
 
+  bool has_been_compacted() const override {
+    return has_been_compacted_.load();
+  }
+
+  void set_has_been_compacted() override {
+    has_been_compacted_.store(true);
+  }
+
   DeltaTracker *delta_tracker() {
     return DCHECK_NOTNULL(delta_tracker_.get());
   }
 
-  std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
+  std::shared_ptr<RowSetMetadata> metadata() override {
     return rowset_metadata_;
   }
 
-  std::string ToString() const OVERRIDE {
+  std::string ToString() const override {
     return rowset_metadata_->ToString();
   }
 
@@ -425,7 +431,7 @@ class DiskRowSet : public RowSet {
         ToString());
   }
 
-  virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE;
+  virtual Status DebugDump(std::vector<std::string> *lines) override;
 
  private:
   FRIEND_TEST(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns);
@@ -468,10 +474,12 @@ class DiskRowSet : public RowSet {
   // no other compactor will attempt to include this rowset.
   std::mutex compact_flush_lock_;
 
+  // Flag indicating whether the rowset has been removed from a rowset tree,
+  // and thus should not be scheduled for further compactions.
+  std::atomic<bool> has_been_compacted_;
+
   DISALLOW_COPY_AND_ASSIGN(DiskRowSet);
 };
 
 } // namespace tablet
 } // namespace kudu
-
-#endif // KUDU_TABLET_DISKROWSET_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/memrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 6f83971..6d7aaf3 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -504,7 +504,7 @@ TEST_F(TestMemRowSet, TestInsertionMVCC) {
     snapshots.emplace_back(mvcc_);
   }
   LOG(INFO) << "MemRowSet after inserts:";
-  ASSERT_OK(mrs->DebugDump());
+  ASSERT_OK(mrs->DebugDump(nullptr));
 
   ASSERT_EQ(5, snapshots.size());
   for (int i = 0; i < 5; i++) {
@@ -544,7 +544,7 @@ TEST_F(TestMemRowSet, TestUpdateMVCC) {
   }
 
   LOG(INFO) << "MemRowSet after updates:";
-  ASSERT_OK(mrs->DebugDump());
+  ASSERT_OK(mrs->DebugDump(nullptr));
 
   // Validate that each snapshot returns the expected value
   ASSERT_EQ(6, snapshots.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/memrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 6c8551e..b08228d 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -39,6 +39,7 @@
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/move.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/compaction.h"
 #include "kudu/tablet/mutation.h"
@@ -119,7 +120,8 @@ MemRowSet::MemRowSet(int64_t id,
     tree_(arena_),
     debug_insert_count_(0),
     debug_update_count_(0),
-    anchorer_(log_anchor_registry, Substitute("MemRowSet-$0", id_)) {
+    anchorer_(log_anchor_registry, Substitute("MemRowSet-$0", id_)),
+    has_been_compacted_(false) {
   CHECK(schema.has_column_ids());
   ANNOTATE_BENIGN_RACE(&debug_insert_count_, "insert count isnt accurate");
   ANNOTATE_BENIGN_RACE(&debug_update_count_, "update count isnt accurate");

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index bcbb031..5d83a91 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -14,9 +14,9 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TABLET_MEMROWSET_H
-#define KUDU_TABLET_MEMROWSET_H
+#pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <cstring>
 #include <memory>
@@ -38,7 +38,6 @@
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/tablet/concurrent_btree.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
@@ -238,7 +237,7 @@ class MemRowSet : public RowSet,
                            const RowChangeList &delta,
                            const consensus::OpId& op_id,
                            ProbeStats* stats,
-                           OperationResultPB *result) OVERRIDE;
+                           OperationResultPB *result) override;
 
   // Return the number of entries in the memrowset.
   // NOTE: this requires iterating all data, and is thus
@@ -248,32 +247,40 @@ class MemRowSet : public RowSet,
   }
 
   // Conform entry_count to RowSet
-  Status CountRows(rowid_t *count) const OVERRIDE {
+  Status CountRows(rowid_t *count) const override {
     *count = entry_count();
     return Status::OK();
   }
 
   virtual Status GetBounds(std::string *min_encoded_key,
-                           std::string *max_encoded_key) const OVERRIDE;
+                           std::string *max_encoded_key) const override;
 
-  uint64_t OnDiskSize() const OVERRIDE {
+  uint64_t OnDiskSize() const override {
     return 0;
   }
 
-  uint64_t OnDiskBaseDataSize() const OVERRIDE {
+  uint64_t OnDiskBaseDataSize() const override {
     return 0;
   }
 
-  uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE {
+  uint64_t OnDiskBaseDataSizeWithRedos() const override {
     return 0;
   }
 
-  std::mutex *compact_flush_lock() OVERRIDE {
+  std::mutex *compact_flush_lock() override {
     return &compact_flush_lock_;
   }
 
+  bool has_been_compacted() const override {
+    return has_been_compacted_.load();
+  }
+
+  void set_has_been_compacted() override {
+    has_been_compacted_.store(true);
+  }
+
   // MemRowSets are never available for compaction, currently.
-  virtual bool IsAvailableForCompaction() OVERRIDE {
+  virtual bool IsAvailableForCompaction() override {
     return false;
   }
 
@@ -282,9 +289,9 @@ class MemRowSet : public RowSet,
     return tree_.empty();
   }
 
-  // TODO: unit test me
+  // TODO(todd): unit test me
   Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                         ProbeStats* stats) const OVERRIDE;
+                         ProbeStats* stats) const override;
 
   // Return the memory footprint of this memrowset.
   // Note that this may be larger than the sum of the data
@@ -300,7 +307,7 @@ class MemRowSet : public RowSet,
   // referring to this MemRowSet. Otherwise, this will throw
   // a C++ exception and all bets are off.
   //
-  // TODO: clarify the consistency of this iterator in the method doc
+  // TODO(todd): clarify the consistency of this iterator in the method doc
   Iterator *NewIterator() const;
   Iterator *NewIterator(const Schema *projection,
                         const MvccSnapshot &snap) const;
@@ -309,12 +316,12 @@ class MemRowSet : public RowSet,
   virtual Status NewRowIterator(const Schema* projection,
                                 const MvccSnapshot& snap,
                                 OrderMode order,
-                                gscoped_ptr<RowwiseIterator>* out) const OVERRIDE;
+                                gscoped_ptr<RowwiseIterator>* out) const override;
 
   // Create compaction input.
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot& snap,
-                                    gscoped_ptr<CompactionInput>* out) const OVERRIDE;
+                                    gscoped_ptr<CompactionInput>* out) const override;
 
   // Return the Schema for the rows in this memrowset.
    const Schema &schema() const {
@@ -330,7 +337,7 @@ class MemRowSet : public RowSet,
     return id_;
   }
 
-  std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
+  std::shared_ptr<RowSetMetadata> metadata() override {
     return std::shared_ptr<RowSetMetadata>(
         reinterpret_cast<RowSetMetadata *>(NULL));
   }
@@ -339,9 +346,9 @@ class MemRowSet : public RowSet,
   // If 'lines' is NULL, dumps to LOG(INFO).
   //
   // This dumps every row, so should only be used in tests, etc.
-  virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE;
+  virtual Status DebugDump(std::vector<std::string> *lines) override;
 
-  std::string ToString() const OVERRIDE {
+  std::string ToString() const override {
     return "memrowset";
   }
 
@@ -357,20 +364,21 @@ class MemRowSet : public RowSet,
     return debug_update_count_;
   }
 
-  size_t DeltaMemStoreSize() const OVERRIDE { return 0; }
+  size_t DeltaMemStoreSize() const override { return 0; }
 
-  bool DeltaMemStoreEmpty() const OVERRIDE { return true; }
+  bool DeltaMemStoreEmpty() const override { return true; }
 
-  int64_t MinUnflushedLogIndex() const OVERRIDE {
+  int64_t MinUnflushedLogIndex() const override {
     return anchorer_.minimum_log_index();
   }
 
-  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const OVERRIDE {
+  double DeltaStoresCompactionPerfImprovementScore(
+      DeltaCompactionType /*type*/) const override {
     return 0;
   }
 
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                                     int64_t* bytes) OVERRIDE {
+                                                     int64_t* bytes) override {
     DCHECK(bytes);
     *bytes = 0;
     return Status::OK();
@@ -379,22 +387,22 @@ class MemRowSet : public RowSet,
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
                         int64_t* delta_blocks_initialized,
-                        int64_t* bytes_in_ancient_undos) OVERRIDE {
+                        int64_t* bytes_in_ancient_undos) override {
     if (delta_blocks_initialized) *delta_blocks_initialized = 0;
     if (bytes_in_ancient_undos) *bytes_in_ancient_undos = 0;
     return Status::OK();
   }
 
   Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE {
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) override {
     if (blocks_deleted) *blocks_deleted = 0;
     if (bytes_deleted) *bytes_deleted = 0;
     return Status::OK();
   }
 
-  Status FlushDeltas() OVERRIDE { return Status::OK(); }
+  Status FlushDeltas() override { return Status::OK(); }
 
-  Status MinorCompactDeltaStores() OVERRIDE { return Status::OK(); }
+  Status MinorCompactDeltaStores() override { return Status::OK(); }
 
  private:
   friend class Iterator;
@@ -432,6 +440,10 @@ class MemRowSet : public RowSet,
 
   log::MinLogIndexAnchorer anchorer_;
 
+  // Flag indicating whether the rowset has been removed from a rowset tree,
+  // and thus should not be scheduled for further compactions.
+  std::atomic<bool> has_been_compacted_;
+
   DISALLOW_COPY_AND_ASSIGN(MemRowSet);
 };
 
@@ -451,11 +463,11 @@ class MemRowSet::Iterator : public RowwiseIterator {
 
   virtual ~Iterator();
 
-  virtual Status Init(ScanSpec *spec) OVERRIDE;
+  virtual Status Init(ScanSpec *spec) override;
 
   Status SeekAtOrAfter(const Slice &key, bool *exact);
 
-  virtual Status NextBlock(RowBlock *dst) OVERRIDE;
+  virtual Status NextBlock(RowBlock *dst) override;
 
   bool has_upper_bound() const {
     return exclusive_upper_bound_.is_initialized();
@@ -472,7 +484,7 @@ class MemRowSet::Iterator : public RowwiseIterator {
     return iter_->remaining_in_leaf();
   }
 
-  virtual bool HasNext() const OVERRIDE {
+  virtual bool HasNext() const override {
     DCHECK_NE(state_, kUninitialized) << "not initted";
     return state_ != kFinished && iter_->IsValid();
   }
@@ -498,15 +510,15 @@ class MemRowSet::Iterator : public RowwiseIterator {
     return iter_->Next();
   }
 
-  std::string ToString() const OVERRIDE {
+  std::string ToString() const override {
     return "memrowset iterator";
   }
 
-  const Schema& schema() const OVERRIDE {
+  const Schema& schema() const override {
     return *projection_;
   }
 
-  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
+  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const override {
     // Currently we do not expose any non-disk related statistics in
     // IteratorStats.  However, callers of GetIteratorStats expected
     // an IteratorStats object for every column; vector::resize() is
@@ -575,5 +587,3 @@ inline const Schema* MRSRow::schema() const {
 
 } // namespace tablet
 } // namespace kudu
-
-#endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/mock-rowsets.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index f41d1fc..f488302 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -92,6 +92,13 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return NULL;
   }
+  virtual bool has_been_compacted() const OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+    return false;
+  }
+  virtual void set_has_been_compacted() OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+  }
   virtual std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
     return NULL;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/rowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 45e55e7..4cc4252 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -228,9 +228,15 @@ class RowSet {
     // makes compaction selection at a time on a given Tablet due to
     // Tablet::compact_select_lock_.
     std::unique_lock<std::mutex> try_lock(*compact_flush_lock(), std::try_to_lock);
-    return try_lock.owns_lock();
+    return try_lock.owns_lock() && !has_been_compacted();
   }
 
+  // Checked while validating that a rowset is available for compaction.
+  virtual bool has_been_compacted() const = 0;
+
+  // Set after a compaction has completed to indicate that the rowset has been
+  // removed from the rowset tree and is thus longer available for compaction.
+  virtual void set_has_been_compacted() = 0;
 };
 
 // Used often enough, may as well typedef it.
@@ -370,6 +376,14 @@ class DuplicatingRowSet : public RowSet {
     return false;
   }
 
+  virtual bool has_been_compacted() const OVERRIDE {
+    return false;
+  }
+
+  virtual void set_has_been_compacted() OVERRIDE {
+    LOG(FATAL) << "Cannot be compacted";
+  }
+
   ~DuplicatingRowSet();
 
   size_t DeltaMemStoreSize() const OVERRIDE { return 0; }

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2adee00/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 5e0c805..287d64c 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1323,19 +1323,16 @@ Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
     picked->AddRowSet(rs, std::move(lock));
   }
 
-  // When we iterated through the current rowsets, we should have found all of the
-  // rowsets that we picked. If we didn't, that implies that some other thread swapped
-  // them out while we were making our selection decision -- that's not possible
-  // since we only picked rowsets that were marked as available for compaction.
+  // When we iterated through the current rowsets, we should have found all of
+  // the rowsets that we picked. If we didn't, that implies that some other
+  // thread swapped them out while we were making our selection decision --
+  // that's not possible since we only picked rowsets that were marked as
+  // available for compaction.
   if (!picked_set.empty()) {
     for (const RowSet* not_found : picked_set) {
       LOG_WITH_PREFIX(ERROR) << "Rowset selected for compaction but not available anymore: "
                              << not_found->ToString();
     }
-    // TODO(todd): this should never happen, but KUDU-1959 is a bug which causes us to
-    // sometimes concurrently decide to compact the same rowsets. It should be harmless
-    // to simply abort the compaction when we hit this bug, though long term we should
-    // fix the underlying race.
     const char* msg = "Was unable to find all rowsets selected for compaction";
     LOG_WITH_PREFIX(DFATAL) << msg;
     return Status::RuntimeError(msg);
@@ -1623,6 +1620,12 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed),
                         "Failed to flush new tablet metadata");
 
+  // Now that we've completed the operation, mark any rowsets that have been
+  // compacted, preventing them from being considered for future compactions.
+  for (const auto& rs : input.rowsets()) {
+    rs->set_has_been_compacted();
+  }
+
   // Replace the compacted rowsets with the new on-disk rowsets, making them visible now that
   // their metadata was written to disk.
   AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);