You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/04/27 03:18:21 UTC
[1/3] kudu git commit: tpch: allow hash partitioning
Repository: kudu
Updated Branches:
refs/heads/master ec10fd365 -> 1edaee01f
tpch: allow hash partitioning
This enables a hash-partitioning mode for tpch_real_world. In this mode,
we still create one tablet per thread, but we use hash-partitioning so
that we don't get the perfect one-to-one insert pattern that we get with
range partitioning.
This is a bit more realistic for many batch insert scenarios, in which
many writers write locally-sorted (but overlapping) data ranges into a
single hash-partitioned table.
Change-Id: Icbbb643447dff58b32e67764f6fb632441a4f6e4
Reviewed-on: http://gerrit.cloudera.org:8080/6709
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f03d47ed
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f03d47ed
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f03d47ed
Branch: refs/heads/master
Commit: f03d47ed4698623df1f308f0fddab4e8ccbc86db
Parents: ec10fd3
Author: Todd Lipcon <to...@apache.org>
Authored: Sun Mar 12 15:56:41 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Apr 26 21:22:48 2017 +0000
----------------------------------------------------------------------
.../benchmarks/tpch/rpc_line_item_dao-test.cc | 6 ++++-
src/kudu/benchmarks/tpch/rpc_line_item_dao.cc | 22 ++++++++++-----
src/kudu/benchmarks/tpch/rpc_line_item_dao.h | 12 ++++++++-
src/kudu/benchmarks/tpch/tpch1.cc | 6 +++--
src/kudu/benchmarks/tpch/tpch_real_world.cc | 28 ++++++++++++++++----
5 files changed, 59 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/f03d47ed/src/kudu/benchmarks/tpch/rpc_line_item_dao-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao-test.cc b/src/kudu/benchmarks/tpch/rpc_line_item_dao-test.cc
index b57d4dc..42299eb 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao-test.cc
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao-test.cc
@@ -54,7 +54,11 @@ class RpcLineItemDAOTest : public KuduTest {
// Create the table and Connect to it.
string master_address(cluster_->mini_master()->bound_rpc_addr_str());
- dao_.reset(new kudu::RpcLineItemDAO(master_address, kTableName, 5));
+ dao_.reset(new kudu::RpcLineItemDAO(master_address, kTableName,
+ /* batch size */ 5,
+ /* timeout_ms */ 5000,
+ RpcLineItemDAO::RANGE,
+ /* num_buckets */ 1));
dao_->Init();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/f03d47ed/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
index ab1b357..4b5e7d5 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
@@ -104,11 +104,15 @@ RpcLineItemDAO::~RpcLineItemDAO() {
RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
int batch_op_num_max, int timeout_ms,
+ PartitionStrategy partition_strategy,
+ int num_buckets,
vector<const KuduPartialRow*> tablet_splits)
: master_address_(std::move(master_address)),
table_name_(std::move(table_name)),
timeout_(MonoDelta::FromMilliseconds(timeout_ms)),
batch_op_num_max_(batch_op_num_max),
+ partition_strategy_(partition_strategy),
+ num_buckets_(num_buckets),
tablet_splits_(std::move(tablet_splits)),
batch_op_num_(0),
semaphore_(1) {
@@ -124,12 +128,18 @@ void RpcLineItemDAO::Init() {
Status s = client_->OpenTable(table_name_, &client_table_);
if (s.IsNotFound()) {
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- CHECK_OK(table_creator->table_name(table_name_)
- .schema(&schema)
- .num_replicas(1)
- .set_range_partition_columns({ tpch::kOrderKeyColName, tpch::kLineNumberColName })
- .split_rows(tablet_splits_)
- .Create());
+ table_creator->table_name(table_name_)
+ .schema(&schema)
+ .num_replicas(1);
+ if (partition_strategy_ == RANGE) {
+ table_creator
+ ->set_range_partition_columns({tpch::kOrderKeyColName, tpch::kLineNumberColName })
+ .split_rows(tablet_splits_);
+ } else {
+ table_creator->add_hash_partitions({ tpch::kOrderKeyColName }, num_buckets_);
+ }
+
+ CHECK_OK(table_creator->Create());
CHECK_OK(client_->OpenTable(table_name_, &client_table_));
} else {
CHECK_OK(s);
http://git-wip-us.apache.org/repos/asf/kudu/blob/f03d47ed/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
index 98924d6..1da2f84 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
@@ -35,11 +35,17 @@ namespace kudu {
class RpcLineItemDAO {
public:
class Scanner;
+ enum PartitionStrategy {
+ RANGE,
+ HASH
+ };
RpcLineItemDAO(std::string master_address,
std::string table_name,
int batch_op_num_max,
- int timeout_ms = 5000,
+ int timeout_ms,
+ PartitionStrategy partition_strategy,
+ int num_buckets,
std::vector<const KuduPartialRow*> tablet_splits = {});
~RpcLineItemDAO();
void Init();
@@ -91,6 +97,10 @@ class RpcLineItemDAO {
const std::string table_name_;
const MonoDelta timeout_;
const int batch_op_num_max_;
+
+ const PartitionStrategy partition_strategy_;
+ const int num_buckets_;
+
const std::vector<const KuduPartialRow*> tablet_splits_;
int batch_op_num_;
simple_spinlock lock_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/f03d47ed/src/kudu/benchmarks/tpch/tpch1.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch1.cc b/src/kudu/benchmarks/tpch/tpch1.cc
index 3cd1384..da36872 100644
--- a/src/kudu/benchmarks/tpch/tpch1.cc
+++ b/src/kudu/benchmarks/tpch/tpch1.cc
@@ -261,8 +261,10 @@ int main(int argc, char **argv) {
master_address = FLAGS_master_address;
}
- gscoped_ptr<kudu::RpcLineItemDAO> dao(new kudu::RpcLineItemDAO(master_address, FLAGS_table_name,
- FLAGS_tpch_max_batch_size));
+ gscoped_ptr<kudu::RpcLineItemDAO> dao(new kudu::RpcLineItemDAO(
+ master_address, FLAGS_table_name, FLAGS_tpch_max_batch_size,
+ /* timeout = */ 5000, kudu::RpcLineItemDAO::RANGE,
+ /* num_buckets = */ 1));
dao->Init();
kudu::WarmupScanCache(dao.get());
http://git-wip-us.apache.org/repos/asf/kudu/blob/f03d47ed/src/kudu/benchmarks/tpch/tpch_real_world.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc
index 349c8bc..24c3ce1 100644
--- a/src/kudu/benchmarks/tpch/tpch_real_world.cc
+++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc
@@ -95,6 +95,13 @@ DEFINE_string(tpch_path_to_ts_flags_file, "",
"a mini cluster. Doesn't use one by default.");
DEFINE_string(tpch_table_name, "tpch_real_world",
"Table name to use during the test");
+DEFINE_string(tpch_partition_strategy, "range",
+ "The partitioning strategy to use for the lineitem table. If 'range' is selected, "
+ "each writer thread inserts sequentially into its own tablet, creating an ideal "
+ "workload for maximum throughput. 'hash' partitioning creates the same number of "
+ "tablets, but hash-partitions them so that each writer thread writes to all "
+ "tablets. This is less ideal, but more faithfully represents a lot of write "
+ "workloads.");
namespace kudu {
@@ -249,11 +256,20 @@ gscoped_ptr<RpcLineItemDAO> TpchRealWorld::GetInittedDAO() {
KuduSchema schema(tpch::CreateLineItemSchema());
vector<const KuduPartialRow*> split_rows;
- for (int64_t i = 1; i < FLAGS_tpch_num_inserters; i++) {
- KuduPartialRow* row = schema.NewRow();
- CHECK_OK(row->SetInt64(tpch::kOrderKeyColName, i * increment));
- CHECK_OK(row->SetInt32(tpch::kLineNumberColName, 0));
- split_rows.push_back(row);
+
+ RpcLineItemDAO::PartitionStrategy strategy;
+ if (FLAGS_tpch_partition_strategy == "hash") {
+ strategy = RpcLineItemDAO::HASH;
+ } else if (FLAGS_tpch_partition_strategy == "range") {
+ strategy = RpcLineItemDAO::RANGE;
+ for (int64_t i = 1; i < FLAGS_tpch_num_inserters; i++) {
+ KuduPartialRow* row = schema.NewRow();
+ CHECK_OK(row->SetInt64(tpch::kOrderKeyColName, i * increment));
+ CHECK_OK(row->SetInt32(tpch::kLineNumberColName, 0));
+ split_rows.push_back(row);
+ }
+ } else {
+ LOG(FATAL) << "Unknown partition strategy: " << FLAGS_tpch_partition_strategy;
}
gscoped_ptr<RpcLineItemDAO> dao(
@@ -261,6 +277,8 @@ gscoped_ptr<RpcLineItemDAO> TpchRealWorld::GetInittedDAO() {
FLAGS_tpch_table_name,
FLAGS_tpch_max_batch_size,
FLAGS_tpch_test_client_timeout_msec,
+ strategy,
+ FLAGS_tpch_num_inserters,
split_rows));
dao->Init();
return std::move(dao);
[2/3] kudu git commit: Simplify MemTracker and move process
throttling elsewhere
Posted by jd...@apache.org.
Simplify MemTracker and move process throttling elsewhere
This takes a first step towards simplifying MemTracker:
- Remove the "GC function" callbacks (we never used them).
- Remove the 'ExpandLimits' code which was unimplemented.
- Remove the logging functionality, which we've never used
as far as I can remember.
- Remove soft limiting. We only used this on the root tracker, so
I just moved it to a new process_memory.{h,cc}
- Remove 'consumption_func' and un-tie the root tracker from
the global process memory usage. Now the root tracker is a simple
sum of its descendants.
As a stress test/benchmark, I ran a 500GB YCSB workload with the memory limit set to
10GB. Results showed no major difference with this patch (throughput was
a few percent faster, but within the realm of noise). Details at [1].
[1] https://docs.google.com/document/d/1dOe5-L5BWUhF-uV4-AE5hduvfUWctXizlaoSLlp38yM/edit?usp=sharing
Change-Id: Id16bad7d9a29a83e820a38e9d703811391cffe90
Reviewed-on: http://gerrit.cloudera.org:8080/6620
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2be32d54
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2be32d54
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2be32d54
Branch: refs/heads/master
Commit: 2be32d5441827e0914ad9b5008545003bfda4575
Parents: f03d47e
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 12 17:06:38 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Apr 26 21:25:47 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/raft_consensus.cc | 3 +-
.../integration-tests/raft_consensus-itest.cc | 8 +-
src/kudu/server/default-path-handlers.cc | 24 +-
src/kudu/tablet/multi_column_writer.cc | 2 +-
src/kudu/tserver/tablet_service.cc | 3 +-
src/kudu/util/CMakeLists.txt | 1 +
src/kudu/util/maintenance_manager-test.cc | 51 ++-
src/kudu/util/maintenance_manager.cc | 10 +-
src/kudu/util/maintenance_manager.h | 13 +-
src/kudu/util/mem_tracker-test.cc | 115 -------
src/kudu/util/mem_tracker.cc | 327 +------------------
src/kudu/util/mem_tracker.h | 153 +--------
src/kudu/util/process_memory.cc | 235 +++++++++++++
src/kudu/util/process_memory.h | 41 +++
14 files changed, 373 insertions(+), 613 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 6a87719..f75057f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -43,6 +43,7 @@
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/process_memory.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/threadpool.h"
@@ -1188,7 +1189,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// This request contains at least one message, and is likely to increase
// our memory pressure.
double capacity_pct;
- if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+ if (process_memory::SoftLimitExceeded(&capacity_pct)) {
follower_memory_pressure_rejections_->Increment();
string msg = StringPrintf(
"Soft memory limit exceeded (at %.2f%% of capacity)",
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 579e3f3..0f15d84 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2251,8 +2251,14 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
// Very low memory limit to ease testing.
+ // When using tcmalloc, we set it to 30MB, since we can get accurate process memory
+ // usage statistics. Otherwise, set to only 4MB, since we'll only be throttling based
+ // on our tracked memory.
+#ifdef TCMALLOC_ENABLED
+ ts_flags.push_back("--memory_limit_hard_bytes=30000000");
+#else
ts_flags.push_back("--memory_limit_hard_bytes=4194304");
-
+#endif
// Don't let transaction memory tracking get in the way.
ts_flags.push_back("--tablet_transaction_memory_limit_mb=-1");
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/server/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/default-path-handlers.cc b/src/kudu/server/default-path-handlers.cc
index 4182c39..f4f9dd5 100644
--- a/src/kudu/server/default-path-handlers.cc
+++ b/src/kudu/server/default-path-handlers.cc
@@ -38,12 +38,12 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/flags.h"
#include "kudu/util/histogram.pb.h"
+#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
-#include "kudu/util/jsonwriter.h"
+#include "kudu/util/process_memory.h"
-using boost::replace_all;
using std::ifstream;
using std::string;
using std::endl;
@@ -138,13 +138,29 @@ static void MemUsageHandler(const Webserver::WebRequest& req, std::ostringstream
MallocExtension::instance()->GetStats(buf, 2048);
// Replace new lines with <br> for html
string tmp(buf);
- replace_all(tmp, "\n", tags.line_break);
+ boost::replace_all(tmp, "\n", tags.line_break);
(*output) << tmp << tags.end_pre_tag;
#endif
}
// Registered to handle "/mem-trackers", and prints out to handle memory tracker information.
-static void MemTrackersHandler(const Webserver::WebRequest& req, std::ostringstream* output) {
+static void MemTrackersHandler(const Webserver::WebRequest& /*req*/, std::ostringstream* output) {
+ *output << "<h1>Process memory usage</h1>\n";
+ *output << "<table class='table table-striped'>\n";
+ *output << Substitute(" <tr><th>Total consumption</th><td>$0</td></tr>\n",
+ HumanReadableNumBytes::ToString(process_memory::CurrentConsumption()));
+ *output << Substitute(" <tr><th>Memory limit</th><td>$0</td></tr>\n",
+ HumanReadableNumBytes::ToString(process_memory::HardLimit()));
+ *output << "</table>\n";
+#ifndef TCMALLOC_ENABLED
+ *output << R"(
+ <div class="alert alert-warning">
+ <strong>NOTE:</strong> This build of Kudu has not enabled tcmalloc.
+ The above process memory stats will be inaccurate.
+ </div>
+ )";
+#endif
+
*output << "<h1>Memory usage by subsystem</h1>\n";
*output << "<table class='table table-striped'>\n";
*output << " <tr><th>Id</th><th>Parent</th><th>Limit</th><th>Current Consumption</th>"
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tablet/multi_column_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc
index 51d6c29..7b19df8 100644
--- a/src/kudu/tablet/multi_column_writer.cc
+++ b/src/kudu/tablet/multi_column_writer.cc
@@ -83,10 +83,10 @@ Status MultiColumnWriter::Open() {
RETURN_NOT_OK_PREPEND(writer->Start(),
"Unable to Start() writer for column " + col.ToString());
- LOG(INFO) << "Opened CFile writer for column " << col.ToString();
cfile_writers_.push_back(writer.release());
block_ids_.push_back(block_id);
}
+ LOG(INFO) << "Opened CFile writers for " << cfile_writers_.size() << " column(s)";
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 91b288c..3fa9abb 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -57,6 +57,7 @@
#include "kudu/util/mem_tracker.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/process_memory.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/trace.h"
@@ -747,7 +748,7 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
// Check for memory pressure; don't bother doing any additional work if we've
// exceeded the limit.
double capacity_pct;
- if (tablet->mem_tracker()->AnySoftLimitExceeded(&capacity_pct)) {
+ if (process_memory::SoftLimitExceeded(&capacity_pct)) {
tablet->metrics()->leader_memory_pressure_rejections->Increment();
string msg = StringPrintf(
"Soft memory limit exceeded (at %.2f%% of capacity)",
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 7d049a9..04fe5e8 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -169,6 +169,7 @@ set(UTIL_SRCS
path_util.cc
pb_util.cc
pb_util-internal.cc
+ process_memory.cc
random_util.cc
resettable_heartbeater.cc
rolling_log.cc
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index e5dcc96..98dc9ba 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
+#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet.pb.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
@@ -53,13 +54,15 @@ static const char kFakeUuid[] = "12345";
class MaintenanceManagerTest : public KuduTest {
public:
void SetUp() override {
- test_tracker_ = MemTracker::CreateTracker(1000, "test");
MaintenanceManager::Options options;
options.num_threads = 2;
options.polling_interval_ms = 1;
options.history_size = kHistorySize;
- options.parent_mem_tracker = test_tracker_;
manager_.reset(new MaintenanceManager(options));
+ manager_->set_memory_pressure_func_for_tests(
+ [&](double* consumption) {
+ return indicate_memory_pressure_.load();
+ });
ASSERT_OK(manager_->Init(kFakeUuid));
}
@@ -68,8 +71,8 @@ class MaintenanceManagerTest : public KuduTest {
}
protected:
- shared_ptr<MemTracker> test_tracker_;
shared_ptr<MaintenanceManager> manager_;
+ std::atomic<bool> indicate_memory_pressure_ { false };
};
// Just create the MaintenanceManager and then shut it down, to make sure
@@ -80,10 +83,9 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
class TestMaintenanceOp : public MaintenanceOp {
public:
TestMaintenanceOp(const std::string& name,
- IOUsage io_usage,
- const shared_ptr<MemTracker>& tracker)
+ IOUsage io_usage)
: MaintenanceOp(name, io_usage),
- consumption_(tracker, 500),
+ ram_anchored_(500),
logs_retained_bytes_(0),
perf_improvement_(0),
metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
@@ -124,7 +126,7 @@ class TestMaintenanceOp : public MaintenanceOp {
virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
std::lock_guard<Mutex> guard(lock_);
stats->set_runnable(remaining_runs_ > 0);
- stats->set_ram_anchored(consumption_.consumption());
+ stats->set_ram_anchored(ram_anchored_);
stats->set_logs_retained_bytes(logs_retained_bytes_);
stats->set_perf_improvement(perf_improvement_);
}
@@ -141,7 +143,7 @@ class TestMaintenanceOp : public MaintenanceOp {
void set_ram_anchored(uint64_t ram_anchored) {
std::lock_guard<Mutex> guard(lock_);
- consumption_.Reset(ram_anchored);
+ ram_anchored_ = ram_anchored;
}
void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
@@ -165,7 +167,7 @@ class TestMaintenanceOp : public MaintenanceOp {
private:
Mutex lock_;
- ScopedTrackedConsumption consumption_;
+ uint64_t ram_anchored_;
uint64_t logs_retained_bytes_;
uint64_t perf_improvement_;
MetricRegistry metric_registry_;
@@ -187,8 +189,8 @@ class TestMaintenanceOp : public MaintenanceOp {
// running and verify that UnregisterOp waits for it to finish before
// proceeding.
TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
- TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
- op1.set_ram_anchored(1001);
+ TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+ op1.set_perf_improvement(10);
// Register initially with no remaining runs. We'll later enable it once it's
// already registered.
op1.set_remaining_runs(0);
@@ -207,8 +209,8 @@ TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
// Regression test for KUDU-1495: when an operation is being unregistered,
// new instances of that operation should not be scheduled.
TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
- TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
- op1.set_ram_anchored(1001);
+ TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+ op1.set_perf_improvement(10);
// Set the op to run up to 10 times, and each time should sleep for a second.
op1.set_remaining_runs(10);
@@ -231,7 +233,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
// Test that we'll run an operation that doesn't improve performance when memory
// pressure gets high.
TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
- TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+ TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
op.set_ram_anchored(100);
manager_->RegisterOp(&op);
@@ -239,16 +241,13 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
SleepFor(MonoDelta::FromMilliseconds(20));
ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
- // set the ram_anchored by the high mem op so high that we'll have to run it.
- scoped_refptr<kudu::Thread> thread;
- ASSERT_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
- boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
+ // Fake that the server is under memory pressure.
+ indicate_memory_pressure_ = true;
AssertEventually([&]() {
ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
});
manager_->UnregisterOp(&op);
- ThreadJoiner(thread.get()).Join();
}
// Test that ops are prioritized correctly when we add log retention.
@@ -257,15 +256,15 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
manager_->Shutdown();
- TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, test_tracker_);
+ TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
op1.set_ram_anchored(0);
op1.set_logs_retained_bytes(100 * kMB);
- TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
op2.set_ram_anchored(100);
op2.set_logs_retained_bytes(100 * kMB);
- TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
op3.set_ram_anchored(200);
op3.set_logs_retained_bytes(100 * kMB);
@@ -299,7 +298,7 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
for (int i = 0; i < 5; i++) {
string name = Substitute("op$0", i);
- TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+ TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(1);
op.set_ram_anchored(100);
manager_->RegisterOp(&op);
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index eda71e8..d2206dc 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -29,8 +29,8 @@
#include "kudu/util/debug/trace_logging.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
-#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
#include "kudu/util/random_util.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/thread.h"
@@ -115,7 +115,6 @@ const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
.num_threads = 0,
.polling_interval_ms = 0,
.history_size = 0,
- .parent_mem_tracker = shared_ptr<MemTracker>(),
};
MaintenanceManager::MaintenanceManager(const Options& options)
@@ -128,9 +127,8 @@ MaintenanceManager::MaintenanceManager(const Options& options)
FLAGS_maintenance_manager_polling_interval_ms :
options.polling_interval_ms),
completed_ops_count_(0),
- parent_mem_tracker_(!options.parent_mem_tracker ?
- MemTracker::GetRootTracker() : options.parent_mem_tracker),
- rand_(GetRandomSeed32()) {
+ rand_(GetRandomSeed32()),
+ memory_pressure_func_(&process_memory::SoftLimitExceeded) {
CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
.set_max_threads(num_threads_).Build(&thread_pool_));
uint32_t history_size = options.history_size == 0 ?
@@ -364,7 +362,7 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
// Look at free memory. If it is dangerously low, we must select something
// that frees memory-- the op with the most anchored memory.
double capacity_pct;
- if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+ if (memory_pressure_func_(&capacity_pct)) {
if (!most_mem_anchored_op) {
string msg = StringPrintf("we have exceeded our soft memory limit "
"(current capacity is %.2f%%). However, there are no ops currently "
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 5c432a7..6070e2d 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <functional>
#include <map>
#include <memory>
#include <set>
@@ -42,7 +43,6 @@ template<class T>
class AtomicGauge;
class Histogram;
class MaintenanceManager;
-class MemTracker;
class MaintenanceOpStats {
public:
@@ -259,7 +259,6 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
int32_t num_threads;
int32_t polling_interval_ms;
uint32_t history_size;
- std::shared_ptr<MemTracker> parent_mem_tracker;
};
explicit MaintenanceManager(const Options& options);
@@ -278,6 +277,11 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+ void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
+ std::lock_guard<Mutex> guard(lock_);
+ memory_pressure_func_ = std::move(f);
+ }
+
static const Options DEFAULT_OPTIONS;
private:
@@ -307,10 +311,13 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
// the completed_ops_count_ % the vector's size and then the count needs to be incremented.
std::vector<CompletedOp> completed_ops_;
int64_t completed_ops_count_;
- std::shared_ptr<MemTracker> parent_mem_tracker_;
std::string server_uuid_;
Random rand_;
+ // Function which should return true if the server is under global memory pressure.
+ // This is indirected for testing purposes.
+ std::function<bool(double*)> memory_pressure_func_;
+
DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker-test.cc b/src/kudu/util/mem_tracker-test.cc
index 11bf2a2..7e78cbe 100644
--- a/src/kudu/util/mem_tracker-test.cc
+++ b/src/kudu/util/mem_tracker-test.cc
@@ -24,14 +24,9 @@
#include <utility>
#include <vector>
-#include <boost/bind.hpp>
-#include <gperftools/malloc_extension.h>
-
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/test_util.h"
-DECLARE_int32(memory_limit_soft_percentage);
-
namespace kudu {
using std::equal_to;
@@ -127,54 +122,6 @@ class GcFunctionHelper {
MemTracker* tracker_;
};
-TEST(MemTrackerTest, GcFunctions) {
- shared_ptr<MemTracker> t = MemTracker::CreateTracker(10, "");
- ASSERT_TRUE(t->has_limit());
-
- t->Consume(9);
- EXPECT_FALSE(t->LimitExceeded());
-
- // Test TryConsume()
- EXPECT_FALSE(t->TryConsume(2));
- EXPECT_EQ(t->consumption(), 9);
- EXPECT_FALSE(t->LimitExceeded());
-
- // Attach GcFunction that releases 1 byte
- GcFunctionHelper gc_func_helper(t.get());
- t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper));
- EXPECT_TRUE(t->TryConsume(2));
- EXPECT_EQ(t->consumption(), 10);
- EXPECT_FALSE(t->LimitExceeded());
-
- // GcFunction will be called even though TryConsume() fails
- EXPECT_FALSE(t->TryConsume(2));
- EXPECT_EQ(t->consumption(), 9);
- EXPECT_FALSE(t->LimitExceeded());
-
- // GcFunction won't be called
- EXPECT_TRUE(t->TryConsume(1));
- EXPECT_EQ(t->consumption(), 10);
- EXPECT_FALSE(t->LimitExceeded());
-
- // Test LimitExceeded()
- t->Consume(1);
- EXPECT_EQ(t->consumption(), 11);
- EXPECT_FALSE(t->LimitExceeded());
- EXPECT_EQ(t->consumption(), 10);
-
- // Add more GcFunctions, test that we only call them until the limit is no longer
- // exceeded
- GcFunctionHelper gc_func_helper2(t.get());
- t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper2));
- GcFunctionHelper gc_func_helper3(t.get());
- t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3));
- t->Consume(1);
- EXPECT_EQ(t->consumption(), 11);
- EXPECT_FALSE(t->LimitExceeded());
- EXPECT_EQ(t->consumption(), 10);
- t->Release(10);
-}
-
TEST(MemTrackerTest, STLContainerAllocator) {
shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
MemTrackerAllocator<int> vec_alloc(t);
@@ -249,68 +196,6 @@ TEST(MemTrackerTest, ScopedTrackedConsumption) {
ASSERT_EQ(0, m->consumption());
}
-TEST(MemTrackerTest, SoftLimitExceeded) {
- const int kNumIters = 100000;
- const int kMemLimit = 1000;
- google::FlagSaver saver;
- FLAGS_memory_limit_soft_percentage = 0;
- shared_ptr<MemTracker> m = MemTracker::CreateTracker(kMemLimit, "test");
-
- // Consumption is 0; the soft limit is never exceeded.
- for (int i = 0; i < kNumIters; i++) {
- ASSERT_FALSE(m->SoftLimitExceeded(nullptr));
- }
-
- // Consumption is half of the actual limit, so we expect to exceed the soft
- // limit roughly half the time.
- ScopedTrackedConsumption consumption(m, kMemLimit / 2);
- int exceeded_count = 0;
- for (int i = 0; i < kNumIters; i++) {
- double current_percentage;
- if (m->SoftLimitExceeded(&current_percentage)) {
- exceeded_count++;
- ASSERT_NEAR(50, current_percentage, 0.1);
- }
- }
- double exceeded_pct = static_cast<double>(exceeded_count) / kNumIters * 100;
- ASSERT_TRUE(exceeded_pct > 47 && exceeded_pct < 52);
-
- // Consumption is over the limit; the soft limit is always exceeded.
- consumption.Reset(kMemLimit + 1);
- for (int i = 0; i < kNumIters; i++) {
- double current_percentage;
- ASSERT_TRUE(m->SoftLimitExceeded(&current_percentage));
- ASSERT_NEAR(100, current_percentage, 0.1);
- }
-}
-
-#ifdef TCMALLOC_ENABLED
-TEST(MemTrackerTest, TcMallocRootTracker) {
- shared_ptr<MemTracker> root = MemTracker::GetRootTracker();
-
- // The root tracker's consumption and tcmalloc should agree.
- size_t value;
- root->UpdateConsumption();
- ASSERT_TRUE(MallocExtension::instance()->GetNumericProperty(
- "generic.current_allocated_bytes", &value));
- ASSERT_EQ(value, root->consumption());
-
- // Explicit Consume() and Release() have no effect.
- root->Consume(100);
- ASSERT_EQ(value, root->consumption());
- root->Release(3);
- ASSERT_EQ(value, root->consumption());
-
- // But if we allocate something really big, we should see a change.
- gscoped_ptr<char[]> big_alloc(new char[4*1024*1024]);
- // clang in release mode can optimize out the above allocation unless
- // we do something with the pointer... so we just log it.
- VLOG(8) << static_cast<void*>(big_alloc.get());
- root->UpdateConsumption();
- ASSERT_GT(root->consumption(), value);
-}
-#endif
-
TEST(MemTrackerTest, CollisionDetection) {
shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "parent");
shared_ptr<MemTracker> c = MemTracker::CreateTracker(-1, "child", p);
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker.cc b/src/kudu/util/mem_tracker.cc
index b247cb4..88e1d23 100644
--- a/src/kudu/util/mem_tracker.cc
+++ b/src/kudu/util/mem_tracker.cc
@@ -23,49 +23,14 @@
#include <list>
#include <memory>
#include <mutex>
-#include <sstream>
-
-#include <gperftools/malloc_extension.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/once.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/debug-util.h"
-#include "kudu/util/debug/trace_event.h"
-#include "kudu/util/env.h"
-#include "kudu/util/flag_tags.h"
#include "kudu/util/mutex.h"
-#include "kudu/util/random_util.h"
+#include "kudu/util/process_memory.h"
#include "kudu/util/status.h"
-DEFINE_int64(memory_limit_hard_bytes, 0,
- "Maximum amount of memory this daemon should use, in bytes. "
- "A value of 0 autosizes based on the total system memory. "
- "A value of -1 disables all memory limiting.");
-TAG_FLAG(memory_limit_hard_bytes, stable);
-
-DEFINE_int32(memory_limit_soft_percentage, 60,
- "Percentage of the hard memory limit that this daemon may "
- "consume before memory throttling of writes begins. The greater "
- "the excess, the higher the chance of throttling. In general, a "
- "lower soft limit leads to smoother write latencies but "
- "decreased throughput, and vice versa for a higher soft limit.");
-TAG_FLAG(memory_limit_soft_percentage, advanced);
-
-DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
- "Percentage of the hard memory limit that this daemon may "
- "consume before WARNING level messages are periodically logged.");
-TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
-
-#ifdef TCMALLOC_ENABLED
-DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
- "Maximum percentage of the RSS that tcmalloc is allowed to use for "
- "reserved but unallocated memory.");
-TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
-#endif
-
namespace kudu {
// NOTE: this class has been adapted from Impala, so the code style varies
@@ -73,7 +38,6 @@ namespace kudu {
using std::deque;
using std::list;
-using std::ostringstream;
using std::shared_ptr;
using std::string;
using std::vector;
@@ -85,94 +49,29 @@ using strings::Substitute;
static shared_ptr<MemTracker> root_tracker;
static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
-// Total amount of memory from calls to Release() since the last GC. If this
-// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
-static Atomic64 released_memory_since_gc;
-
-// Validate that various flags are percentages.
-static bool ValidatePercentage(const char* flagname, int value) {
- if (value >= 0 && value <= 100) {
- return true;
- }
- LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
- flagname, value);
- return false;
-}
-static bool dummy[] = {
- google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
- google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
-#ifdef TCMALLOC_ENABLED
- ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
-#endif
-};
-
-#ifdef TCMALLOC_ENABLED
-static int64_t GetTCMallocProperty(const char* prop) {
- size_t value;
- if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
- LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
- }
- return value;
-}
-
-static int64_t GetTCMallocCurrentAllocatedBytes() {
- return GetTCMallocProperty("generic.current_allocated_bytes");
-}
-#endif
-
void MemTracker::CreateRootTracker() {
- int64_t limit = FLAGS_memory_limit_hard_bytes;
- if (limit == 0) {
- // If no limit is provided, we'll use 80% of system RAM.
- int64_t total_ram;
- CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
- limit = total_ram * 4;
- limit /= 5;
- }
-
- ConsumptionFunction f;
-#ifdef TCMALLOC_ENABLED
- f = &GetTCMallocCurrentAllocatedBytes;
-#endif
- root_tracker.reset(new MemTracker(f, limit, "root",
- shared_ptr<MemTracker>()));
+ root_tracker.reset(new MemTracker(-1, "root", shared_ptr<MemTracker>()));
root_tracker->Init();
- LOG(INFO) << StringPrintf("MemTracker: hard memory limit is %.6f GB",
- (static_cast<float>(limit) / (1024.0 * 1024.0 * 1024.0)));
- LOG(INFO) << StringPrintf("MemTracker: soft memory limit is %.6f GB",
- (static_cast<float>(root_tracker->soft_limit_) /
- (1024.0 * 1024.0 * 1024.0)));
}
shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
const string& id,
const shared_ptr<MemTracker>& parent) {
shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
- shared_ptr<MemTracker> tracker(
- new MemTracker(ConsumptionFunction(), byte_limit, id, real_parent));
+ shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, id, real_parent));
real_parent->AddChildTracker(tracker);
tracker->Init();
return tracker;
}
-MemTracker::MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit,
- const string& id, shared_ptr<MemTracker> parent)
+MemTracker::MemTracker(int64_t byte_limit, const string& id, shared_ptr<MemTracker> parent)
: limit_(byte_limit),
id_(id),
descr_(Substitute("memory consumption for $0", id)),
parent_(std::move(parent)),
- consumption_(0),
- consumption_func_(std::move(consumption_func)),
- rand_(GetRandomSeed32()),
- enable_logging_(false),
- log_stack_(false) {
+ consumption_(0) {
VLOG(1) << "Creating tracker " << ToString();
- if (consumption_func_) {
- UpdateConsumption();
- }
- soft_limit_ = (limit_ == -1)
- ? -1 : (limit_ * FLAGS_memory_limit_soft_percentage) / 100;
}
MemTracker::~MemTracker() {
@@ -284,67 +183,35 @@ void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) {
}
}
-void MemTracker::UpdateConsumption() {
- DCHECK(!consumption_func_.empty());
- DCHECK(parent_.get() == NULL);
- consumption_.set_value(consumption_func_());
-}
-
void MemTracker::Consume(int64_t bytes) {
if (bytes < 0) {
Release(-bytes);
return;
}
- if (!consumption_func_.empty()) {
- UpdateConsumption();
- return;
- }
if (bytes == 0) {
return;
}
- if (PREDICT_FALSE(enable_logging_)) {
- LogUpdate(true, bytes);
- }
for (auto& tracker : all_trackers_) {
tracker->consumption_.IncrementBy(bytes);
- if (!tracker->consumption_func_.empty()) {
- DCHECK_GE(tracker->consumption_.current_value(), 0);
- }
}
}
bool MemTracker::TryConsume(int64_t bytes) {
- if (!consumption_func_.empty()) {
- UpdateConsumption();
- }
if (bytes <= 0) {
+ Release(-bytes);
return true;
}
- if (PREDICT_FALSE(enable_logging_)) {
- LogUpdate(true, bytes);
- }
int i = 0;
- // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
- // won't accommodate the change.
+ // Walk the tracker tree top-down, consuming memory from each in turn.
for (i = all_trackers_.size() - 1; i >= 0; --i) {
MemTracker *tracker = all_trackers_[i];
if (tracker->limit_ < 0) {
tracker->consumption_.IncrementBy(bytes);
} else {
if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
- // One of the trackers failed, attempt to GC memory or expand our limit. If that
- // succeeds, TryUpdate() again. Bail if either fails.
- if (!tracker->GcMemory(tracker->limit_ - bytes) ||
- tracker->ExpandLimit(bytes)) {
- if (!tracker->consumption_.TryIncrementBy(
- bytes, tracker->limit_)) {
- break;
- }
- } else {
- break;
- }
+ break;
}
}
}
@@ -354,14 +221,10 @@ bool MemTracker::TryConsume(int64_t bytes) {
}
// Someone failed, roll back the ones that succeeded.
- // TODO: this doesn't roll it back completely since the max values for
+ // TODO(todd): this doesn't roll it back completely since the max values for
// the updated trackers aren't decremented. The max values are only used
// for error reporting so this is probably okay. Rolling those back is
// pretty hard; we'd need something like 2PC.
- //
- // TODO: This might leave us with an allocated resource that we can't use. Do we need
- // to adjust the consumption of the query tracker to stop the resource from never
- // getting used by a subsequent TryConsume()?
for (int j = all_trackers_.size() - 1; j > i; --j) {
all_trackers_[j]->consumption_.IncrementBy(-bytes);
}
@@ -374,35 +237,14 @@ void MemTracker::Release(int64_t bytes) {
return;
}
- if (PREDICT_FALSE(base::subtle::Barrier_AtomicIncrement(&released_memory_since_gc, bytes) >
- GC_RELEASE_SIZE)) {
- GcTcmalloc();
- }
-
- if (!consumption_func_.empty()) {
- UpdateConsumption();
- return;
- }
-
if (bytes == 0) {
return;
}
- if (PREDICT_FALSE(enable_logging_)) {
- LogUpdate(false, bytes);
- }
for (auto& tracker : all_trackers_) {
tracker->consumption_.IncrementBy(-bytes);
- // If a UDF calls FunctionContext::TrackAllocation() but allocates less than the
- // reported amount, the subsequent call to FunctionContext::Free() may cause the
- // process mem tracker to go negative until it is synced back to the tcmalloc
- // metric. Don't blow up in this case. (Note that this doesn't affect non-process
- // trackers since we can enforce that the reported memory usage is internally
- // consistent.)
- if (!tracker->consumption_func_.empty()) {
- DCHECK_GE(tracker->consumption_.current_value(), 0);
- }
}
+ process_memory::MaybeGCAfterRelease(bytes);
}
bool MemTracker::AnyLimitExceeded() {
@@ -414,55 +256,6 @@ bool MemTracker::AnyLimitExceeded() {
return false;
}
-bool MemTracker::LimitExceeded() {
- if (PREDICT_FALSE(CheckLimitExceeded())) {
- return GcMemory(limit_);
- }
- return false;
-}
-
-bool MemTracker::SoftLimitExceeded(double* current_capacity_pct) {
- // Did we exceed the actual limit?
- if (LimitExceeded()) {
- if (current_capacity_pct) {
- *current_capacity_pct =
- static_cast<double>(consumption()) / limit() * 100;
- }
- return true;
- }
-
- // No soft limit defined.
- if (!has_limit() || limit_ == soft_limit_) {
- return false;
- }
-
- // Are we under the soft limit threshold?
- int64_t usage = consumption();
- if (usage < soft_limit_) {
- return false;
- }
-
- // We're over the threshold; were we randomly chosen to be over the soft limit?
- if (usage + rand_.Uniform64(limit_ - soft_limit_) > limit_) {
- bool exceeded = GcMemory(soft_limit_);
- if (exceeded && current_capacity_pct) {
- *current_capacity_pct =
- static_cast<double>(consumption()) / limit() * 100;
- }
- return exceeded;
- }
- return false;
-}
-
-bool MemTracker::AnySoftLimitExceeded(double* current_capacity_pct) {
- for (MemTracker* t : limit_trackers_) {
- if (t->SoftLimitExceeded(current_capacity_pct)) {
- return true;
- }
- }
- return false;
-}
-
int64_t MemTracker::SpareCapacity() const {
int64_t result = std::numeric_limits<int64_t>::max();
for (const auto& tracker : limit_trackers_) {
@@ -472,84 +265,6 @@ int64_t MemTracker::SpareCapacity() const {
return result;
}
-bool MemTracker::GcMemory(int64_t max_consumption) {
- if (max_consumption < 0) {
- // Impossible to GC enough memory to reach the goal.
- return true;
- }
-
- std::lock_guard<simple_spinlock> l(gc_lock_);
- if (!consumption_func_.empty()) {
- UpdateConsumption();
- }
- uint64_t pre_gc_consumption = consumption();
- // Check if someone gc'd before us
- if (pre_gc_consumption < max_consumption) {
- return false;
- }
-
- // Try to free up some memory
- for (const auto& gc_function : gc_functions_) {
- gc_function();
- if (!consumption_func_.empty()) {
- UpdateConsumption();
- }
- if (consumption() <= max_consumption) {
- break;
- }
- }
-
- return consumption() > max_consumption;
-}
-
-void MemTracker::GcTcmalloc() {
-#ifdef TCMALLOC_ENABLED
- released_memory_since_gc = 0;
- TRACE_EVENT0("process", "MemTracker::GcTcmalloc");
-
- // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but
- // not in use).
- int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
- // Bytes allocated by the application.
- int64_t bytes_used = GetTCMallocCurrentAllocatedBytes();
-
- int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
- if (bytes_overhead > max_overhead) {
- int64_t extra = bytes_overhead - max_overhead;
- while (extra > 0) {
- // Release 1MB at a time, so that tcmalloc releases its page heap lock
- // allowing other threads to make progress. This still disrupts the current
- // thread, but is better than disrupting all.
- MallocExtension::instance()->ReleaseToSystem(1024 * 1024);
- extra -= 1024 * 1024;
- }
- }
-
-#else
- // Nothing to do if not using tcmalloc.
-#endif
-}
-
-string MemTracker::LogUsage(const string& prefix) const {
- ostringstream ss;
- ss << prefix << id_ << ":";
- if (CheckLimitExceeded()) {
- ss << " memory limit exceeded.";
- }
- if (limit_ > 0) {
- ss << " Limit=" << HumanReadableNumBytes::ToString(limit_);
- }
- ss << " Consumption=" << HumanReadableNumBytes::ToString(consumption());
-
- ostringstream prefix_ss;
- prefix_ss << prefix << " ";
- string new_prefix = prefix_ss.str();
- MutexLock l(child_trackers_lock_);
- if (!child_trackers_.empty()) {
- ss << "\n" << LogUsage(new_prefix, child_trackers_);
- }
- return ss.str();
-}
void MemTracker::Init() {
// populate all_trackers_ and limit_trackers_
@@ -568,28 +283,6 @@ void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) {
tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
}
-void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const {
- ostringstream ss;
- ss << this << " " << (is_consume ? "Consume: " : "Release: ") << bytes
- << " Consumption: " << consumption() << " Limit: " << limit_;
- if (log_stack_) {
- ss << std::endl << GetStackTrace();
- }
- LOG(ERROR) << ss.str();
-}
-
-string MemTracker::LogUsage(const string& prefix,
- const list<weak_ptr<MemTracker>>& trackers) {
- vector<string> usage_strings;
- for (const auto& child_weak : trackers) {
- shared_ptr<MemTracker> child = child_weak.lock();
- if (child) {
- usage_strings.push_back(child->LogUsage(prefix));
- }
- }
- return JoinStrings(usage_strings, "\n");
-}
-
shared_ptr<MemTracker> MemTracker::GetRootTracker() {
GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
return root_tracker;
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker.h b/src/kudu/util/mem_tracker.h
index cd00e22..a43e9a2 100644
--- a/src/kudu/util/mem_tracker.h
+++ b/src/kudu/util/mem_tracker.h
@@ -17,7 +17,6 @@
#ifndef KUDU_UTIL_MEM_TRACKER_H
#define KUDU_UTIL_MEM_TRACKER_H
-#include <boost/function.hpp>
#include <list>
#include <memory>
#include <stdint.h>
@@ -28,7 +27,6 @@
#include "kudu/util/high_water_mark.h"
#include "kudu/util/locks.h"
#include "kudu/util/mutex.h"
-#include "kudu/util/random.h"
namespace kudu {
@@ -39,8 +37,8 @@ class MemTracker;
// arranged into a tree structure such that the consumption tracked by a
// MemTracker is also tracked by its ancestors.
//
-// The MemTracker hierarchy is rooted in a single static MemTracker whose limit
-// is set via gflag. The root MemTracker always exists, and it is the common
+// The MemTracker hierarchy is rooted in a single static MemTracker.
+// The root MemTracker always exists, and it is the common
// ancestor to all MemTrackers. All operations that discover MemTrackers begin
// at the root and work their way down the tree, while operations that deal
// with adjusting memory consumption begin at a particular MemTracker and work
@@ -55,51 +53,21 @@ class MemTracker;
// and the parent has a weak reference to it. Both remain for the lifetime of
// the MemTracker.
//
-// By default, memory consumption is tracked via calls to Consume()/Release(), either to
-// the tracker itself or to one of its descendents. Alternatively, a consumption function
-// can specified, and then the function's value is used as the consumption rather than the
-// tally maintained by Consume() and Release(). A tcmalloc function is used to track process
-// memory consumption, since the process memory usage may be higher than the computed
-// total memory (tcmalloc does not release deallocated memory immediately).
-//
-// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
-// reached. If LimitExceeded() is called and the limit is exceeded, it will first call the
-// GcFunctions to try to free memory and recheck the limit. For example, the process
-// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
-// this will be called before the process limit is reported as exceeded. GcFunctions are
-// called in the order they are added, so expensive functions should be added last.
+// Memory consumption is tracked via calls to Consume()/Release(), either to
+// the tracker itself or to one of its descendants.
//
// This class is thread-safe.
-//
-// NOTE: this class has been partially ported over from Impala with
-// several changes, and as a result the style differs somewhat from
-// the Kudu style.
-//
-// Changes from Impala:
-// 1) Id a string vs. a TUniqueId
-// 2) There is no concept of query trackers vs. pool trackers -- trackers are instead
-// associated with objects. Parent hierarchy is preserved, with the assumption that,
-// e.g., a tablet server's memtracker will have as its children the tablets' memtrackers,
-// which in turn will have memtrackers for their caches, logs, and so forth.
-//
-// TODO: this classes uses a lot of statics fields and methods, which
-// isn't common in Kudu. It is probably wise to later move the
-// 'registry' of trackers to a separate class, but it's better to
-// start using the 'class' *first* and then change this functionality,
-// depending on how MemTracker ends up being used in Kudu.
class MemTracker : public std::enable_shared_from_this<MemTracker> {
public:
-
- // Signature for function that can be called to free some memory after limit is reached.
- typedef boost::function<void ()> GcFunction;
-
~MemTracker();
// Creates and adds the tracker to the tree so that it can be retrieved with
// FindTracker/FindOrCreateTracker.
//
- // byte_limit < 0 means no limit; 'id' is a used as a label for LogUsage()
- // and web UI. Use the two-argument form if there is no parent.
+ // byte_limit < 0 means no limit; 'id' is a used as a label to uniquely identify
+ // the MemTracker for the below Find...() calls as well as the web UI.
+ //
+ // Use the two-argument form if there is no parent.
static std::shared_ptr<MemTracker> CreateTracker(
int64_t byte_limit,
const std::string& id,
@@ -133,19 +101,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
// Gets a shared_ptr to the "root" tracker, creating it if necessary.
static std::shared_ptr<MemTracker> GetRootTracker();
- // Updates consumption from the consumption function specified in the constructor.
- // NOTE: this method will crash if 'consumption_func_' is not set.
- void UpdateConsumption();
-
// Increases consumption of this tracker and its ancestors by 'bytes'.
void Consume(int64_t bytes);
- // Try to expand the limit (by asking the resource broker for more memory) by at least
- // 'bytes'. Returns false if not possible, true if the request succeeded. May allocate
- // more memory than was requested.
- // TODO: always returns false for now, not yet implemented.
- bool ExpandLimit(int64_t /* unused: bytes */) { return false; }
-
// Increases consumption of this tracker and its ancestors by 'bytes' only if
// they can all consume 'bytes'. If this brings any of them over, none of them
// are updated.
@@ -153,6 +111,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
bool TryConsume(int64_t bytes);
// Decreases consumption of this tracker and its ancestors by 'bytes'.
+ //
+ // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory"
+ // to ensure that memory is released to the OS.
void Release(int64_t bytes);
// Returns true if a valid limit of this tracker or one of its ancestors is
@@ -162,21 +123,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
// If this tracker has a limit, checks the limit and attempts to free up some memory if
// the limit is exceeded by calling any added GC functions. Returns true if the limit is
// exceeded after calling the GC functions. Returns false if there is no limit.
- bool LimitExceeded();
-
- // Like LimitExceeded() but may also return true if the soft memory limit is exceeded.
- // The greater the excess, the higher the chance that it returns true.
- //
- // If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
- // of the hard limit consumed is written to it.
- bool SoftLimitExceeded(double* current_capacity_pct);
-
- // Combines the semantics of AnyLimitExceeded() and SoftLimitExceeded().
- //
- // Note: if there's more than one soft limit defined, the probability of it being
- // exceeded in at least one tracker is much higher (as each soft limit check is an
- // independent event).
- bool AnySoftLimitExceeded(double* current_capacity_pct);
+ bool LimitExceeded() {
+ return limit_ >= 0 && limit_ < consumption();
+ }
// Returns the maximum consumption that can be made without exceeding the limit on
// this tracker or any of its parents. Returns int64_t::max() if there are no
@@ -193,62 +142,19 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
return consumption_.current_value();
}
- // Note that if consumption_ is based on consumption_func_, this
- // will be the max value we've recorded in consumption(), not
- // necessarily the highest value consumption_func_ has ever
- // reached.
int64_t peak_consumption() const { return consumption_.max_value(); }
// Retrieve the parent tracker, or NULL If one is not set.
std::shared_ptr<MemTracker> parent() const { return parent_; }
- // Add a function 'f' to be called if the limit is reached.
- // 'f' does not need to be thread-safe as long as it is added to only one MemTracker.
- // Note that 'f' must be valid for the lifetime of this MemTracker.
- void AddGcFunction(GcFunction f) {
- gc_functions_.push_back(f);
- }
-
- // Logs the usage of this tracker and all of its children (recursively).
- std::string LogUsage(const std::string& prefix = "") const;
-
- void EnableLogging(bool enable, bool log_stack) {
- enable_logging_ = enable;
- log_stack_ = log_stack;
- }
-
// Returns a textual representation of the tracker that is likely (but not
// guaranteed) to be globally unique.
std::string ToString() const;
private:
- // Function signatures for gauge-style memory trackers (where consumption is
- // periodically observed rather than explicitly tracked).
- //
- // Currently only used by the root tracker.
- typedef boost::function<uint64_t ()> ConsumptionFunction;
-
- // If consumption_func is not empty, uses it as the consumption value.
- // Consume()/Release() can still be called.
// byte_limit < 0 means no limit
// 'id' is the label for LogUsage() and web UI.
- MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit,
- const std::string& id, std::shared_ptr<MemTracker> parent);
-
- bool CheckLimitExceeded() const {
- return limit_ >= 0 && limit_ < consumption();
- }
-
- // If consumption is higher than max_consumption, attempts to free memory by calling any
- // added GC functions. Returns true if max_consumption is still exceeded. Takes
- // gc_lock. Updates metrics if initialized.
- bool GcMemory(int64_t max_consumption);
-
- // Called when the total release memory is larger than GC_RELEASE_SIZE.
- // TcMalloc holds onto released memory and very slowly (if ever) releases it back to
- // the OS. This is problematic since it is memory we are not constantly tracking which
- // can cause us to go way over mem limits.
- void GcTcmalloc();
+ MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr<MemTracker> parent);
// Further initializes the tracker.
void Init();
@@ -256,12 +162,6 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
// Adds tracker to child_trackers_.
void AddChildTracker(const std::shared_ptr<MemTracker>& tracker);
- // Logs the stack of the current consume/release. Used for debugging only.
- void LogUpdate(bool is_consume, int64_t bytes) const;
-
- static std::string LogUsage(const std::string& prefix,
- const std::list<std::weak_ptr<MemTracker>>& trackers);
-
// Variant of FindTracker() that must be called with a non-NULL parent.
static bool FindTrackerInternal(
const std::string& id,
@@ -271,25 +171,13 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
// Creates the root tracker.
static void CreateRootTracker();
- // Size, in bytes, that is considered a large value for Release() (or Consume() with
- // a negative value). If tcmalloc is used, this can trigger it to GC.
- // A higher value will make us call into tcmalloc less often (and therefore more
- // efficient). A lower value will mean our memory overhead is lower.
- // TODO: this is a stopgap.
- static const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;
-
- simple_spinlock gc_lock_;
-
int64_t limit_;
- int64_t soft_limit_;
const std::string id_;
const std::string descr_;
std::shared_ptr<MemTracker> parent_;
HighWaterMark consumption_;
- ConsumptionFunction consumption_func_;
-
// this tracker plus all of its ancestors
std::vector<MemTracker*> all_trackers_;
// all_trackers_ with valid limits
@@ -304,17 +192,6 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
// Iterator into parent_->child_trackers_ for this object. Stored to have O(1)
// remove.
std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_;
-
- // Functions to call after the limit is reached to free memory.
- std::vector<GcFunction> gc_functions_;
-
- ThreadSafeRandom rand_;
-
- // If true, logs to INFO every consume/release called. Used for debugging.
- bool enable_logging_;
-
- // If true, log the stack as well.
- bool log_stack_;
};
// An std::allocator that manipulates a MemTracker during allocation
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/process_memory.cc b/src/kudu/util/process_memory.cc
new file mode 100644
index 0000000..d3868c2
--- /dev/null
+++ b/src/kudu/util/process_memory.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/resource.h>
+
+#include <gflags/gflags.h>
+#include <gperftools/malloc_extension.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random.h"
+
+DEFINE_int64(memory_limit_hard_bytes, 0,
+ "Maximum amount of memory this daemon should use, in bytes. "
+ "A value of 0 autosizes based on the total system memory. "
+ "A value of -1 disables all memory limiting.");
+TAG_FLAG(memory_limit_hard_bytes, stable);  // Read inside InitLimits(), which runs its body only once.
+
+DEFINE_int32(memory_limit_soft_percentage, 60,
+ "Percentage of the hard memory limit that this daemon may "
+ "consume before memory throttling of writes begins. The greater "
+ "the excess, the higher the chance of throttling. In general, a "
+ "lower soft limit leads to smoother write latencies but "
+ "decreased throughput, and vice versa for a higher soft limit.");
+TAG_FLAG(memory_limit_soft_percentage, advanced);  // Validated to lie in [0, 100]; read by InitLimits().
+
+DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
+ "Percentage of the hard memory limit that this daemon may "
+ "consume before WARNING level messages are periodically logged.");
+TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);  // Validated to lie in [0, 100]; not otherwise read in this file.
+
+#ifdef TCMALLOC_ENABLED
+DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
+ "Maximum percentage of the RSS that tcmalloc is allowed to use for "
+ "reserved but unallocated memory.");
+TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);  // Validated to lie in [0, 100]; read by GcTcmalloc().
+#endif  // TCMALLOC_ENABLED
+
+using strings::Substitute;
+
+namespace kudu {
+namespace process_memory {
+
+namespace {
+int64_t g_hard_limit;  // Hard limit in bytes; assigned by InitLimits(), zero until then.
+int64_t g_soft_limit;  // FLAGS_memory_limit_soft_percentage percent of g_hard_limit; assigned by InitLimits().
+
+ThreadSafeRandom* g_rand = nullptr;  // Allocated by InitLimits(); used by SoftLimitExceeded(). Not freed in this file.
+
+#ifdef TCMALLOC_ENABLED
+// Total amount of memory released since the last GC. If this
+// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
+Atomic64 g_released_memory_since_gc;  // Updated and reset to 0 by MaybeGCAfterRelease().
+
+// Size, in bytes, that is considered a large value for Release() (or Consume() with
+// a negative value). If tcmalloc is used, this can trigger it to GC.
+// A higher value will make us call into tcmalloc less often (and therefore more
+// efficient). A lower value will mean our memory overhead is lower.
+// TODO(todd): this is a stopgap.
+const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;  // 128 MiB.
+
+#endif // TCMALLOC_ENABLED
+
+} // anonymous namespace
+
+
+// Flag validation
+// ------------------------------------------------------------
+// Validate that various flags are percentages.
+// gflags validator callback: accepts 'value' only when it lies in the closed
+// range [0, 100]. A rejected value is reported at ERROR level together with
+// the name of the offending flag, and false is returned.
+static bool ValidatePercentage(const char* flagname, int value) {
+  const bool is_percentage = (value >= 0) && (value <= 100);
+  if (!is_percentage) {
+    LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
+                             flagname, value);
+  }
+  return is_percentage;
+}
+
+static bool dummy[] = {  // Never read in this file; its initializers register the flag validators.
+ google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
+ google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
+#ifdef TCMALLOC_ENABLED
+ ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
+#endif  // TCMALLOC_ENABLED
+};
+
+
+// Wrappers around tcmalloc functionality
+// ------------------------------------------------------------
+#ifdef TCMALLOC_ENABLED
+// Returns the numeric tcmalloc property named 'prop'.
+//
+// If tcmalloc does not recognize the property, the failure is logged at
+// DFATAL severity and 0 is returned. 'value' is zero-initialized because
+// DFATAL only aborts in debug builds; without the initializer a release
+// build would return an indeterminate value after a failed lookup.
+static int64_t GetTCMallocProperty(const char* prop) {
+  size_t value = 0;
+  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
+    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
+  }
+  return value;
+}
+
+// Returns tcmalloc's "generic.current_allocated_bytes" property, fetched via
+// GetTCMallocProperty().
+static int64_t GetTCMallocCurrentAllocatedBytes() {
+  const int64_t allocated_bytes = GetTCMallocProperty("generic.current_allocated_bytes");
+  return allocated_bytes;
+}
+
+// Asks tcmalloc to hand free memory back to the system when its page heap
+// free list is larger than FLAGS_tcmalloc_max_free_bytes_percentage percent
+// of the bytes currently allocated by the application.
+void GcTcmalloc() {
+  TRACE_EVENT0("process", "GcTcmalloc");
+
+  // Bytes sitting in tcmalloc's page heap free list, i.e. reserved by
+  // tcmalloc but not in use by the application.
+  int64_t free_list_bytes = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
+  // Bytes the application currently has allocated.
+  int64_t allocated_bytes = GetTCMallocCurrentAllocatedBytes();
+
+  int64_t allowed_free_bytes =
+      allocated_bytes * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
+  if (free_list_bytes <= allowed_free_bytes) {
+    return;
+  }
+
+  // Release in 1MB steps so that tcmalloc drops its page heap lock between
+  // calls and other threads can make progress. The current thread is still
+  // disrupted, but that is better than disrupting all of them.
+  const int64_t kReleaseChunkBytes = 1024 * 1024;
+  for (int64_t remaining = free_list_bytes - allowed_free_bytes;
+       remaining > 0;
+       remaining -= kReleaseChunkBytes) {
+    MallocExtension::instance()->ReleaseToSystem(kReleaseChunkBytes);
+  }
+}
+#endif // TCMALLOC_ENABLED
+
+
+// Consumption and soft memory limit behavior
+// ------------------------------------------------------------
+namespace {
+// Derives g_hard_limit and g_soft_limit from the memory-limit flags and
+// allocates g_rand. May be called any number of times: std::call_once
+// ensures the initialization body runs only once.
+void InitLimits() {
+  static std::once_flag init_once;
+  std::call_once(init_once, []() {
+    const double kBytesPerGB = 1024.0 * 1024.0 * 1024.0;
+
+    int64_t hard_limit = FLAGS_memory_limit_hard_bytes;
+    if (hard_limit == 0) {
+      // No explicit limit was configured: use 80% (4/5) of system RAM.
+      int64_t system_ram_bytes;
+      CHECK_OK(Env::Default()->GetTotalRAMBytes(&system_ram_bytes));
+      hard_limit = system_ram_bytes * 4 / 5;
+    }
+    g_hard_limit = hard_limit;
+    g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100;
+
+    g_rand = new ThreadSafeRandom(1);
+
+    LOG(INFO) << StringPrintf("Process hard memory limit is %.6f GB",
+                              static_cast<float>(g_hard_limit) / kBytesPerGB);
+    LOG(INFO) << StringPrintf("Process soft memory limit is %.6f GB",
+                              static_cast<float>(g_soft_limit) / kBytesPerGB);
+  });
+}
+} // anonymous namespace
+
+// Returns the current memory consumption of the process, in bytes: tcmalloc's
+// allocated-bytes figure when built with tcmalloc, otherwise the root
+// MemTracker's consumption.
+int64_t CurrentConsumption() {
+#ifndef TCMALLOC_ENABLED
+  // Without tcmalloc there is no reliable way to determine our own heap size
+  // (e.g. mallinfo doesn't work in ASAN builds), so fall back to the sum of
+  // the memory we track ourselves.
+  return MemTracker::GetRootTracker()->consumption();
+#else
+  // TODO(todd): this is slow to call frequently, since it takes the tcmalloc
+  // global lock. We should look into whether it's faster to hook malloc/free
+  // and update a LongAdder instead, or otherwise make this more incrementally
+  // tracked.
+  return GetTCMallocCurrentAllocatedBytes();
+#endif
+}
+
+// Returns the hard memory limit for the process, in bytes.
+//
+// The limits are computed lazily by InitLimits(), so it is invoked here too.
+// Otherwise a caller that runs before the first SoftLimitExceeded() call
+// would read the zero-initialized g_hard_limit instead of the real limit.
+int64_t HardLimit() {
+  InitLimits();
+  return g_hard_limit;
+}
+
+// Probabilistically reports whether the process is over its soft memory limit.
+//
+// Returns true when consumption exceeds the hard limit. Between the soft and
+// hard limits it returns true at random, with the chance growing as
+// consumption approaches the hard limit. Below the soft limit, or when the
+// soft and hard limits are equal, it returns false.
+//
+// When true is returned and 'current_capacity_pct' is non-null, the
+// percentage of the hard limit currently consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct) {
+  InitLimits();
+
+  // --memory_limit_hard_bytes=-1 is documented as disabling all memory
+  // limiting. A negative hard limit must therefore never count as exceeded;
+  // without this guard every consumption value compares greater than it.
+  if (g_hard_limit < 0) {
+    return false;
+  }
+
+  int64_t consumption = CurrentConsumption();
+  // Did we exceed the actual limit?
+  if (consumption > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+
+  // No soft limit defined.
+  if (g_hard_limit == g_soft_limit) {
+    return false;
+  }
+
+  // Are we under the soft limit threshold?
+  if (consumption < g_soft_limit) {
+    return false;
+  }
+
+  // We're over the threshold; were we randomly chosen to be over the soft limit?
+  if (consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+  return false;
+}
+
+// Records that 'released_bytes' bytes were released and, once more than
+// GC_RELEASE_SIZE bytes have accumulated since the last GC, resets the
+// counter and runs GcTcmalloc(). Does nothing when built without tcmalloc.
+//
+// The counter is incremented by +released_bytes. Adding the negated amount
+// would only drive it below zero for a positive release, so the
+// 'now_released > GC_RELEASE_SIZE' threshold would never be crossed.
+// NOTE(review): assumes callers pass a positive byte count, as the
+// parameter name suggests -- confirm against the call sites.
+void MaybeGCAfterRelease(int64_t released_bytes) {
+#ifdef TCMALLOC_ENABLED
+  int64_t now_released = base::subtle::NoBarrier_AtomicIncrement(
+      &g_released_memory_since_gc, released_bytes);
+  if (PREDICT_FALSE(now_released > GC_RELEASE_SIZE)) {
+    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
+    GcTcmalloc();
+  }
+#endif
+}
+
+} // namespace process_memory
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/process_memory.h b/src/kudu/util/process_memory.h
new file mode 100644
index 0000000..2868bf5
--- /dev/null
+++ b/src/kudu/util/process_memory.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Process-wide memory accounting helpers: soft/hard limit checks, current
+// consumption, and an optional tcmalloc garbage-collection trigger.
+//
+// The include guard makes repeated inclusion of this header safe.
+#pragma once
+
+#include <cstdint>
+
+namespace kudu {
+namespace process_memory {
+
+// Probabilistically returns true if the process-wide soft memory limit is exceeded.
+// The greater the excess, the higher the chance that it returns true.
+//
+// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
+// of the hard limit consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct);
+
+// Potentially trigger a call to release tcmalloc memory back to the
+// OS, after the given amount of memory was released.
+//
+// 'released_bytes' is the number of bytes that were just released.
+void MaybeGCAfterRelease(int64_t released_bytes);
+
+// Return the total current memory consumption of the process, in bytes.
+int64_t CurrentConsumption();
+
+// Return the configured hard limit for the process, in bytes.
+int64_t HardLimit();
+
+} // namespace process_memory
+} // namespace kudu
[3/3] kudu git commit: Improve unsupported application feature flags
error status
Posted by jd...@apache.org.
Improve unsupported application feature flags error status
It can be helpful to have the exact set of unsupported flags when
debugging compatibility issues.
Change-Id: Ie59fba2b5ff57a22d16c7b7eca55ab4581e9b64c
Reviewed-on: http://gerrit.cloudera.org:8080/6722
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1edaee01
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1edaee01
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1edaee01
Branch: refs/heads/master
Commit: 1edaee01f55f83eb1c8cb5f21daa11c3753d183b
Parents: 2be32d5
Author: Dan Burkert <da...@apache.org>
Authored: Mon Apr 24 18:28:42 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Apr 27 03:17:36 2017 +0000
----------------------------------------------------------------------
src/kudu/rpc/service_pool.cc | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1edaee01/src/kudu/rpc/service_pool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index d089350..1a23ca9 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -24,11 +24,12 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_queue.h"
-#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
@@ -137,7 +138,10 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
if (!unsupported_features.empty()) {
c->RespondUnsupportedFeature(unsupported_features);
- return Status::NotSupported("call requires unsupported application feature flags");
+ return Status::NotSupported("call requires unsupported application feature flags",
+ JoinMapped(unsupported_features,
+ [] (uint32_t flag) { return std::to_string(flag); },
+ ", "));
}
TRACE_TO(c->trace(), "Inserting onto call queue");