Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/18 16:39:49 UTC

[impala] 04/05: IMPALA-8542. Add an access trace for the data cache

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f4ef9ca87352406cc7a484e90ed1d1d702745810
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu May 23 23:36:19 2019 -0700

    IMPALA-8542. Add an access trace for the data cache
    
    This adds a relatively simple JSON-formatted access trace for the data
    cache feature. Each partition stores a trace file named
    'impala-cache-trace.txt', and each line records a hit, a miss, or a
    store into the cache.
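    
    For illustration, each trace record is a single compact JSON object per
    line; the values below are hypothetical, and "lLen"/"eLen" only appear
    when applicable (a hit, a miss, and a store, respectively):
    
        {"ts":1558676179.12,"s":"H","f":"thread-0-file","m":1558676000,"o":8192,"lLen":8192,"eLen":8192}
        {"ts":1558676179.15,"s":"M","f":"thread-1-file","m":1558676000,"o":0,"lLen":8192}
        {"ts":1558676179.21,"s":"S","f":"thread-1-file","m":1558676000,"o":0,"eLen":8192}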
    
    The trace is collected using the kudu::AsyncLogger class, which buffers
    trace records and defers the actual IO to a background thread.
    
    By default, the full cache key info is written to the trace (including
    the file paths), but a flag can enable anonymization (128-bit
    city-hashing) of the file names so that a user can capture a trace to
    share publicly without divulging their table names.
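    
    To turn tracing on (using the flags added by this change; invocation
    shown for illustration only), a deployment with the data cache already
    configured can add:
    
        --data_cache_enable_tracing=true --data_cache_anonymize_trace=true
    
    With anonymization enabled, the "f" field carries a 22-character base64
    fingerprint of the file name instead of the path.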
    
    Change-Id: I2302c19abb5db19f1d3d1cd727a82977a9e2ba9c
    Reviewed-on: http://gerrit.cloudera.org:8080/13425
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Michael Ho <kw...@cloudera.com>
---
 be/src/runtime/io/data-cache-test.cc |  67 ++++++++++-
 be/src/runtime/io/data-cache.cc      | 221 ++++++++++++++++++++++++++++++++++-
 be/src/runtime/io/data-cache.h       |  13 +++
 3 files changed, 297 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc
index d91065c..416dc2a 100644
--- a/be/src/runtime/io/data-cache-test.cc
+++ b/be/src/runtime/io/data-cache-test.cc
@@ -17,10 +17,14 @@
 
 #include <algorithm>
 #include <boost/bind.hpp>
+#include <fstream>
 #include <gflags/gflags.h>
+#include <iostream>
+#include <rapidjson/document.h>
 #include <sys/sysinfo.h>
 
 #include "gutil/strings/join.h"
+#include "gutil/strings/util.h"
 #include "runtime/io/data-cache.h"
 #include "runtime/io/request-ranges.h"
 #include "runtime/test-env.h"
@@ -43,6 +47,8 @@
 #define TEST_BUFFER_SIZE   (8192)
 
 DECLARE_bool(cache_force_single_shard);
+DECLARE_bool(data_cache_anonymize_trace);
+DECLARE_bool(data_cache_enable_tracing);
 DECLARE_int64(data_cache_file_max_size_bytes);
 DECLARE_int32(data_cache_max_opened_files);
 DECLARE_int32(data_cache_write_concurrency);
@@ -136,11 +142,16 @@ class DataCacheTest : public testing::Test {
 
   // Delete all the test directories created.
   virtual void TearDown() {
-    // Make sure the cache's destructor removes all backing files.
+    // Make sure the cache's destructor removes all backing files, except for
+    // potentially the trace file.
     for (const string& dir_path : data_cache_dirs_) {
       vector<string> entries;
       ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir_path, &entries));
-      ASSERT_EQ(0, entries.size());
+      if (entries.size() == 1) {
+        EXPECT_EQ(DataCache::Partition::TRACE_FILE_NAME, entries[0]);
+      } else {
+        ASSERT_EQ(0, entries.size());
+      }
     }
     ASSERT_OK(FileSystemUtil::RemovePaths(data_cache_dirs_));
     flag_saver_.reset();
@@ -495,6 +506,58 @@ TEST_F(DataCacheTest, LargeFootprint) {
   }
 }
 
+TEST_F(DataCacheTest, TestAccessTrace) {
+  FLAGS_data_cache_enable_tracing = true;
+  for (bool anon : { false, true }) {
+    SCOPED_TRACE(anon);
+    FLAGS_data_cache_anonymize_trace = anon;
+    {
+      int64_t cache_size = DEFAULT_CACHE_SIZE;
+      DataCache cache(Substitute(
+          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+      ASSERT_OK(cache.Init());
+
+      int64_t max_start_offset = 1024;
+      bool use_per_thread_filename = true;
+      bool expect_misses = true;
+      MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
+                             expect_misses);
+    }
+
+    // Read the trace file and ensure that all of the lines are valid JSON with the
+    // expected fields.
+    std::ifstream trace(Substitute("$0/$1", data_cache_dirs()[0],
+                                   DataCache::Partition::TRACE_FILE_NAME));
+    int line_num = 1;
+    while (trace.good()) {
+      SCOPED_TRACE(line_num++);
+      string line;
+      std::getline(trace, line);
+      if (line.empty()) {
+        ASSERT_TRUE(trace.eof());
+        break;
+      }
+      SCOPED_TRACE(line);
+      rapidjson::Document d;
+      d.Parse<0>(line.c_str());
+      ASSERT_TRUE(d.IsObject());
+      ASSERT_TRUE(d["ts"].IsDouble());
+      ASSERT_TRUE(d["s"].IsString());
+      ASSERT_TRUE(d["m"].IsInt64());
+      ASSERT_TRUE(d["f"].IsString());
+      if (anon) {
+        // We expect anonymized filenames to be 22-character fingerprints:
+        // - 128 bit fingerprint = 16 bytes
+        // - 16 * 4/3 (base64 encoding) = 21.3333
+        // - Round up to 22.
+        EXPECT_EQ(22, d["f"].GetStringLength());
+      } else {
+        EXPECT_TRUE(MatchPattern(d["f"].GetString(), "thread-*file"));
+      }
+    }
+  }
+}
+
 } // namespace io
 } // namespace impala
 
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 70b7aeb..e4d01a2 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -23,13 +23,22 @@
 #include <mutex>
 #include <string.h>
 #include <unistd.h>
+#include <sstream>
+
+#include <glog/logging.h>
 
 #include "exec/kudu-util.h"
+#include "kudu/util/async_logger.h"
 #include "kudu/util/cache.h"
 #include "kudu/util/env.h"
+#include "kudu/util/jsonwriter.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/path_util.h"
+#include "gutil/hash/city.h"
+#include "gutil/port.h"
+#include "gutil/strings/escaping.h"
 #include "gutil/strings/split.h"
+#include "gutil/walltime.h"
 #include "util/bit-util.h"
 #include "util/error-util.h"
 #include "util/filesystem-util.h"
@@ -47,12 +56,14 @@
 
 #include "common/names.h"
 
+using kudu::Env;
 using kudu::faststring;
 using kudu::JoinPathSegments;
 using kudu::percpu_rwlock;
 using kudu::RWFile;
 using kudu::rw_spinlock;
 using kudu::Slice;
+using kudu::WritableFile;
 using strings::SkipEmpty;
 using strings::Split;
 
@@ -74,13 +85,115 @@ DEFINE_int32(data_cache_write_concurrency, 1,
 DEFINE_bool(data_cache_checksum, ENABLE_CHECKSUMMING,
     "(Advanced) Enable checksumming for the cached buffer.");
 
+DEFINE_bool(data_cache_enable_tracing, false,
+    "(Advanced) Collect a trace of all lookups in the data cache.");
+DEFINE_bool(data_cache_anonymize_trace, false,
+    "(Advanced) Use hashes of filenames rather than file paths in the data "
+    "cache access trace.");
+
 namespace impala {
 namespace io {
 
 static const int64_t PAGE_SIZE = 1L << 12;
 const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-";
+const char* DataCache::Partition::TRACE_FILE_NAME = "impala-cache-trace.txt";
 const int MAX_FILE_DELETER_QUEUE_SIZE = 500;
 
+
+namespace {
+
+class FileLogger;
+
+// Simple implementation of a glog Logger that writes to a file, used for
+// cache access tracing.
+//
+// This doesn't fully implement the Logger interface -- only the bare minimum
+// to be usable with kudu::AsyncLogger.
+class FileLogger : public google::base::Logger {
+ public:
+  explicit FileLogger(string path) : path_(std::move(path)) {}
+
+  virtual ~FileLogger() {
+    if (file_) Flush();
+  }
+
+  Status Open() {
+    KUDU_RETURN_IF_ERROR(Env::Default()->NewWritableFile({}, path_, &file_),
+                         "Failed to create trace log file");
+    return Status::OK();
+  }
+
+  void Write(bool force_flush,
+             time_t timestamp,
+             const char* message,
+             int message_len) override {
+    buf_.append(message, message_len);
+    if (force_flush || buf_.size() > kBufSize) {
+      Flush();
+    }
+  }
+
+  // Flush any buffered messages.
+  // NOTE: declared 'final' to allow safe calls from the destructor.
+  void Flush() override final {
+    if (buf_.empty()) return;
+
+    KUDU_WARN_NOT_OK(file_->Append(buf_), "Could not append to trace log");
+    buf_.clear();
+  }
+
+  uint32 LogSize() override {
+    LOG(FATAL) << "Unimplemented";
+    return 0;
+  }
+
+ private:
+  const string path_;
+  string buf_;
+  unique_ptr<WritableFile> file_;
+
+  static constexpr int kBufSize = 64*1024;
+};
+
+} // anonymous namespace
+
+class DataCache::Partition::Tracer {
+ public:
+  explicit Tracer(string path) : underlying_logger_(new FileLogger(std::move(path))) {}
+
+  ~Tracer() {
+    if (logger_) logger_->Stop();
+  }
+
+  Status Start() {
+    RETURN_IF_ERROR(underlying_logger_->Open());
+    logger_.reset(new kudu::AsyncLogger(underlying_logger_.get(), 8 * 1024 * 1024));
+    logger_->Start();
+    return Status::OK();
+  }
+
+  enum CacheStatus {
+    HIT,
+    MISS,
+    STORE,
+    STORE_FAILED_BUSY
+  };
+
+  void Trace(CacheStatus status, const DataCache::CacheKey& key,
+             int64_t lookup_len, int64_t entry_len);
+
+ private:
+  // The underlying logger that we wrap with the AsyncLogger wrapper
+  // 'logger_'. NOTE: AsyncLogger consumes a raw pointer which must
+  // outlive the AsyncLogger instance, so it's important that these
+  // are declared in this order (logger_ must destruct before
+  // underlying_logger_).
+  unique_ptr<FileLogger> underlying_logger_;
+  // The async wrapper around underlying_logger_ (see above).
+  unique_ptr<kudu::AsyncLogger> logger_;
+};
+
+
 /// This class is an implementation of backing files in a cache partition.
 ///
 /// A partition uses the interface Create() to create a backing file. A reader can read
@@ -289,20 +402,40 @@ struct DataCache::CacheKey {
     : key_(filename.size() + sizeof(mtime) + sizeof(offset)) {
     DCHECK_GE(mtime, 0);
     DCHECK_GE(offset, 0);
-    key_.append(filename);
     key_.append(&mtime, sizeof(mtime));
     key_.append(&offset, sizeof(offset));
+    key_.append(filename);
   }
 
   int64_t Hash() const {
     return HashUtil::FastHash64(key_.data(), key_.size(), 0);
   }
 
+  Slice filename() const {
+    return Slice(key_.data() + OFFSETOF_FILENAME, key_.size() - OFFSETOF_FILENAME);
+  }
+
+  int64_t mtime() const {
+    return UNALIGNED_LOAD64(key_.data() + OFFSETOF_MTIME);
+  }
+
+  int64_t offset() const {
+    return UNALIGNED_LOAD64(key_.data() + OFFSETOF_OFFSET);
+  }
+
   Slice ToSlice() const {
     return key_;
   }
 
  private:
+  // Key encoding stored in key_:
+  //
+  //  int64_t mtime;
+  //  int64_t offset;
+  //  <variable length bytes> filename;
+  static constexpr int OFFSETOF_MTIME = 0;
+  static constexpr int OFFSETOF_OFFSET = OFFSETOF_MTIME + sizeof(int64_t);
+  static constexpr int OFFSETOF_FILENAME = OFFSETOF_OFFSET + sizeof(int64_t);
   faststring key_;
 };
 
@@ -369,6 +502,11 @@ Status DataCache::Partition::Init() {
   // Make sure hole punching is supported for the caching directory.
   RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(path_));
 
+  if (FLAGS_data_cache_enable_tracing) {
+    tracer_.reset(new Tracer(path_ + "/" + TRACE_FILE_NAME));
+    RETURN_IF_ERROR(tracer_->Start());
+  }
+
   // Create a backing file for the partition.
   RETURN_IF_ERROR(CreateCacheFile());
   oldest_opened_file_ = 0;
@@ -414,12 +552,24 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
   Slice key = cache_key.ToSlice();
   kudu::Cache::Handle* handle =
       meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE);
-  if (handle == nullptr) return 0;
+
+
+  if (handle == nullptr) {
+    if (tracer_ != nullptr) {
+      tracer_->Trace(Tracer::MISS, cache_key, bytes_to_read, /*entry_len=*/-1);
+    }
+    return 0;
+  }
   auto handle_release =
       MakeScopeExitTrigger([this, &handle]() { meta_cache_->Release(handle); });
 
   // Read from the backing file.
   CacheEntry entry(meta_cache_->Value(handle));
+
+  if (tracer_ != nullptr) {
+    tracer_->Trace(Tracer::HIT, cache_key, bytes_to_read, entry.len());
+  }
+
   CacheFile* cache_file = entry.file();
   bytes_to_read = min(entry.len(), bytes_to_read);
   VLOG(3) << Substitute("Reading file $0 offset $1 len $2 checksum $3 bytes_to_read $4",
@@ -521,6 +671,10 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
     if (exceed_concurrency ||
         pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
       ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES->Increment(buffer_len);
+      if (tracer_ != nullptr) {
+        tracer_->Trace(Tracer::STORE_FAILED_BUSY, cache_key, /*lookup_len=*/-1,
+            buffer_len);
+      }
       return false;
     }
 
@@ -543,6 +697,10 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
     pending_insert_set_.emplace(key.ToString());
   }
 
+  if (tracer_ != nullptr) {
+    tracer_->Trace(Tracer::STORE, cache_key, /* lookup_len=*/-1, buffer_len);
+  }
+
   // Set up a scoped exit to always remove entry from the pending insertion set.
   auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
     std::unique_lock<SpinLock> partition_lock(lock_);
@@ -718,5 +876,64 @@ void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) {
   partitions_[partition_idx]->DeleteOldFiles();
 }
 
+void DataCache::Partition::Tracer::Trace(
+    CacheStatus status, const DataCache::CacheKey& key,
+    int64_t lookup_len, int64_t entry_len) {
+
+  ostringstream buf;
+  kudu::JsonWriter jw(&buf, kudu::JsonWriter::COMPACT);
+
+  jw.StartObject();
+  jw.String("ts");
+  jw.Double(WallTime_Now());
+  jw.String("s");
+  switch (status) {
+    case HIT: jw.String("H"); break;
+    case MISS: jw.String("M"); break;
+    case STORE: jw.String("S"); break;
+    case STORE_FAILED_BUSY: jw.String("F"); break;
+  }
+
+  jw.String("f");
+  if (FLAGS_data_cache_anonymize_trace) {
+    uint128 hash = util_hash::CityHash128(
+        reinterpret_cast<const char*>(key.filename().data()),
+        key.filename().size());
+    // A 128-bit (16-byte) hash results in a 24-byte base64-encoded string, including
+    // two characters of padding.
+    const int BUF_LEN = 24;
+    DCHECK_EQ(BUF_LEN, CalculateBase64EscapedLen(sizeof(hash)));
+    char b64_buf[BUF_LEN];
+    int out_len = Base64Escape(reinterpret_cast<const unsigned char*>(&hash),
+        sizeof(hash), b64_buf, BUF_LEN);
+    DCHECK_EQ(out_len, BUF_LEN);
+    // Chop off the two padding bytes.
+    DCHECK(b64_buf[23] == '=' && b64_buf[22] == '=');
+    out_len = 22;
+    jw.String(b64_buf, out_len);
+  } else {
+    jw.String(reinterpret_cast<const char*>(key.filename().data()),
+              key.filename().size());
+  }
+  jw.String("m");
+  jw.Int64(key.mtime());
+  jw.String("o");
+  jw.Int64(key.offset());
+
+  if (lookup_len != -1) {
+    jw.String("lLen");
+    jw.Int64(lookup_len);
+  }
+  if (entry_len != -1) {
+    jw.String("eLen");
+    jw.Int64(entry_len);
+  }
+  jw.EndObject();
+  buf << "\n";
+
+  string s = buf.str();
+  logger_->Write(/*force_flush=*/false, /*timestamp=*/0, s.data(), s.size());
+}
+
 } // namespace io
 } // namespace impala
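
For reference, here is a minimal sketch (not part of this change) of how a
consumer could tally hits and misses from a partition's trace file using
rapidjson, the same parser the test above uses; the path is a placeholder:

    #include <fstream>
    #include <iostream>
    #include <map>
    #include <string>

    #include <rapidjson/document.h>

    int main() {
      // Placeholder path: point this at <partition dir>/impala-cache-trace.txt.
      std::ifstream trace("/data/0/impala-cache-trace.txt");
      std::map<std::string, int> hits, misses;
      std::string line;
      while (std::getline(trace, line)) {
        if (line.empty()) continue;
        rapidjson::Document d;
        d.Parse<0>(line.c_str());
        if (!d.IsObject()) continue;
        const std::string status = d["s"].GetString();
        const std::string file = d["f"].GetString();
        // "H" = hit, "M" = miss, "S" = store, "F" = store failed (busy).
        if (status == "H") {
          ++hits[file];
        } else if (status == "M") {
          ++misses[file];
        }
      }
      for (const auto& kv : hits) {
        std::cout << kv.first << ": " << kv.second << " hits, "
                  << misses[kv.first] << " misses" << std::endl;
      }
      return 0;
    }
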
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index 6a290d9..30cdaf8 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -22,6 +22,7 @@
 #include <unistd.h>
 #include <unordered_map>
 #include <unordered_set>
+#include <gtest/gtest_prod.h>
 
 #include "common/status.h"
 #include "util/spinlock.h"
@@ -182,6 +183,9 @@ class DataCache {
   Status CloseFilesAndVerifySizes();
 
  private:
+  friend class DataCacheTest;
+  FRIEND_TEST(DataCacheTest, TestAccessTrace);
+
   class CacheFile;
   struct CacheKey;
   class CacheEntry;
@@ -245,6 +249,10 @@ class DataCache {
     void DeleteOldFiles();
 
    private:
+    friend class DataCacheTest;
+    FRIEND_TEST(DataCacheTest, TestAccessTrace);
+    class Tracer;
+
     /// The directory path which this partition stores cached data in.
     const std::string path_;
 
@@ -261,6 +269,9 @@ class DataCache {
     /// The prefix of the names of the cache backing files.
     static const char* CACHE_FILE_PREFIX;
 
+    /// The file name used for the access trace.
+    static const char* TRACE_FILE_NAME;
+
     /// Protects the following fields.
     SpinLock lock_;
 
@@ -289,6 +300,8 @@ class DataCache {
     /// content. Please see comments at CachedEntry for details.
     std::unique_ptr<kudu::Cache> meta_cache_;
 
+    std::unique_ptr<Tracer> tracer_;
+
     /// Utility function for creating a new backing file in 'path_'. The cache
     /// partition's lock needs to be held when calling this function. Returns
     /// error on failure.