You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/05 17:07:22 UTC

[1/4] kudu git commit: [python] - Expand gitignore for python

Repository: kudu
Updated Branches:
  refs/heads/master d87486c47 -> 07d190c3e


[python] - Expand gitignore for python

Adding additional exclusion rules for python to .gitignore.

Change-Id: I4b7bcac00a88678a555a4d85419c22decac23b58
Reviewed-on: http://gerrit.cloudera.org:8080/4616
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2a1a6c07
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2a1a6c07
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2a1a6c07

Branch: refs/heads/master
Commit: 2a1a6c073bfb3dfb0d310d07f11aab10641665b6
Parents: d87486c
Author: Jordan Birdsell <jo...@gmail.com>
Authored: Tue Oct 4 07:03:37 2016 -0400
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 04:41:04 2016 +0000

----------------------------------------------------------------------
 python/.gitignore | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2a1a6c07/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index 2e78d86..4615a32 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -19,11 +19,14 @@
 *flymake*
 
 # Compiled source and in-place build files
+__pycache__
+*$py.class
 *.py[ocd]
 *.so
 .build_cache_dir
 .cache
 .eggs
+*.egg
 MANIFEST
 
 # Generated sources
@@ -42,5 +45,17 @@ dist
 .coverage
 coverage.xml
 
+# other distribution/packaging directories
+env/
+develop-eggs/
+downloads/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+.installed.cfg
+
 # automatically generated during local development
 kudu/version.py


[4/4] kudu git commit: loadgen: allow configuring the number of buckets

Posted by to...@apache.org.
loadgen: allow configuring the number of buckets

Change-Id: Idf0ae3e95b90d38829dba67a9ccc2dfbe2b5b09b
Reviewed-on: http://gerrit.cloudera.org:8080/4625
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/07d190c3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/07d190c3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/07d190c3

Branch: refs/heads/master
Commit: 07d190c3e47661e762c56c8405370e37b9e989ac
Parents: b70d362
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Oct 4 17:16:41 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 17:06:13 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/tool_action_test.cc | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/07d190c3/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index f528f38..29b8950 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -193,6 +193,9 @@ DEFINE_string(table_name, "",
               "an already existing table, it's highly recommended to use a "
               "dedicated table created just for testing purposes: "
               "the existing table nor its data is never dropped/deleted.");
