You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/05 17:07:22 UTC
[1/4] kudu git commit: [python] - Expand gitignore for python
Repository: kudu
Updated Branches:
refs/heads/master d87486c47 -> 07d190c3e
[python] - Expand gitignore for python
Add more Python-specific exclusion rules to python/.gitignore.
Change-Id: I4b7bcac00a88678a555a4d85419c22decac23b58
Reviewed-on: http://gerrit.cloudera.org:8080/4616
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2a1a6c07
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2a1a6c07
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2a1a6c07
Branch: refs/heads/master
Commit: 2a1a6c073bfb3dfb0d310d07f11aab10641665b6
Parents: d87486c
Author: Jordan Birdsell <jo...@gmail.com>
Authored: Tue Oct 4 07:03:37 2016 -0400
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 04:41:04 2016 +0000
----------------------------------------------------------------------
python/.gitignore | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/2a1a6c07/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index 2e78d86..4615a32 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -19,11 +19,14 @@
*flymake*
# Compiled source and in-place build files
+__pycache__
+*$py.class
*.py[ocd]
*.so
.build_cache_dir
.cache
.eggs
+*.egg
MANIFEST
# Generated sources
@@ -42,5 +45,17 @@ dist
.coverage
coverage.xml
+# other distribution/packaging directories
+env/
+develop-eggs/
+downloads/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+.installed.cfg
+
# automatically generated during local development
kudu/version.py
[4/4] kudu git commit: loadgen: allow configuring the number of
buckets
Posted by to...@apache.org.
loadgen: allow configuring the number of buckets
Change-Id: Idf0ae3e95b90d38829dba67a9ccc2dfbe2b5b09b
Reviewed-on: http://gerrit.cloudera.org:8080/4625
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/07d190c3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/07d190c3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/07d190c3
Branch: refs/heads/master
Commit: 07d190c3e47661e762c56c8405370e37b9e989ac
Parents: b70d362
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Oct 4 17:16:41 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 17:06:13 2016 +0000
----------------------------------------------------------------------
src/kudu/tools/tool_action_test.cc | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/07d190c3/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index f528f38..29b8950 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -193,6 +193,9 @@ DEFINE_string(table_name, "",
"an already existing table, it's highly recommended to use a "
"dedicated table created just for testing purposes: "
"the existing table nor its data is never dropped/deleted.");
+DEFINE_int32(num_buckets, 8,
+ "The number of buckets to create when this tool creates a new table.");
+
DEFINE_bool(use_random, false,
"Whether to use random numbers instead of sequential ones. "
"In case of using random numbers collisions are possible over "
@@ -495,7 +498,7 @@ Status TestLoadGenerator(const RunnerContext& context) {
RETURN_NOT_OK(table_creator->table_name(table_name)
.schema(&schema)
.num_replicas(1)
- .add_hash_partitions(vector<string>({ kKeyColumnName }), 8)
+ .add_hash_partitions(vector<string>({ kKeyColumnName }), FLAGS_num_buckets)
.wait(true)
.Create());
}
@@ -570,6 +573,7 @@ unique_ptr<Mode> BuildTestMode() {
.AddOptionalParameter("string_fixed")
.AddOptionalParameter("string_len")
.AddOptionalParameter("table_name")
+ .AddOptionalParameter("num_buckets")
.AddOptionalParameter("use_random")
.Build();
[3/4] kudu git commit: [c++ client] notes for timestamp-related
methods
Posted by to...@apache.org.
[c++ client] notes for timestamp-related methods
Added notes on anticipated changes to the methods related to getting
and setting hybrid timestamps when performing READ_AT_SNAPSHOT
scan operations:
* KuduClient::GetLatestObservedTimestamp()
* KuduClient::SetLatestObservedTimestamp()
* KuduScanner::SetSnapshotRaw()
Changes to these methods are anticipated as part of KUDU-611.
In short, the timestamp might be replaced with an opaque type
(e.g., a sequence of bytes).
Also added a code sample showing how to use the value returned
by KuduClient::GetLatestObservedTimestamp() to set an appropriate snapshot
timestamp via KuduScanner::SetSnapshotRaw().
This addresses the following JIRA issues:
KUDU-1661 Mark KuduClient::GetLatestObservedTimestamp()
as experimental
KUDU-1663 Clean-up in-code documentation for KuduScanner::ReadMode
and KuduClient::GetLatestObservedTimestamp
Change-Id: I6c45b797fa459ac9d214bb2612fd797f7a1eea45
Reviewed-on: http://gerrit.cloudera.org:8080/4569
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b70d3625
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b70d3625
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b70d3625
Branch: refs/heads/master
Commit: b70d3625b88ef4ad5d15ef68865e933a73c9069c
Parents: 2528edf
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Sep 29 17:37:39 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Wed Oct 5 07:06:11 2016 +0000
----------------------------------------------------------------------
src/kudu/client/client.h | 44 ++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/b70d3625/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 74b84a5..1a113e8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -387,6 +387,38 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
/// table which is guaranteed to contain all data written or previously read
/// by this client. See KuduScanner for more details on timestamps.
///
+ /// How to get Read-Your-Writes consistency:
+ /// the code snippet below uses KuduClient::GetLatestObservedTimestamp() along
+ /// with KuduScanner::SetSnapshotRaw() to perform a READ_AT_SNAPSHOT scan
+ /// that includes the data which has just been written. Note the extra 1
+ /// added to the timestamp passed to KuduScanner::SetSnapshotRaw():
+ /// @code
+ /// shared_ptr<KuduClient> client;
+ /// ... // open/initialize the client
+ /// shared_ptr<KuduSession> session(client->NewSession());
+ /// ... // set Kudu session properties
+ /// shared_ptr<KuduTable> table;
+ /// ... // open the table
+ /// unique_ptr<KuduInsert> insert_op(table->NewInsert());
+ /// ... // populate new insert operation with data
+ /// RETURN_NOT_OK(session->Apply(insert_op.release()));
+ /// RETURN_NOT_OK(session->Flush());
+ /// uint64_t snapshot_timestamp = client->GetLatestObservedTimestamp() + 1;
+ /// KuduScanner scanner(table.get());
+ /// RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+ /// RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+ /// RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+ /// RETURN_NOT_OK(scanner.Open());
+ /// ... // retrieve scanned rows
+ /// @endcode
+ /// There are currently races in which, on rare occasions, Read-Your-Writes
+ /// consistency might not hold even in this case. These are being
+ /// addressed as part of
+ /// <a href="https://issues.apache.org/jira/browse/KUDU-430">KUDU-430</a>.
+ ///
+ /// @note This method is experimental and will either disappear or
+ /// change in a future release.
+ ///
/// @return Highest HybridTime timestamp observed by the client.
uint64_t GetLatestObservedTimestamp() const;
@@ -399,6 +431,9 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
/// The HybridTime encoded timestamp should be obtained from another client's
/// KuduClient::GetLatestObservedTimestamp() method.
///
+ /// @note This method is experimental and will either disappear or
+ /// change in a future release.
+ ///
/// @param [in] ht_timestamp
/// Timestamp encoded in HybridTime format.
void SetLatestObservedTimestamp(uint64_t ht_timestamp);
@@ -1517,7 +1552,8 @@ class KUDU_EXPORT KuduScanner {
/// timestamp will yield the same data. This is performed at the expense
/// of waiting for in-flight transactions whose timestamp is lower than
/// the snapshot's timestamp to complete, so it might incur
- /// a latency penalty.
+ /// a latency penalty. See KuduScanner::SetSnapshotMicros() and
+ /// KuduScanner::SetSnapshotRaw() for details.
///
/// In ACID terms this, by itself, corresponds to Isolation mode "Repeatable
/// Read". If all writes to the scanned tablet are made externally
@@ -1806,6 +1842,12 @@ class KUDU_EXPORT KuduScanner {
/// Set snapshot timestamp for scans in @c READ_AT_SNAPSHOT mode (raw).
///
+ /// See KuduClient::GetLatestObservedTimestamp() for details on how to
+ /// use this method to achieve Read-Your-Writes behavior.
+ ///
+ /// @note This method is experimental and will either disappear or
+ /// change in a future release.
+ ///
/// @param [in] snapshot_timestamp
/// Timestamp to set in raw encoded form
/// (i.e. as returned by a previous call to a server).
[2/4] kudu git commit: Separate tablet MM ops into separate .cc file
Posted by to...@apache.org.
Separate tablet MM ops into separate .cc file
tablet.cc is a monster; there's no need for this code to live there.
Plus, these classes already have their own header file.
Change-Id: Ie40475fdb9f5da6a7300661d5149509da990939d
Reviewed-on: http://gerrit.cloudera.org:8080/4362
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2528edf2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2528edf2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2528edf2
Branch: refs/heads/master
Commit: 2528edf28489569f4d4625a8eb1e92fb4df37753
Parents: 2a1a6c0
Author: Mike Percy <mp...@apache.org>
Authored: Tue Sep 6 21:14:30 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 5 04:41:41 2016 +0000
----------------------------------------------------------------------
src/kudu/tablet/CMakeLists.txt | 1 +
src/kudu/tablet/tablet.cc | 213 ------------------------------
src/kudu/tablet/tablet_mm_ops.cc | 241 ++++++++++++++++++++++++++++++++++
src/kudu/tablet/tablet_mm_ops.h | 3 +
4 files changed, 245 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index 8fda82c..9c8bc13 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -19,6 +19,7 @@ set(TABLET_SRCS
tablet.cc
tablet_bootstrap.cc
tablet_metrics.cc
+ tablet_mm_ops.cc
tablet_peer_mm_ops.cc
tablet_peer.cc
transactions/transaction.cc
http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 8b7e0e4..a0c3f3a 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -937,219 +937,6 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
return throttler_->Take(MonoTime::Now(), 1, bytes);
}
-////////////////////////////////////////////////////////////
-// CompactRowSetsOp
-////////////////////////////////////////////////////////////
-
-CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
- : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
- MaintenanceOp::HIGH_IO_USAGE),
- last_num_mrs_flushed_(0),
- last_num_rs_compacted_(0),
- tablet_(tablet) {
-}
-
-void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
- std::lock_guard<simple_spinlock> l(lock_);
-
- // Any operation that changes the on-disk row layout invalidates the
- // cached stats.
- TabletMetrics* metrics = tablet_->metrics();
- if (metrics) {
- uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
- uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
- if (prev_stats_.valid() &&
- new_num_mrs_flushed == last_num_mrs_flushed_ &&
- new_num_rs_compacted == last_num_rs_compacted_) {
- *stats = prev_stats_;
- return;
- } else {
- last_num_mrs_flushed_ = new_num_mrs_flushed;
- last_num_rs_compacted_ = new_num_rs_compacted;
- }
- }
-
- tablet_->UpdateCompactionStats(&prev_stats_);
- *stats = prev_stats_;
-}
-
-bool CompactRowSetsOp::Prepare() {
- std::lock_guard<simple_spinlock> l(lock_);
- // Invalidate the cached stats so that another section of the tablet can
- // be compacted concurrently.
- //
- // TODO: we should acquire the rowset compaction locks here. Otherwise, until
- // Compact() acquires them, the maintenance manager may compute the same
- // stats for this op and run it again, even though Perform() will end up
- // performing a much less fruitful compaction. See KUDU-790 for more details.
- prev_stats_.Clear();
- return true;
-}
-
-void CompactRowSetsOp::Perform() {
- WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
- Substitute("Compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
- return tablet_->metrics()->compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
- return tablet_->metrics()->compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MinorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
- : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
- MaintenanceOp::HIGH_IO_USAGE),
- last_num_mrs_flushed_(0),
- last_num_dms_flushed_(0),
- last_num_rs_compacted_(0),
- last_num_rs_minor_delta_compacted_(0),
- tablet_(tablet) {
-}
-
-void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
- std::lock_guard<simple_spinlock> l(lock_);
-
- // Any operation that changes the number of REDO files invalidates the
- // cached stats.
- TabletMetrics* metrics = tablet_->metrics();
- if (metrics) {
- uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
- uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
- uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
- uint64_t new_num_rs_minor_delta_compacted =
- metrics->delta_minor_compact_rs_duration->TotalCount();
- if (prev_stats_.valid() &&
- new_num_mrs_flushed == last_num_mrs_flushed_ &&
- new_num_dms_flushed == last_num_dms_flushed_ &&
- new_num_rs_compacted == last_num_rs_compacted_ &&
- new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
- *stats = prev_stats_;
- return;
- } else {
- last_num_mrs_flushed_ = new_num_mrs_flushed;
- last_num_dms_flushed_ = new_num_dms_flushed;
- last_num_rs_compacted_ = new_num_rs_compacted;
- last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
- }
- }
-
- double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
- RowSet::MINOR_DELTA_COMPACTION, nullptr);
- prev_stats_.set_perf_improvement(perf_improv);
- prev_stats_.set_runnable(perf_improv > 0);
- *stats = prev_stats_;
-}
-
-bool MinorDeltaCompactionOp::Prepare() {
- std::lock_guard<simple_spinlock> l(lock_);
- // Invalidate the cached stats so that another rowset in the tablet can
- // be delta compacted concurrently.
- //
- // TODO: See CompactRowSetsOp::Prepare().
- prev_stats_.Clear();
- return true;
-}
-
-void MinorDeltaCompactionOp::Perform() {
- WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
- Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
- return tablet_->metrics()->delta_minor_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
- return tablet_->metrics()->delta_minor_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MajorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
- : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
- MaintenanceOp::HIGH_IO_USAGE),
- last_num_mrs_flushed_(0),
- last_num_dms_flushed_(0),
- last_num_rs_compacted_(0),
- last_num_rs_minor_delta_compacted_(0),
- last_num_rs_major_delta_compacted_(0),
- tablet_(tablet) {
-}
-
-void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
- std::lock_guard<simple_spinlock> l(lock_);
-
- // Any operation that changes the size of the on-disk data invalidates the
- // cached stats.
- TabletMetrics* metrics = tablet_->metrics();
- if (metrics) {
- int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
- int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
- int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
- int64_t new_num_rs_minor_delta_compacted =
- metrics->delta_minor_compact_rs_duration->TotalCount();
- int64_t new_num_rs_major_delta_compacted =
- metrics->delta_major_compact_rs_duration->TotalCount();
- if (prev_stats_.valid() &&
- new_num_mrs_flushed == last_num_mrs_flushed_ &&
- new_num_dms_flushed == last_num_dms_flushed_ &&
- new_num_rs_compacted == last_num_rs_compacted_ &&
- new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
- new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
- *stats = prev_stats_;
- return;
- } else {
- last_num_mrs_flushed_ = new_num_mrs_flushed;
- last_num_dms_flushed_ = new_num_dms_flushed;
- last_num_rs_compacted_ = new_num_rs_compacted;
- last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
- last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
- }
- }
-
- double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
- RowSet::MAJOR_DELTA_COMPACTION, nullptr);
- prev_stats_.set_perf_improvement(perf_improv);
- prev_stats_.set_runnable(perf_improv > 0);
- *stats = prev_stats_;
-}
-
-bool MajorDeltaCompactionOp::Prepare() {
- std::lock_guard<simple_spinlock> l(lock_);
- // Invalidate the cached stats so that another rowset in the tablet can
- // be delta compacted concurrently.
- //
- // TODO: See CompactRowSetsOp::Prepare().
- prev_stats_.Clear();
- return true;
-}
-
-void MajorDeltaCompactionOp::Perform() {
- WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
- Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
- return tablet_->metrics()->delta_major_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
- return tablet_->metrics()->delta_major_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// Tablet
-////////////////////////////////////////////////////////////
-
Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
CompactFlags flags) const {
CHECK_EQ(state_, kOpen);
http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
new file mode 100644
index 0000000..345e898
--- /dev/null
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_mm_ops.h"
+
+#include <mutex>
+
+#include "kudu/util/locks.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
+
+using strings::Substitute;
+
+namespace kudu {
+namespace tablet {
+
+////////////////////////////////////////////////////////////
+// CompactRowSetsOp: maintenance op that runs Tablet::Compact()
+////////////////////////////////////////////////////////////
+
+CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
+ : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
+ MaintenanceOp::HIGH_IO_USAGE),
+ last_num_mrs_flushed_(0),
+ last_num_rs_compacted_(0),
+ tablet_(tablet) {
+}
+
+void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
+ std::lock_guard<simple_spinlock> l(lock_);
+
+ // Reuse valid cached stats unless the MRS-flush or rowset-compaction counts
+ // changed since the last call; either can change the on-disk row layout.
+ TabletMetrics* metrics = tablet_->metrics();
+ if (metrics) {
+ uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+ uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+ if (prev_stats_.valid() &&
+ new_num_mrs_flushed == last_num_mrs_flushed_ &&
+ new_num_rs_compacted == last_num_rs_compacted_) {
+ *stats = prev_stats_;
+ return;
+ } else {
+ last_num_mrs_flushed_ = new_num_mrs_flushed;
+ last_num_rs_compacted_ = new_num_rs_compacted;
+ }
+ }
+
+ tablet_->UpdateCompactionStats(&prev_stats_);
+ *stats = prev_stats_;
+}
+
+bool CompactRowSetsOp::Prepare() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ // Invalidate the cached stats so that another section of the tablet can
+ // be compacted concurrently.
+ //
+ // TODO: we should acquire the rowset compaction locks here. Otherwise, until
+ // Compact() acquires them, the maintenance manager may compute the same
+ // stats for this op and run it again, even though Perform() will end up
+ // performing a much less fruitful compaction. See KUDU-790 for more details.
+ prev_stats_.Clear();
+ return true;
+}
+
+void CompactRowSetsOp::Perform() {
+ WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
+ Substitute("Compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
+ return tablet_->metrics()->compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
+ return tablet_->metrics()->compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MinorDeltaCompactionOp: runs Tablet::CompactWorstDeltas(MINOR_DELTA_COMPACTION)
+////////////////////////////////////////////////////////////
+
+MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
+ : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
+ MaintenanceOp::HIGH_IO_USAGE),
+ last_num_mrs_flushed_(0),
+ last_num_dms_flushed_(0),
+ last_num_rs_compacted_(0),
+ last_num_rs_minor_delta_compacted_(0),
+ tablet_(tablet) {
+}
+
+void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+ std::lock_guard<simple_spinlock> l(lock_);
+
+ // Reuse valid cached stats unless a flush, rowset compaction, or minor delta
+ // compaction count changed since the last call (i.e. REDO files may differ).
+ TabletMetrics* metrics = tablet_->metrics();
+ if (metrics) {
+ uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+ uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+ uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+ uint64_t new_num_rs_minor_delta_compacted =
+ metrics->delta_minor_compact_rs_duration->TotalCount();
+ if (prev_stats_.valid() &&
+ new_num_mrs_flushed == last_num_mrs_flushed_ &&
+ new_num_dms_flushed == last_num_dms_flushed_ &&
+ new_num_rs_compacted == last_num_rs_compacted_ &&
+ new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
+ *stats = prev_stats_;
+ return;
+ } else {
+ last_num_mrs_flushed_ = new_num_mrs_flushed;
+ last_num_dms_flushed_ = new_num_dms_flushed;
+ last_num_rs_compacted_ = new_num_rs_compacted;
+ last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+ }
+ }
+
+ double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+ RowSet::MINOR_DELTA_COMPACTION, nullptr);
+ prev_stats_.set_perf_improvement(perf_improv);
+ prev_stats_.set_runnable(perf_improv > 0);
+ *stats = prev_stats_;
+}
+
+bool MinorDeltaCompactionOp::Prepare() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ // Invalidate the cached stats so that another rowset in the tablet can
+ // be delta compacted concurrently.
+ //
+ // TODO: same KUDU-790 issue as described in CompactRowSetsOp::Prepare().
+ prev_stats_.Clear();
+ return true;
+}
+
+void MinorDeltaCompactionOp::Perform() {
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
+ Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
+ return tablet_->metrics()->delta_minor_compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
+ return tablet_->metrics()->delta_minor_compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MajorDeltaCompactionOp: runs Tablet::CompactWorstDeltas(MAJOR_DELTA_COMPACTION)
+////////////////////////////////////////////////////////////
+
+MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
+ : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
+ MaintenanceOp::HIGH_IO_USAGE),
+ last_num_mrs_flushed_(0),
+ last_num_dms_flushed_(0),
+ last_num_rs_compacted_(0),
+ last_num_rs_minor_delta_compacted_(0),
+ last_num_rs_major_delta_compacted_(0),
+ tablet_(tablet) {
+}
+
+void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ // Reuse valid cached stats unless a flush, rowset compaction, or minor/major
+ // delta compaction count changed since the last call. NOTE(review): these
+ // counts are int64_t while the sibling ops use uint64_t; confirm vs. header.
+ TabletMetrics* metrics = tablet_->metrics();
+ if (metrics) {
+ int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+ int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+ int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+ int64_t new_num_rs_minor_delta_compacted =
+ metrics->delta_minor_compact_rs_duration->TotalCount();
+ int64_t new_num_rs_major_delta_compacted =
+ metrics->delta_major_compact_rs_duration->TotalCount();
+ if (prev_stats_.valid() &&
+ new_num_mrs_flushed == last_num_mrs_flushed_ &&
+ new_num_dms_flushed == last_num_dms_flushed_ &&
+ new_num_rs_compacted == last_num_rs_compacted_ &&
+ new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
+ new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
+ *stats = prev_stats_;
+ return;
+ } else {
+ last_num_mrs_flushed_ = new_num_mrs_flushed;
+ last_num_dms_flushed_ = new_num_dms_flushed;
+ last_num_rs_compacted_ = new_num_rs_compacted;
+ last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+ last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
+ }
+ }
+
+ double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+ RowSet::MAJOR_DELTA_COMPACTION, nullptr);
+ prev_stats_.set_perf_improvement(perf_improv);
+ prev_stats_.set_runnable(perf_improv > 0);
+ *stats = prev_stats_;
+}
+
+bool MajorDeltaCompactionOp::Prepare() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ // Invalidate the cached stats so that another rowset in the tablet can
+ // be delta compacted concurrently.
+ //
+ // TODO: same KUDU-790 issue as described in CompactRowSetsOp::Prepare().
+ prev_stats_.Clear();
+ return true;
+}
+
+void MajorDeltaCompactionOp::Perform() {
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
+ Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
+ return tablet_->metrics()->delta_major_compact_rs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
+ return tablet_->metrics()->delta_major_compact_rs_running;
+}
+
+} // namespace tablet
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/2528edf2/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index f4ee1c3..1ea8f95 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -18,6 +18,7 @@
#ifndef KUDU_TABLET_TABLET_MM_OPS_H_
#define KUDU_TABLET_TABLET_MM_OPS_H_
+#include "kudu/util/locks.h"
#include "kudu/util/maintenance_manager.h"
namespace kudu {
@@ -28,6 +29,8 @@ class AtomicGauge;
namespace tablet {
+class Tablet;
+
// MaintenanceOp for rowset compaction.
//
// This periodically invokes the tablet's CompactionPolicy to select a compaction. The