You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/18 16:39:49 UTC
[impala] 04/05: IMPALA-8542. Add an access trace for the data cache
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit f4ef9ca87352406cc7a484e90ed1d1d702745810
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu May 23 23:36:19 2019 -0700
IMPALA-8542. Add an access trace for the data cache
This adds a relatively simple JSON-formatted access trace for the data
cache feature. When --data_cache_enable_tracing is set, each partition
stores a trace file named 'impala-cache-trace.txt', with each line
representing a hit, a miss, a store, or a store that was dropped because
the cache was busy.
The trace is collected using the kudu::AsyncLogger class which handles
buffering and deferring the actual IO to a background thread.
By default, the full cache key info is written to the trace (including
the file paths), but the --data_cache_anonymize_trace flag enables
anonymization (128-bit CityHash) of the file names in case any user would
like to capture a trace to be shared publicly without divulging their
table names.
Change-Id: I2302c19abb5db19f1d3d1cd727a82977a9e2ba9c
Reviewed-on: http://gerrit.cloudera.org:8080/13425
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>
---
be/src/runtime/io/data-cache-test.cc | 67 ++++++++++-
be/src/runtime/io/data-cache.cc | 221 ++++++++++++++++++++++++++++++++++-
be/src/runtime/io/data-cache.h | 13 +++
3 files changed, 297 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc
index d91065c..416dc2a 100644
--- a/be/src/runtime/io/data-cache-test.cc
+++ b/be/src/runtime/io/data-cache-test.cc
@@ -17,10 +17,14 @@
#include <algorithm>
#include <boost/bind.hpp>
+#include <fstream>
#include <gflags/gflags.h>
+#include <iostream>
+#include <rapidjson/document.h>
#include <sys/sysinfo.h>
#include "gutil/strings/join.h"
+#include "gutil/strings/util.h"
#include "runtime/io/data-cache.h"
#include "runtime/io/request-ranges.h"
#include "runtime/test-env.h"
@@ -43,6 +47,8 @@
#define TEST_BUFFER_SIZE (8192)
DECLARE_bool(cache_force_single_shard);
+DECLARE_bool(data_cache_anonymize_trace);
+DECLARE_bool(data_cache_enable_tracing);
DECLARE_int64(data_cache_file_max_size_bytes);
DECLARE_int32(data_cache_max_opened_files);
DECLARE_int32(data_cache_write_concurrency);
@@ -136,11 +142,16 @@ class DataCacheTest : public testing::Test {
// Delete all the test directories created.
virtual void TearDown() {
- // Make sure the cache's destructor removes all backing files.
+ // Make sure the cache's destructor removes all backing files, except for
+ // potentially the trace file.
for (const string& dir_path : data_cache_dirs_) {
vector<string> entries;
ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir_path, &entries));
- ASSERT_EQ(0, entries.size());
+ if (entries.size() == 1) {
+ EXPECT_EQ(DataCache::Partition::TRACE_FILE_NAME, entries[0]);
+ } else {
+ ASSERT_EQ(0, entries.size());
+ }
}
ASSERT_OK(FileSystemUtil::RemovePaths(data_cache_dirs_));
flag_saver_.reset();
@@ -495,6 +506,58 @@ TEST_F(DataCacheTest, LargeFootprint) {
}
}
+TEST_F(DataCacheTest, TestAccessTrace) {
+  FLAGS_data_cache_enable_tracing = true;
+  for (bool anon : { false, true }) {
+    SCOPED_TRACE(anon);
+    FLAGS_data_cache_anonymize_trace = anon;
+    {
+      int64_t cache_size = DEFAULT_CACHE_SIZE;
+      DataCache cache(Substitute(
+          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+      ASSERT_OK(cache.Init());
+
+      int64_t max_start_offset = 1024;
+      bool use_per_thread_filename = true;
+      bool expect_misses = true;
+      MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
+          expect_misses);
+    }
+
+    // Read the trace file and ensure that it is non-empty and that every line is
+    // valid JSON with the expected fields.
+    std::ifstream trace(Substitute("$0/$1", data_cache_dirs()[0],
+        DataCache::Partition::TRACE_FILE_NAME));
+    ASSERT_TRUE(trace.is_open()) << "trace file was not created";
+    int num_lines = 0;
+    string line;
+    while (std::getline(trace, line)) {
+      SCOPED_TRACE(++num_lines);
+      SCOPED_TRACE(line);
+      rapidjson::Document d;
+      d.Parse<0>(line.c_str());
+      ASSERT_FALSE(d.HasParseError());
+      ASSERT_TRUE(d.IsObject());
+      // Check membership first: rapidjson's operator[] asserts on a missing key.
+      for (const char* key : {"ts", "s", "f", "m", "o"}) {
+        ASSERT_TRUE(d.HasMember(key)) << key;
+      }
+      ASSERT_TRUE(d["ts"].IsDouble());
+      ASSERT_TRUE(d["s"].IsString());
+      ASSERT_TRUE(d["m"].IsInt64());
+      ASSERT_TRUE(d["o"].IsInt64());
+      ASSERT_TRUE(d["f"].IsString());
+      if (anon) {
+        // Anonymized names are unpadded base64 of a 128-bit hash: ceil(16*4/3) = 22.
+        EXPECT_EQ(22, d["f"].GetStringLength());
+      } else {
+        EXPECT_TRUE(MatchPattern(d["f"].GetString(), "thread-*file"));
+      }
+    }
+    EXPECT_GT(num_lines, 0) << "trace file is empty";
+  }
+}
+
} // namespace io
} // namespace impala
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 70b7aeb..e4d01a2 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -23,13 +23,22 @@
#include <mutex>
#include <string.h>
#include <unistd.h>
+#include <sstream>
+
+#include <glog/logging.h>
#include "exec/kudu-util.h"
+#include "kudu/util/async_logger.h"
#include "kudu/util/cache.h"
#include "kudu/util/env.h"
+#include "kudu/util/jsonwriter.h"
#include "kudu/util/locks.h"
#include "kudu/util/path_util.h"
+#include "gutil/hash/city.h"
+#include "gutil/port.h"
+#include "gutil/strings/escaping.h"
#include "gutil/strings/split.h"
+#include "gutil/walltime.h"
#include "util/bit-util.h"
#include "util/error-util.h"
#include "util/filesystem-util.h"
@@ -47,12 +56,14 @@
#include "common/names.h"
+using kudu::Env;
using kudu::faststring;
using kudu::JoinPathSegments;
using kudu::percpu_rwlock;
using kudu::RWFile;
using kudu::rw_spinlock;
using kudu::Slice;
+using kudu::WritableFile;
using strings::SkipEmpty;
using strings::Split;
@@ -74,13 +85,138 @@ DEFINE_int32(data_cache_write_concurrency, 1,
DEFINE_bool(data_cache_checksum, ENABLE_CHECKSUMMING,
"(Advanced) Enable checksumming for the cached buffer.");
+DEFINE_bool(data_cache_enable_tracing, false,
+    "(Advanced) Collect a trace of all lookups and stores in the data cache.");
+DEFINE_bool(data_cache_anonymize_trace, false,
+    "(Advanced) Use hashes of filenames rather than file paths in the data "
+    "cache access trace.");
+
namespace impala {
namespace io {
static const int64_t PAGE_SIZE = 1L << 12;
const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-";
+const char* DataCache::Partition::TRACE_FILE_NAME = "impala-cache-trace.txt";
const int MAX_FILE_DELETER_QUEUE_SIZE = 500;
+
+namespace {
+
+// Buffer size limit passed to the kudu::AsyncLogger that queues trace records.
+constexpr int kTraceAsyncBufferBytes = 8 * 1024 * 1024;
+
+// Minimal glog Logger that appends messages to a local file; it is the sink
+// behind the kudu::AsyncLogger used for data cache access tracing.
+//
+// Only the bare minimum of the Logger interface needed by kudu::AsyncLogger is
+// implemented: LogSize() is unsupported. Messages accumulate in 'buf_' and are
+// appended to the file once more than kBufSize bytes are pending, when a flush
+// is forced, or when the logger is destroyed.
+class FileLogger : public google::base::Logger {
+ public:
+  explicit FileLogger(string path) : path_(std::move(path)) {}
+
+  // Writes out anything still buffered. Skipped if no file was opened.
+  virtual ~FileLogger() {
+    if (file_) Flush();
+  }
+
+  // Creates the trace file at 'path_'. Must succeed before anything is flushed,
+  // because Flush() dereferences 'file_'.
+  Status Open() {
+    KUDU_RETURN_IF_ERROR(Env::Default()->NewWritableFile({}, path_, &file_),
+        "Failed to create trace log file");
+    return Status::OK();
+  }
+
+  // Buffers 'message', flushing to the file if 'force_flush' is set or more than
+  // kBufSize bytes are pending. 'timestamp' is ignored.
+  void Write(bool force_flush,
+      time_t timestamp,
+      const char* message,
+      int message_len) override {
+    buf_.append(message, message_len);
+    if (force_flush || buf_.size() > kBufSize) {
+      Flush();
+    }
+  }
+
+  // Appends any buffered messages to the file. An append failure is only logged
+  // as a warning; the buffered messages are discarded either way.
+  // NOTE: declared 'final' to allow safe calls from the destructor.
+  void Flush() override final {
+    if (buf_.empty()) return;
+
+    KUDU_WARN_NOT_OK(file_->Append(buf_), "Could not append to trace log");
+    buf_.clear();
+  }
+
+  // Not implemented; aborts the process if called.
+  uint32 LogSize() override {
+    LOG(FATAL) << "Unimplemented";
+    return 0;
+  }
+
+ private:
+  const string path_;
+  // Messages not yet appended to 'file_'.
+  string buf_;
+  // Set by Open(); null until then.
+  unique_ptr<WritableFile> file_;
+
+  // Flush threshold for 'buf_'. size_t so it compares cleanly with size().
+  static constexpr size_t kBufSize = 64 * 1024;
+};
+
+} // anonymous namespace
+
+// Records the accesses to one data cache partition as newline-delimited JSON
+// objects (see Trace()). Records are queued on a kudu::AsyncLogger, which does
+// the file IO through a FileLogger on a background thread.
+class DataCache::Partition::Tracer {
+ public:
+  explicit Tracer(string path) : underlying_logger_(new FileLogger(std::move(path))) {}
+
+  // Stops the async logger, if Start() created one.
+  ~Tracer() {
+    if (logger_) logger_->Stop();
+  }
+
+  // Opens the trace file and starts the async logger. Trace() must not be
+  // called unless this returned OK, since it uses 'logger_'.
+  Status Start() {
+    RETURN_IF_ERROR(underlying_logger_->Open());
+    logger_.reset(
+        new kudu::AsyncLogger(underlying_logger_.get(), kTraceAsyncBufferBytes));
+    logger_->Start();
+    return Status::OK();
+  }
+
+  // The kind of access being recorded; Trace() writes these as "H", "M", "S"
+  // and "F" respectively.
+  enum CacheStatus {
+    HIT,
+    MISS,
+    STORE,
+    STORE_FAILED_BUSY
+  };
+
+  // Records one access to 'key'. 'lookup_len' and 'entry_len' are left out of
+  // the record when passed as -1.
+  void Trace(CacheStatus status, const DataCache::CacheKey& key,
+      int64_t lookup_len, int64_t entry_len);
+
+ private:
+  // The underlying logger that we wrap with the AsyncLogger wrapper
+  // 'logger_'. NOTE: AsyncLogger consumes a raw pointer which must
+  // outlive the AsyncLogger instance, so it's important that these
+  // are declared in this order (logger_ must destruct before
+  // underlying_logger_).
+  unique_ptr<FileLogger> underlying_logger_;
+  // The async wrapper around underlying_logger_ (see above).
+  unique_ptr<kudu::AsyncLogger> logger_;
+};
+
/// This class is an implementation of backing files in a cache partition.
///
/// A partition uses the interface Create() to create a backing file. A reader can read
@@ -289,20 +402,40 @@ struct DataCache::CacheKey {
: key_(filename.size() + sizeof(mtime) + sizeof(offset)) {
DCHECK_GE(mtime, 0);
DCHECK_GE(offset, 0);
- key_.append(filename);
key_.append(&mtime, sizeof(mtime));
key_.append(&offset, sizeof(offset));
+ key_.append(filename);
}
int64_t Hash() const {
return HashUtil::FastHash64(key_.data(), key_.size(), 0);
}
+ Slice filename() const {
+ return Slice(key_.data() + OFFSETOF_FILENAME, key_.size() - OFFSETOF_FILENAME);
+ }
+
+ int64_t mtime() const {
+ return UNALIGNED_LOAD64(key_.data() + OFFSETOF_MTIME);
+ }
+
+ int64_t offset() const {
+ return UNALIGNED_LOAD64(key_.data() + OFFSETOF_OFFSET);
+ }
+
Slice ToSlice() const {
return key_;
}
private:
+ // Key encoding stored in key_:
+ //
+ // int64_t mtime;
+ // int64_t offset;
+ // <variable length bytes> filename;
+ static constexpr int OFFSETOF_MTIME = 0;
+ static constexpr int OFFSETOF_OFFSET = OFFSETOF_MTIME + sizeof(int64_t);
+ static constexpr int OFFSETOF_FILENAME = OFFSETOF_OFFSET + sizeof(int64_t);
faststring key_;
};
@@ -369,6 +502,11 @@ Status DataCache::Partition::Init() {
// Make sure hole punching is supported for the caching directory.
RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(path_));
+ if (FLAGS_data_cache_enable_tracing) {
+ tracer_.reset(new Tracer(path_ + "/" + TRACE_FILE_NAME));
+ RETURN_IF_ERROR(tracer_->Start());
+ }
+
// Create a backing file for the partition.
RETURN_IF_ERROR(CreateCacheFile());
oldest_opened_file_ = 0;
@@ -414,12 +552,24 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
Slice key = cache_key.ToSlice();
kudu::Cache::Handle* handle =
meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
- if (handle == nullptr) return 0;
+
+
+ if (handle == nullptr) {
+ if (tracer_ != nullptr) {
+ tracer_->Trace(Tracer::MISS, cache_key, bytes_to_read, /*entry_len=*/-1);
+ }
+ return 0;
+ }
auto handle_release =
MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
// Read from the backing file.
CacheEntry entry(meta_cache_->Value(handle));
+
+ if (tracer_ != nullptr) {
+ tracer_->Trace(Tracer::HIT, cache_key, bytes_to_read, entry.len());
+ }
+
CacheFile* cache_file = entry.file();
bytes_to_read = min(entry.len(), bytes_to_read);
VLOG(3) << Substitute("Reading file $0 offset $1 len $2 checksum $3 bytes_to_read $4",
@@ -521,6 +671,10 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
if (exceed_concurrency ||
pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES->Increment(buffer_len);
+ if (tracer_ != nullptr) {
+ tracer_->Trace(Tracer::STORE_FAILED_BUSY, cache_key, /*lookup_len=*/-1,
+ buffer_len);
+ }
return false;
}
@@ -543,6 +697,10 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
pending_insert_set_.emplace(key.ToString());
}
+ if (tracer_ != nullptr) {
+ tracer_->Trace(Tracer::STORE, cache_key, /* lookup_len=*/-1, buffer_len);
+ }
+
// Set up a scoped exit to always remove entry from the pending insertion set.
auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
std::unique_lock<SpinLock> partition_lock(lock_);
@@ -718,5 +876,64 @@ void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) {
partitions_[partition_idx]->DeleteOldFiles();
}
+void DataCache::Partition::Tracer::Trace(
+ CacheStatus status, const DataCache::CacheKey& key,
+ int64_t lookup_len, int64_t entry_len) {
+ // One compact JSON object per line: ts, s, f, m (mtime), o (offset), [lLen], [eLen].
+ ostringstream buf;
+ kudu::JsonWriter jw(&buf, kudu::JsonWriter::COMPACT);
+ // "ts": wall-clock time. "s": H=hit, M=miss, S=store, F=store dropped (busy).
+ jw.StartObject();
+ jw.String("ts");
+ jw.Double(WallTime_Now());
+ jw.String("s");
+ switch (status) {
+ case HIT: jw.String("H"); break;
+ case MISS: jw.String("M"); break;
+ case STORE: jw.String("S"); break;
+ case STORE_FAILED_BUSY: jw.String("F"); break;
+ }
+ // "f": the file name, or its base64 CityHash128 when the trace is anonymized.
+ jw.String("f");
+ if (FLAGS_data_cache_anonymize_trace) {
+ uint128 hash = util_hash::CityHash128(
+ reinterpret_cast<const char*>(key.filename().data()),
+ key.filename().size());
+ // A 128-bit (16-byte) hash results in a 24-byte base64-encoded string, including
+ // two characters of padding.
+ const int BUF_LEN = 24;
+ DCHECK_EQ(BUF_LEN, CalculateBase64EscapedLen(sizeof(hash)));
+ char b64_buf[BUF_LEN];
+ int out_len = Base64Escape(reinterpret_cast<const unsigned char*>(&hash),
+ sizeof(hash), b64_buf, BUF_LEN);
+ DCHECK_EQ(out_len, BUF_LEN);
+ // Chop off the two '=' padding bytes, leaving a 22-character string.
+ DCHECK(b64_buf[23] == '=' && b64_buf[22] == '=');
+ out_len = 22;
+ jw.String(b64_buf, out_len);
+ } else {
+ jw.String(reinterpret_cast<const char*>(key.filename().data()),
+ key.filename().size());
+ }
+ jw.String("m");
+ jw.Int64(key.mtime());
+ jw.String("o");
+ jw.Int64(key.offset());
+ // Callers pass -1 for a length that does not apply (e.g. 'entry_len' on a miss).
+ if (lookup_len != -1) {
+ jw.String("lLen");
+ jw.Int64(lookup_len);
+ }
+ if (entry_len != -1) {
+ jw.String("eLen");
+ jw.Int64(entry_len);
+ }
+ jw.EndObject();
+ buf << "\n";
+ // Queue the line on the AsyncLogger; the file IO happens on its background thread.
+ string s = buf.str();
+ logger_->Write(/*force_flush=*/false, /*timestamp=*/0, s.data(), s.size());
+}
+
} // namespace io
} // namespace impala
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index 6a290d9..30cdaf8 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -22,6 +22,7 @@
#include <unistd.h>
#include <unordered_map>
#include <unordered_set>
+#include <gtest/gtest_prod.h>
#include "common/status.h"
#include "util/spinlock.h"
@@ -182,6 +183,9 @@ class DataCache {
Status CloseFilesAndVerifySizes();
private:
+ friend class DataCacheTest;
+ FRIEND_TEST(DataCacheTest, TestAccessTrace);
+
class CacheFile;
struct CacheKey;
class CacheEntry;
@@ -245,6 +249,10 @@ class DataCache {
void DeleteOldFiles();
private:
+ friend class DataCacheTest;
+ FRIEND_TEST(DataCacheTest, TestAccessTrace);
+ class Tracer;
+
/// The directory path which this partition stores cached data in.
const std::string path_;
@@ -261,6 +269,9 @@ class DataCache {
/// The prefix of the names of the cache backing files.
static const char* CACHE_FILE_PREFIX;
+ /// The file name used for the access trace.
+ static const char* TRACE_FILE_NAME;
+
/// Protects the following fields.
SpinLock lock_;
@@ -289,6 +300,8 @@ class DataCache {
/// content. Please see comments at CachedEntry for details.
std::unique_ptr<kudu::Cache> meta_cache_;
+ std::unique_ptr<Tracer> tracer_;
+
/// Utility function for creating a new backing file in 'path_'. The cache
/// partition's lock needs to be held when calling this function. Returns
/// error on failure.