+DEFINE_int32(num_buckets, 8,
+             "The number of buckets to create when this tool creates a new table.");
+
 DEFINE_bool(use_random, false,
             "Whether to use random numbers instead of sequential ones. "
             "In case of using random numbers collisions are possible over "
@@ -495,7 +498,7 @@ Status TestLoadGenerator(const RunnerContext& context) {
     RETURN_NOT_OK(table_creator->table_name(table_name)
                   .schema(&schema)
                   .num_replicas(1)
-                  .add_hash_partitions(vector<string>({ kKeyColumnName }), 8)
+                  .add_hash_partitions(vector<string>({ kKeyColumnName }), FLAGS_num_buckets)
                   .wait(true)
                   .Create());
   }
@@ -570,6 +573,7 @@ unique_ptr<Mode> BuildTestMode() {
       .AddOptionalParameter("string_fixed")
       .AddOptionalParameter("string_len")
       .AddOptionalParameter("table_name")
+      .AddOptionalParameter("num_buckets")
       .AddOptionalParameter("use_random")
       .Build();
 


[3/4] kudu git commit: [c++ client] notes for timestamp-related methods

Posted by to...@apache.org.
[c++ client] notes for timestamp-related methods

Added notes on anticipated changes to the methods for getting
and setting hybrid timestamps when performing READ_AT_SNAPSHOT
scan operations:
  * KuduClient::GetLatestObservedTimestamp()
  * KuduClient::SetLatestObservedTimestamp()
  * KuduScanner::SetSnapshotRaw()

Some changes are anticipated there in the context of KUDU-611.
In short, the timestamp might be replaced with some opaque type
(e.g., a sequence of bytes).

Also added a code sample showing how to use the value returned by
KuduClient::GetLatestObservedTimestamp() to set an appropriate snapshot
timestamp via KuduScanner::SetSnapshotRaw().

This addresses the following JIRA issues:
  KUDU-1661 Mark KuduClient::GetLatestObservedTimestamp()
      as experimental
  KUDU-1663 Clean-up in-code documentation for KuduScanner::ReadMode
      and KuduClient::GetLatestObservedTimestamp

Change-Id: I6c45b797fa459ac9d214bb2612fd797f7a1eea45
Reviewed-on: http://gerrit.cloudera.org:8080/4569
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b70d3625
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b70d3625
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b70d3625

Branch: refs/heads/master
Commit: b70d3625b88ef4ad5d15ef68865e933a73c9069c
Parents: 2528edf
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Sep 29 17:37:39 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Wed Oct 5 07:06:11 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client.h | 44 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b70d3625/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 74b84a5..1a113e8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -387,6 +387,38 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   /// table which is guaranteed to contain all data written or previously read
   /// by this client. See KuduScanner for more details on timestamps.
   ///
+  /// How to get Read-Your-Writes consistency:
+  /// the code snippet below uses KuduClient::GetLatestObservedTimestamp() along
+  /// with KuduScanner::SetSnapshotRaw() to perform a READ_AT_SNAPSHOT scan
+  /// containing the data which has just been written.  Notice the extra 1
+  /// added to the timestamp passed to KuduScanner::SetSnapshotRaw():
+  /// @code
+  ///   shared_ptr<KuduClient> client;
+  ///   ... // open/initialize the client
+  ///   shared_ptr<KuduSession> session(client->NewSession());
+  ///   ... // set Kudu session properties
+  ///   shared_ptr<KuduTable> table;
+  ///   ... // open the table
+  ///   unique_ptr<KuduInsert> insert_op(table->NewInsert());
+  ///   ... // populate new insert operation with data
+  ///   RETURN_NOT_OK(session->Apply(insert_op.release()));
+  ///   RETURN_NOT_OK(session->Flush());
+  ///   uint64_t snapshot_timestamp = client->GetLatestObservedTimestamp() + 1;
+  ///   KuduScanner scanner(table.get());
+  ///   RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+  ///   RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+  ///   RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+  ///   RETURN_NOT_OK(scanner.Open());
+  ///   ... // retrieve scanned rows
+  /// @endcode
+  /// There are currently races in which, on rare occasions, Read-Your-Writes
+  /// consistency might not hold even in this case. These are being
+  /// taken care of as part of
+  /// <a href="https://issues.apache.org/jira/browse/KUDU-430">KUDU-430</a>.
+  ///
+  /// @note This method is experimental and will either disappear or
+  ///   change in a future release.
+  ///
   /// @return Highest HybridTime timestamp observed by the client.
   uint64_t GetLatestObservedTimestamp() const;
 
@@ -399,6 +431,9 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   /// The HybridTime encoded timestamp should be obtained from another client's
   /// KuduClient::GetLatestObservedTimestamp() method.
   ///
+  /// @note This method is experimental and will either disappear or
+  ///   change in a future release.
+  ///
   /// @param [in] ht_timestamp
   ///   Timestamp encoded in HybridTime format.
   void SetLatestObservedTimestamp(uint64_t ht_timestamp);
@@ -1517,7 +1552,8 @@ class KUDU_EXPORT KuduScanner {
     /// timestamp will yield the same data. This is performed at the expense
     /// of waiting for in-flight transactions whose timestamp is lower than
     /// the snapshot's timestamp to complete, so it might incur
-    /// a latency penalty.
+    /// a latency penalty. See KuduScanner::SetSnapshotMicros() and
+    /// KuduScanner::SetSnapshotRaw() for details.
     ///
     /// In ACID terms this, by itself, corresponds to Isolation mode "Repeatable
     /// Read". If all writes to the scanned tablet are made externally
@@ -1806,6 +1842,12 @@ class KUDU_EXPORT KuduScanner {
 
   /// Set snapshot timestamp for scans in @c READ_AT_SNAPSHOT mode (raw).
   ///
+  /// See KuduClient::GetLatestObservedTimestamp() for details on how to
+  /// use this method to achieve Read-Your-Writes behavior.
+  ///
+  /// @note This method is experimental and will either disappear or
+  ///   change in a future release.
+  ///
   /// @param [in] snapshot_timestamp
   ///   Timestamp to set in raw encoded form
   ///   (i.e. as returned by a previous call to a server).


[2/4] kudu git commit: Separate tablet MM ops into separate .cc file

Posted by to...@apache.org.
Separate tablet MM ops into separate .cc file

tablet.cc is a monster; there is no need for the maintenance op classes to live in there.
Plus, these classes already have their own header file.

Change-Id: Ie40475fdb9f5da6a7300661d5149509da990939d
Reviewed-on: http://gerrit.cloudera.org:8080/4362
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2528edf2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2528edf2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2528edf2

Branch: refs/heads/master
Commit: 2528edf28489569f4d4625a8eb1e92fb4df37753
Parents: 2a1a6c0
Author: Mike Percy <mp...@apache.org>
Authored: Tue Sep 6 21:14:30 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 04:41:41 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/CMakeLists.txt   |   1 +
 src/kudu/tablet/tablet.cc        | 213 ------------------------------
 src/kudu/tablet/tablet_mm_ops.cc | 241 ++++++++++++++++++++++++++++++++++
 src/kudu/tablet/tablet_mm_ops.h  |   3 +
 4 files changed, 245 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index 8fda82c..9c8bc13 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -19,6 +19,7 @@ set(TABLET_SRCS
   tablet.cc
   tablet_bootstrap.cc
   tablet_metrics.cc
+  tablet_mm_ops.cc
   tablet_peer_mm_ops.cc
   tablet_peer.cc
   transactions/transaction.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 8b7e0e4..a0c3f3a 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -937,219 +937,6 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
   return throttler_->Take(MonoTime::Now(), 1, bytes);
 }
 
-////////////////////////////////////////////////////////////
-// CompactRowSetsOp
-////////////////////////////////////////////////////////////
-
-CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_rs_compacted_(0),
-    tablet_(tablet) {
-}
-
-void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the on-disk row layout invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-    }
-  }
-
-  tablet_->UpdateCompactionStats(&prev_stats_);
-  *stats = prev_stats_;
-}
-
-bool CompactRowSetsOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another section of the tablet can
-  // be compacted concurrently.
-  //
-  // TODO: we should acquire the rowset compaction locks here. Otherwise, until
-  // Compact() acquires them, the maintenance manager may compute the same
-  // stats for this op and run it again, even though Perform() will end up
-  // performing a much less fruitful compaction. See KUDU-790 for more details.
-  prev_stats_.Clear();
-  return true;
-}
-
-void CompactRowSetsOp::Perform() {
-  WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
-              Substitute("Compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
-  return tablet_->metrics()->compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
-  return tablet_->metrics()->compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MinorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_dms_flushed_(0),
-    last_num_rs_compacted_(0),
-    last_num_rs_minor_delta_compacted_(0),
-    tablet_(tablet) {
-}
-
-void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the number of REDO files invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
-    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    uint64_t new_num_rs_minor_delta_compacted =
-        metrics->delta_minor_compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_dms_flushed == last_num_dms_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_ &&
-        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
-    }
-  }
-
-  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
-      RowSet::MINOR_DELTA_COMPACTION, nullptr);
-  prev_stats_.set_perf_improvement(perf_improv);
-  prev_stats_.set_runnable(perf_improv > 0);
-  *stats = prev_stats_;
-}
-
-bool MinorDeltaCompactionOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another rowset in the tablet can
-  // be delta compacted concurrently.
-  //
-  // TODO: See CompactRowSetsOp::Prepare().
-  prev_stats_.Clear();
-  return true;
-}
-
-void MinorDeltaCompactionOp::Perform() {
-  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
-              Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
-  return tablet_->metrics()->delta_minor_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
-  return tablet_->metrics()->delta_minor_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MajorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_dms_flushed_(0),
-    last_num_rs_compacted_(0),
-    last_num_rs_minor_delta_compacted_(0),
-    last_num_rs_major_delta_compacted_(0),
-    tablet_(tablet) {
-}
-
-void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the size of the on-disk data invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
-    int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    int64_t new_num_rs_minor_delta_compacted =
-        metrics->delta_minor_compact_rs_duration->TotalCount();
-    int64_t new_num_rs_major_delta_compacted =
-        metrics->delta_major_compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_dms_flushed == last_num_dms_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_ &&
-        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
-        new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
-      last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
-    }
-  }
-
-  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
-      RowSet::MAJOR_DELTA_COMPACTION, nullptr);
-  prev_stats_.set_perf_improvement(perf_improv);
-  prev_stats_.set_runnable(perf_improv > 0);
-  *stats = prev_stats_;
-}
-
-bool MajorDeltaCompactionOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another rowset in the tablet can
-  // be delta compacted concurrently.
-  //
-  // TODO: See CompactRowSetsOp::Prepare().
-  prev_stats_.Clear();
-  return true;
-}
-
-void MajorDeltaCompactionOp::Perform() {
-  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
-              Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
-  return tablet_->metrics()->delta_major_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
-  return tablet_->metrics()->delta_major_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// Tablet
-////////////////////////////////////////////////////////////
-
 Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
                                     CompactFlags flags) const {
   CHECK_EQ(state_, kOpen);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
new file mode 100644
index 0000000..345e898
--- /dev/null
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_mm_ops.h"
+
+#include <mutex>
+
+#include "kudu/util/locks.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
+
+using strings::Substitute;
+
+namespace kudu {
+namespace tablet {
+
+////////////////////////////////////////////////////////////
+// CompactRowSetsOp
+////////////////////////////////////////////////////////////
+
+CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_rs_compacted_(0),
+    tablet_(tablet) {
+}
+
+void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the on-disk row layout invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+    }
+  }
+
+  tablet_->UpdateCompactionStats(&prev_stats_);
+  *stats = prev_stats_;
+}
+
+bool CompactRowSetsOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another section of the tablet can
+  // be compacted concurrently.
+  //
+  // TODO: we should acquire the rowset compaction locks here. Otherwise, until
+  // Compact() acquires them, the maintenance manager may compute the same
+  // stats for this op and run it again, even though Perform() will end up
+  // performing a much less fruitful compaction. See KUDU-790 for more details.
+  prev_stats_.Clear();
+  return true;
+}
+
+void CompactRowSetsOp::Perform() {
+  WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
+              Substitute("Compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
+  return tablet_->metrics()->compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
+  return tablet_->metrics()->compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MinorDeltaCompactionOp
+////////////////////////////////////////////////////////////
+
+MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_dms_flushed_(0),
+    last_num_rs_compacted_(0),
+    last_num_rs_minor_delta_compacted_(0),
+    tablet_(tablet) {
+}
+
+void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the number of REDO files invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    uint64_t new_num_rs_minor_delta_compacted =
+        metrics->delta_minor_compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_dms_flushed == last_num_dms_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_ &&
+        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_dms_flushed_ = new_num_dms_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+    }
+  }
+
+  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+      RowSet::MINOR_DELTA_COMPACTION, nullptr);
+  prev_stats_.set_perf_improvement(perf_improv);
+  prev_stats_.set_runnable(perf_improv > 0);
+  *stats = prev_stats_;
+}
+
+bool MinorDeltaCompactionOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another rowset in the tablet can
+  // be delta compacted concurrently.
+  //
+  // TODO: See CompactRowSetsOp::Prepare().
+  prev_stats_.Clear();
+  return true;
+}
+
+void MinorDeltaCompactionOp::Perform() {
+  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
+              Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
+  return tablet_->metrics()->delta_minor_compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
+  return tablet_->metrics()->delta_minor_compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MajorDeltaCompactionOp
+////////////////////////////////////////////////////////////
+
+MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_dms_flushed_(0),
+    last_num_rs_compacted_(0),
+    last_num_rs_minor_delta_compacted_(0),
+    last_num_rs_major_delta_compacted_(0),
+    tablet_(tablet) {
+}
+
+void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the size of the on-disk data invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+    int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    int64_t new_num_rs_minor_delta_compacted =
+        metrics->delta_minor_compact_rs_duration->TotalCount();
+    int64_t new_num_rs_major_delta_compacted =
+        metrics->delta_major_compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_dms_flushed == last_num_dms_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_ &&
+        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
+        new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_dms_flushed_ = new_num_dms_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+      last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
+    }
+  }
+
+  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+      RowSet::MAJOR_DELTA_COMPACTION, nullptr);
+  prev_stats_.set_perf_improvement(perf_improv);
+  prev_stats_.set_runnable(perf_improv > 0);
+  *stats = prev_stats_;
+}
+
+bool MajorDeltaCompactionOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another rowset in the tablet can
+  // be delta compacted concurrently.
+  //
+  // TODO: See CompactRowSetsOp::Prepare().
+  prev_stats_.Clear();
+  return true;
+}
+
+void MajorDeltaCompactionOp::Perform() {
+  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
+              Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
+  return tablet_->metrics()->delta_major_compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
+  return tablet_->metrics()->delta_major_compact_rs_running;
+}
+
+} // namespace tablet
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index f4ee1c3..1ea8f95 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_TABLET_TABLET_MM_OPS_H_
 #define KUDU_TABLET_TABLET_MM_OPS_H_
 
+#include "kudu/util/locks.h"
 #include "kudu/util/maintenance_manager.h"
 
 namespace kudu {
@@ -28,6 +29,8 @@ class AtomicGauge;
 
 namespace tablet {
 
+class Tablet;
+
 // MaintenanceOp for rowset compaction.
 //
 // This periodically invokes the tablet's CompactionPolicy to select a compaction.  The