You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2018/08/29 01:51:23 UTC

[3/3] kudu git commit: KUDU-2469 pt 1: add an IOContext

KUDU-2469 pt 1: add an IOContext

This patch introduces the IOContext class that can be used to pass state
from the Tablet layer to the CFile layer. The top-level caller (whether
it is a Tablet method, an iterator, etc.) will own the IOContext, and
pass a pointer to it to lower-level classes.

I intend to use this to pass the tablet id down to the CFileReaders
for error handling. As such, only codepaths that lead to CFileReaders
are plumbed. Currently the IOContext only contains a tablet id, though
in the future it could be used for more sophisticated things like
resource management, QoS for IO, etc.

Change-Id: I645e10a2fda1f2564326b7052e6a0debc5ad738c
Reviewed-on: http://gerrit.cloudera.org:8080/11248
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2974f5a5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2974f5a5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2974f5a5

Branch: refs/heads/master
Commit: 2974f5a50c95862960d09f3f18831ca259d921b2
Parents: 846eb7f
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Aug 23 00:17:26 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Wed Aug 29 00:40:15 2018 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile-test-base.h            |   2 +-
 src/kudu/cfile/bloomfile-test.cc                |   8 +-
 src/kudu/cfile/bloomfile.cc                     |  16 +--
 src/kudu/cfile/bloomfile.h                      |   8 +-
 src/kudu/cfile/cfile-test-base.h                |   2 +-
 src/kudu/cfile/cfile-test.cc                    |  10 +-
 src/kudu/cfile/cfile_reader.cc                  |  26 +++--
 src/kudu/cfile/cfile_reader.h                   |  27 +++--
 src/kudu/cfile/cfile_util.h                     |   9 ++
 src/kudu/fs/io_context.h                        |  32 ++++++
 src/kudu/tablet/cfile_set-test.cc               |  16 +--
 src/kudu/tablet/cfile_set.cc                    |  60 ++++++-----
 src/kudu/tablet/cfile_set.h                     |  36 ++++---
 src/kudu/tablet/compaction-test.cc              |  31 +++---
 src/kudu/tablet/compaction.cc                   |  17 +--
 src/kudu/tablet/compaction.h                    |   8 +-
 src/kudu/tablet/delta_compaction.cc             |  17 +--
 src/kudu/tablet/delta_compaction.h              |  10 +-
 src/kudu/tablet/delta_store.h                   |   9 +-
 src/kudu/tablet/delta_tracker.cc                |  74 +++++++------
 src/kudu/tablet/delta_tracker.h                 |  41 ++++----
 src/kudu/tablet/deltafile-test.cc               |   4 +-
 src/kudu/tablet/deltafile.cc                    |  24 +++--
 src/kudu/tablet/deltafile.h                     |   9 +-
 src/kudu/tablet/deltamemstore-test.cc           |   2 +-
 src/kudu/tablet/deltamemstore.cc                |   7 +-
 src/kudu/tablet/deltamemstore.h                 |   9 +-
 src/kudu/tablet/diskrowset-test-base.h          |   5 +-
 src/kudu/tablet/diskrowset-test.cc              |  51 ++++-----
 src/kudu/tablet/diskrowset.cc                   |  70 ++++++++-----
 src/kudu/tablet/diskrowset.h                    |  24 +++--
 src/kudu/tablet/memrowset-test.cc               |   4 +-
 src/kudu/tablet/memrowset.cc                    |   7 +-
 src/kudu/tablet/memrowset.h                     |  23 +++--
 src/kudu/tablet/mock-rowsets.h                  |  42 +++++---
 src/kudu/tablet/mt-diskrowset-test.cc           |   2 +-
 .../tablet/mt-rowset_delta_compaction-test.cc   |   4 +-
 src/kudu/tablet/rowset.cc                       |  23 +++--
 src/kudu/tablet/rowset.h                        |  38 +++++--
 src/kudu/tablet/tablet.cc                       | 103 ++++++++++++-------
 src/kudu/tablet/tablet.h                        |  30 ++++--
 src/kudu/tablet/tablet_bootstrap.cc             |  94 ++++++++++-------
 src/kudu/tablet/tablet_history_gc-test.cc       |   2 +-
 src/kudu/tools/tool_action_fs.cc                |   2 +-
 src/kudu/tools/tool_action_local_replica.cc     |   1 +
 45 files changed, 658 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/bloomfile-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile-test-base.h b/src/kudu/cfile/bloomfile-test-base.h
index a702d32..74251b1 100644
--- a/src/kudu/cfile/bloomfile-test-base.h
+++ b/src/kudu/cfile/bloomfile-test-base.h
@@ -112,7 +112,7 @@ class BloomFileTestBase : public KuduTest {
 
         Slice s(reinterpret_cast<uint8_t *>(&key), sizeof(key));
         bool present;
-        CHECK_OK(bfr_->CheckKeyPresent(BloomKeyProbe(s), &present));
+        CHECK_OK(bfr_->CheckKeyPresent(BloomKeyProbe(s), nullptr, &present));
         if (present) count_present++;
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/bloomfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile-test.cc b/src/kudu/cfile/bloomfile-test.cc
index f699b01..56bb694 100644
--- a/src/kudu/cfile/bloomfile-test.cc
+++ b/src/kudu/cfile/bloomfile-test.cc
@@ -55,7 +55,7 @@ class BloomFileTest : public BloomFileTestBase {
       Slice s(reinterpret_cast<char *>(&i_byteswapped), sizeof(i));
 
       bool present = false;
-      ASSERT_OK_FAST(bfr_->CheckKeyPresent(BloomKeyProbe(s), &present));
+      ASSERT_OK_FAST(bfr_->CheckKeyPresent(BloomKeyProbe(s), nullptr, &present));
       ASSERT_TRUE(present);
     }
 
@@ -66,7 +66,7 @@ class BloomFileTest : public BloomFileTestBase {
       Slice s(reinterpret_cast<char *>(&key), sizeof(key));
 
       bool present = false;
-      ASSERT_OK_FAST(bfr_->CheckKeyPresent(BloomKeyProbe(s), &present));
+      ASSERT_OK_FAST(bfr_->CheckKeyPresent(BloomKeyProbe(s), nullptr, &present));
       if (present) {
         positive_count++;
       }
@@ -133,10 +133,10 @@ TEST_F(BloomFileTest, TestLazyInit) {
 
   // But initializing it should (only the first time), and the bloom's
   // memory usage should increase.
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_GT(bytes_read, 0);
   size_t bytes_read_after_init = bytes_read;
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_EQ(bytes_read_after_init, bytes_read);
   ASSERT_GT(tracker->consumption(), lazy_mem_usage);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/bloomfile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index 0dbfa63..ed8efb9 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -57,6 +57,7 @@ namespace cfile {
 
 using fs::BlockCreationTransaction;
 using fs::BlockManager;
+using fs::IOContext;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 
@@ -199,9 +200,10 @@ Status BloomFileReader::Open(unique_ptr<ReadableBlock> block,
                              ReaderOptions options,
                              unique_ptr<BloomFileReader> *reader) {
   unique_ptr<BloomFileReader> bf_reader;
+  const IOContext* io_context = options.io_context;
   RETURN_NOT_OK(OpenNoInit(std::move(block),
                            std::move(options), &bf_reader));
-  RETURN_NOT_OK(bf_reader->Init());
+  RETURN_NOT_OK(bf_reader->Init(io_context));
 
   *reader = std::move(bf_reader);
   return Status::OK();
@@ -213,10 +215,11 @@ Status BloomFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
   unique_ptr<CFileReader> cf_reader;
   RETURN_NOT_OK(CFileReader::OpenNoInit(std::move(block),
                                         options, &cf_reader));
+  const IOContext* io_context = options.io_context;
   unique_ptr<BloomFileReader> bf_reader(new BloomFileReader(
       std::move(cf_reader), std::move(options)));
   if (!FLAGS_cfile_lazy_open) {
-    RETURN_NOT_OK(bf_reader->Init());
+    RETURN_NOT_OK(bf_reader->Init(io_context));
   }
 
   *reader = std::move(bf_reader);
@@ -232,15 +235,15 @@ BloomFileReader::BloomFileReader(unique_ptr<CFileReader> reader,
                        memory_footprint_excluding_reader()) {
 }
 
-Status BloomFileReader::Init() {
-  return init_once_.Init([this] { return InitOnce(); });
+Status BloomFileReader::Init(const IOContext* io_context) {
+  return init_once_.Init([this, io_context] { return InitOnce(io_context); });
 }
 
-Status BloomFileReader::InitOnce() {
+Status BloomFileReader::InitOnce(const IOContext* io_context) {
   // Fully open the CFileReader if it was lazily opened earlier.
   //
   // If it's already initialized, this is a no-op.
-  RETURN_NOT_OK(reader_->Init());
+  RETURN_NOT_OK(reader_->Init(io_context));
 
   if (reader_->is_compressed()) {
     return Status::Corruption("bloom file is compressed (compression not supported)",
@@ -283,6 +286,7 @@ Status BloomFileReader::ParseBlockHeader(const Slice &block,
 }
 
 Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
+                                        const IOContext* io_context,
                                         bool *maybe_present) {
   DCHECK(init_once_.init_succeeded());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/bloomfile.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.h b/src/kudu/cfile/bloomfile.h
index e1553ec..63efdfd 100644
--- a/src/kudu/cfile/bloomfile.h
+++ b/src/kudu/cfile/bloomfile.h
@@ -35,6 +35,7 @@ namespace kudu {
 
 namespace fs {
 class BlockCreationTransaction;
+struct IOContext;
 class ReadableBlock;
 class WritableBlock;
 }
@@ -106,13 +107,14 @@ class BloomFileReader {
   // validating its contents.
   //
   // May be called multiple times; subsequent calls will no-op.
-  Status Init();
+  Status Init(const fs::IOContext* io_context);
 
   // Check if the given key may be present in the file.
   //
   // Sets *maybe_present to false if the key is definitely not
   // present, otherwise sets it to true to indicate maybe present.
-  Status CheckKeyPresent(const BloomKeyProbe &probe,
+  Status CheckKeyPresent(const BloomKeyProbe& probe,
+                         const fs::IOContext* io_context,
                          bool* maybe_present);
 
   // Can be called before Init().
@@ -135,7 +137,7 @@ class BloomFileReader {
                           Slice* bloom_data) const;
 
   // Callback used in 'init_once_' to initialize this bloom file.
-  Status InitOnce();
+  Status InitOnce(const fs::IOContext* io_context);
 
   // Returns the memory usage of this object including the object itself but
   // excluding the CFileReader, which is tracked independently.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/cfile-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index e21ba83..e48d6eb 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -486,7 +486,7 @@ void TimeReadFile(FsManager* fs_manager, const BlockId& block_id, size_t *count_
   ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
 
   gscoped_ptr<CFileIterator> iter;
-  ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK));
+  ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
   ASSERT_OK(iter->SeekToOrdinal(0));
 
   Arena arena(8192);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 3213495..dc1458f 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -118,7 +118,7 @@ class TestCFile : public CFileTestBase {
 
     BlockPointer ptr;
     gscoped_ptr<CFileIterator> iter;
-    ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK));
+    ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
 
     ASSERT_OK(iter->SeekToOrdinal(5000));
     ASSERT_EQ(5000u, iter->GetCurrentOrdinal());
@@ -201,7 +201,7 @@ class TestCFile : public CFileTestBase {
     ASSERT_EQ(DataGeneratorType::kDataType, reader->type_info()->type());
 
     gscoped_ptr<CFileIterator> iter;
-    ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK));
+    ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
 
     Arena arena(8192);
     ScopedColumnBlock<DataGeneratorType::kDataType> cb(10);
@@ -615,7 +615,7 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
   BlockPointer ptr;
 
   gscoped_ptr<CFileIterator> iter;
-  ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK));
+  ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
 
   Arena arena(1024);
 
@@ -970,10 +970,10 @@ TEST_P(TestCFileBothCacheTypes, TestLazyInit) {
 
   // But initializing it should (only the first time), and the reader's
   // memory usage should increase.
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_GT(bytes_read, 0);
   size_t bytes_read_after_init = bytes_read;
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_EQ(bytes_read_after_init, bytes_read);
   ASSERT_GT(tracker->consumption(), lazy_mem_usage);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index e0d7688..2ee5236 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -75,6 +75,7 @@ DEFINE_bool(cfile_verify_checksums, true,
             "Verify the checksum for each block on read if one exists");
 TAG_FLAG(cfile_verify_checksums, evolving);
 
+using kudu::fs::IOContext;
 using kudu::fs::ReadableBlock;
 using kudu::pb_util::SecureDebugString;
 using std::string;
@@ -134,10 +135,11 @@ Status CFileReader::Open(unique_ptr<ReadableBlock> block,
                          ReaderOptions options,
                          unique_ptr<CFileReader>* reader) {
   unique_ptr<CFileReader> reader_local;
+  const IOContext* io_context = options.io_context;
   RETURN_NOT_OK(OpenNoInit(std::move(block),
                            std::move(options),
                            &reader_local));
-  RETURN_NOT_OK(reader_local->Init());
+  RETURN_NOT_OK(reader_local->Init(io_context));
 
   *reader = std::move(reader_local);
   return Status::OK();
@@ -148,17 +150,18 @@ Status CFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                unique_ptr<CFileReader>* reader) {
   uint64_t block_size;
   RETURN_NOT_OK(block->Size(&block_size));
+  const IOContext* io_context = options.io_context;
   unique_ptr<CFileReader> reader_local(
       new CFileReader(std::move(options), block_size, std::move(block)));
   if (!FLAGS_cfile_lazy_open) {
-    RETURN_NOT_OK(reader_local->Init());
+    RETURN_NOT_OK(reader_local->Init(io_context));
   }
 
   *reader = std::move(reader_local);
   return Status::OK();
 }
 
-Status CFileReader::InitOnce() {
+Status CFileReader::InitOnce(const IOContext* io_context) {
   VLOG(1) << "Initializing CFile with ID " << block_->id().ToString();
   TRACE_COUNTER_INCREMENT("cfile_init", 1);
 
@@ -191,8 +194,8 @@ Status CFileReader::InitOnce() {
   return Status::OK();
 }
 
-Status CFileReader::Init() {
-  RETURN_NOT_OK_PREPEND(init_once_.Init([this] { return InitOnce(); }),
+Status CFileReader::Init(const IOContext* io_context) {
+  RETURN_NOT_OK_PREPEND(init_once_.Init([this, io_context] { return InitOnce(io_context); }),
                         Substitute("failed to init CFileReader for block $0",
                                    block_id().ToString()));
   return Status::OK();
@@ -571,8 +574,9 @@ bool CFileReader::GetMetadataEntry(const string &key, string *val) const {
   return false;
 }
 
-Status CFileReader::NewIterator(CFileIterator **iter, CacheControl cache_control) {
-  *iter = new CFileIterator(this, cache_control);
+Status CFileReader::NewIterator(CFileIterator** iter, CacheControl cache_control,
+                                const IOContext* io_context) {
+  *iter = new CFileIterator(this, cache_control, io_context);
   return Status::OK();
 }
 
@@ -655,13 +659,15 @@ Status DefaultColumnValueIterator::FinishBatch() {
 // Iterator
 ////////////////////////////////////////////////////////////
 CFileIterator::CFileIterator(CFileReader* reader,
-                             CFileReader::CacheControl cache_control)
+                             CFileReader::CacheControl cache_control,
+                             const IOContext* io_context)
   : reader_(reader),
     seeked_(nullptr),
     prepared_(false),
     cache_control_(cache_control),
     last_prepare_idx_(-1),
-    last_prepare_count_(-1) {
+    last_prepare_count_(-1),
+    io_context_(io_context) {
 }
 
 CFileIterator::~CFileIterator() {
@@ -846,7 +852,7 @@ Status CFileIterator::PrepareForNewSeek() {
   // Fully open the CFileReader if it was lazily opened earlier.
   //
   // If it's already initialized, this is a no-op.
-  RETURN_NOT_OK(reader_->Init());
+  RETURN_NOT_OK(reader_->Init(io_context_));
 
   // Create the index tree iterators if we haven't already done so.
   if (!posidx_iter_ && reader_->footer().has_posidx_info()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 7263d3f..6e98fa8 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -54,6 +55,10 @@ class EncodedKey;
 class SelectionVector;
 class TypeInfo;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 template <typename T> class ArrayView;
 
 namespace cfile {
@@ -87,7 +92,7 @@ class CFileReader {
   // its contents.
   //
   // May be called multiple times; subsequent calls will no-op.
-  Status Init();
+  Status Init(const fs::IOContext* io_context);
 
   enum CacheControl {
     CACHE_BLOCK,
@@ -95,11 +100,14 @@ class CFileReader {
   };
 
   // Can be called before Init().
-  Status NewIterator(CFileIterator **iter, CacheControl cache_control);
-  Status NewIterator(gscoped_ptr<CFileIterator> *iter,
-                     CacheControl cache_control) {
-    CFileIterator *iter_ptr;
-    RETURN_NOT_OK(NewIterator(&iter_ptr, cache_control));
+  Status NewIterator(CFileIterator** iter,
+                     CacheControl cache_control,
+                     const fs::IOContext* io_context);
+  Status NewIterator(gscoped_ptr<CFileIterator>* iter,
+                     CacheControl cache_control,
+                     const fs::IOContext* io_context) {
+    CFileIterator* iter_ptr;
+    RETURN_NOT_OK(NewIterator(&iter_ptr, cache_control, io_context));
     (*iter).reset(iter_ptr);
     return Status::OK();
   }
@@ -191,7 +199,7 @@ class CFileReader {
               std::unique_ptr<fs::ReadableBlock> block);
 
   // Callback used in 'init_once_' to initialize this cfile.
-  Status InitOnce();
+  Status InitOnce(const fs::IOContext* io_context);
 
   Status ReadAndParseHeader();
   Status ReadAndParseFooter();
@@ -313,7 +321,8 @@ class DefaultColumnValueIterator : public ColumnIterator {
 class CFileIterator : public ColumnIterator {
  public:
   CFileIterator(CFileReader* reader,
-                CFileReader::CacheControl cache_control);
+                CFileReader::CacheControl cache_control,
+                const fs::IOContext* io_context);
   ~CFileIterator();
 
   // Seek to the first entry in the file. This works for both
@@ -498,6 +507,8 @@ class CFileIterator : public ColumnIterator {
 
   IteratorStats io_stats_;
 
+  const fs::IOContext* io_context_;
+
   // a temporary buffer for encoding
   faststring tmp_buf_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/cfile/cfile_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_util.h b/src/kudu/cfile/cfile_util.h
index 967569e..1574187 100644
--- a/src/kudu/cfile/cfile_util.h
+++ b/src/kudu/cfile/cfile_util.h
@@ -33,6 +33,10 @@ namespace kudu {
 class MemTracker;
 class faststring;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace cfile {
 
 class CFileReader;
@@ -91,6 +95,11 @@ struct WriterOptions {
 struct ReaderOptions {
   ReaderOptions();
 
+  // The IO context of this reader.
+  //
+  // Default: nullptr
+  const fs::IOContext* io_context;
+
   // The MemTracker that should account for this reader's memory consumption.
   //
   // Default: the root tracker.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/fs/io_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/io_context.h b/src/kudu/fs/io_context.h
new file mode 100644
index 0000000..9a335a6
--- /dev/null
+++ b/src/kudu/fs/io_context.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+namespace kudu {
+namespace fs {
+
+// An IOContext provides a single interface to pass state around during IO.
+struct IOContext {
+  // The tablet_id associated with this IO.
+  std::string tablet_id;
+};
+
+}  // namespace fs
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/cfile_set-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index a1c4316..cef4b71 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -106,7 +106,7 @@ class TestCFileSet : public KuduRowSetTest {
                        int32_t lower,
                        int32_t upper) {
     // Create iterator.
-    shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_));
+    shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
     gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter));
 
     // Create a scan with a range predicate on the key column.
@@ -163,9 +163,9 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) {
   WriteTestRowSet(kNumRows);
 
   shared_ptr<CFileSet> fileset;
-  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), nullptr, &fileset));
 
-  gscoped_ptr<CFileSet::Iterator> iter(fileset->NewIterator(&schema_));
+  gscoped_ptr<CFileSet::Iterator> iter(fileset->NewIterator(&schema_, nullptr));
   ASSERT_OK(iter->Init(nullptr));
 
   Arena arena(4096);
@@ -243,11 +243,11 @@ TEST_F(TestCFileSet, TestIteratePartialSchema) {
   WriteTestRowSet(kNumRows);
 
   shared_ptr<CFileSet> fileset;
-  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), nullptr, &fileset));
 
   Schema new_schema;
   ASSERT_OK(schema_.CreateProjectionByNames({ "c0", "c2" }, &new_schema));
-  shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&new_schema));
+  shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&new_schema, nullptr));
   gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter));
 
   ASSERT_OK(iter->Init(nullptr));
@@ -276,10 +276,10 @@ TEST_F(TestCFileSet, TestRangeScan) {
   WriteTestRowSet(kNumRows);
 
   shared_ptr<CFileSet> fileset;
-  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), nullptr, &fileset));
 
   // Create iterator.
-  shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_));
+  shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
   gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter));
   Schema key_schema = schema_.CreateKeyProjection();
   Arena arena(1024);
@@ -329,7 +329,7 @@ TEST_F(TestCFileSet, TestRangePredicates2) {
   WriteTestRowSet(kNumRows);
 
   shared_ptr<CFileSet> fileset;
-  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), nullptr, &fileset));
 
   // Range scan where rows match on both ends
   DoTestRangeScan(fileset, 2000, 2010);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/cfile_set.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc
index 63a28aa..3d16b39 100644
--- a/src/kudu/tablet/cfile_set.cc
+++ b/src/kudu/tablet/cfile_set.cc
@@ -76,6 +76,7 @@ using cfile::CFileReader;
 using cfile::ColumnIterator;
 using cfile::ReaderOptions;
 using cfile::DefaultColumnValueIterator;
+using fs::IOContext;
 using fs::ReadableBlock;
 using std::shared_ptr;
 using std::string;
@@ -90,12 +91,14 @@ using strings::Substitute;
 static Status OpenReader(FsManager* fs,
                          shared_ptr<MemTracker> parent_mem_tracker,
                          const BlockId& block_id,
+                         const IOContext* io_context,
                          unique_ptr<CFileReader>* new_reader) {
   unique_ptr<ReadableBlock> block;
   RETURN_NOT_OK(fs->OpenBlock(block_id, &block));
 
   ReaderOptions opts;
   opts.parent_mem_tracker = std::move(parent_mem_tracker);
+  opts.io_context = io_context;
   return CFileReader::OpenNoInit(std::move(block),
                                  std::move(opts),
                                  new_reader);
@@ -116,17 +119,18 @@ CFileSet::~CFileSet() {
 
 Status CFileSet::Open(shared_ptr<RowSetMetadata> rowset_metadata,
                       shared_ptr<MemTracker> parent_mem_tracker,
+                      const IOContext* io_context,
                       shared_ptr<CFileSet>* cfile_set) {
   shared_ptr<CFileSet> cfs(new CFileSet(std::move(rowset_metadata),
                                         std::move(parent_mem_tracker)));
-  RETURN_NOT_OK(cfs->DoOpen());
+  RETURN_NOT_OK(cfs->DoOpen(io_context));
 
   cfile_set->swap(cfs);
   return Status::OK();
 }
 
-Status CFileSet::DoOpen() {
-  RETURN_NOT_OK(OpenBloomReader());
+Status CFileSet::DoOpen(const IOContext* io_context) {
+  RETURN_NOT_OK(OpenBloomReader(io_context));
 
   // Lazily open the column data cfiles. Each one will be fully opened
   // later, when the first iterator seeks for the first time.
@@ -139,6 +143,7 @@ Status CFileSet::DoOpen() {
     RETURN_NOT_OK(OpenReader(rowset_metadata_->fs_manager(),
                              parent_mem_tracker_,
                              rowset_metadata_->column_data_block_for_col_id(col_id),
+                             io_context,
                              &reader));
     readers_by_col_id_[col_id] = std::move(reader);
     VLOG(1) << "Successfully opened cfile for column id " << col_id
@@ -150,6 +155,7 @@ Status CFileSet::DoOpen() {
     RETURN_NOT_OK(OpenReader(rowset_metadata_->fs_manager(),
                              parent_mem_tracker_,
                              rowset_metadata_->adhoc_index_block(),
+                             io_context,
                              &ad_hoc_idx_reader_));
   }
 
@@ -159,7 +165,7 @@ Status CFileSet::DoOpen() {
     min_encoded_key_ = rowset_metadata_->min_encoded_key();
     max_encoded_key_ = rowset_metadata_->max_encoded_key();
   } else {
-    RETURN_NOT_OK(LoadMinMaxKeys());
+    RETURN_NOT_OK(LoadMinMaxKeys(io_context));
   }
   // Verify the loaded keys are valid.
   if (Slice(min_encoded_key_).compare(max_encoded_key_) > 0) {
@@ -172,13 +178,14 @@ Status CFileSet::DoOpen() {
   return Status::OK();
 }
 
-Status CFileSet::OpenBloomReader() {
+Status CFileSet::OpenBloomReader(const IOContext* io_context) {
   FsManager* fs = rowset_metadata_->fs_manager();
   unique_ptr<ReadableBlock> block;
   RETURN_NOT_OK(fs->OpenBlock(rowset_metadata_->bloom_block(), &block));
 
   ReaderOptions opts;
   opts.parent_mem_tracker = parent_mem_tracker_;
+  opts.io_context = io_context;
   Status s = BloomFileReader::OpenNoInit(std::move(block),
                                          std::move(opts),
                                          &bloom_reader_);
@@ -191,9 +198,9 @@ Status CFileSet::OpenBloomReader() {
   return Status::OK();
 }
 
-Status CFileSet::LoadMinMaxKeys() {
+Status CFileSet::LoadMinMaxKeys(const IOContext* io_context) {
   CFileReader* key_reader = key_index_reader();
-  RETURN_NOT_OK(key_index_reader()->Init());
+  RETURN_NOT_OK(key_index_reader()->Init(io_context));
   if (!key_reader->GetMetadataEntry(DiskRowSet::kMinKeyMetaEntryName, &min_encoded_key_)) {
     return Status::Corruption("No min key found", ToString());
   }
@@ -215,16 +222,18 @@ CFileReader* CFileSet::key_index_reader() const {
 }
 
 Status CFileSet::NewColumnIterator(ColumnId col_id, CFileReader::CacheControl cache_blocks,
-                                   CFileIterator **iter) const {
-  return FindOrDie(readers_by_col_id_, col_id)->NewIterator(iter, cache_blocks);
+                                   const fs::IOContext* io_context, CFileIterator **iter) const {
+  return FindOrDie(readers_by_col_id_, col_id)->NewIterator(iter, cache_blocks,
+                                                            io_context);
 }
 
-CFileSet::Iterator *CFileSet::NewIterator(const Schema *projection) const {
-  return new CFileSet::Iterator(shared_from_this(), projection);
+CFileSet::Iterator* CFileSet::NewIterator(const Schema* projection,
+                                          const IOContext* io_context) const {
+  return new CFileSet::Iterator(shared_from_this(), projection, io_context);
 }
 
-Status CFileSet::CountRows(rowid_t *count) const {
-  RETURN_NOT_OK(key_index_reader()->Init());
+Status CFileSet::CountRows(const IOContext* io_context, rowid_t *count) const {
+  RETURN_NOT_OK(key_index_reader()->Init(io_context));
   return key_index_reader()->CountRows(count);
 }
 
@@ -255,17 +264,18 @@ uint64_t CFileSet::OnDiskDataSize() const {
 }
 
 Status CFileSet::FindRow(const RowSetKeyProbe &probe,
+                         const IOContext* io_context,
                          boost::optional<rowid_t>* idx,
                          ProbeStats* stats) const {
   if (FLAGS_consult_bloom_filters) {
     // Fully open the BloomFileReader if it was lazily opened earlier.
     //
     // If it's already initialized, this is a no-op.
-    RETURN_NOT_OK(bloom_reader_->Init());
+    RETURN_NOT_OK(bloom_reader_->Init(io_context));
 
     stats->blooms_consulted++;
     bool present;
-    Status s = bloom_reader_->CheckKeyPresent(probe.bloom_probe(), &present);
+    Status s = bloom_reader_->CheckKeyPresent(probe.bloom_probe(), io_context, &present);
     if (s.ok() && !present) {
       *idx = boost::none;
       return Status::OK();
@@ -284,7 +294,7 @@ Status CFileSet::FindRow(const RowSetKeyProbe &probe,
 
   stats->keys_consulted++;
   CFileIterator *key_iter = nullptr;
-  RETURN_NOT_OK(NewKeyIterator(&key_iter));
+  RETURN_NOT_OK(NewKeyIterator(io_context, &key_iter));
 
   unique_ptr<CFileIterator> key_iter_scoped(key_iter); // free on return
 
@@ -300,10 +310,10 @@ Status CFileSet::FindRow(const RowSetKeyProbe &probe,
   return Status::OK();
 }
 
-Status CFileSet::CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                                 rowid_t *rowid, ProbeStats* stats) const {
+Status CFileSet::CheckRowPresent(const RowSetKeyProbe& probe, const IOContext* io_context,
+                                 bool* present, rowid_t* rowid, ProbeStats* stats) const {
   boost::optional<rowid_t> opt_rowid;
-  RETURN_NOT_OK(FindRow(probe, &opt_rowid, stats));
+  RETURN_NOT_OK(FindRow(probe, io_context, &opt_rowid, stats));
   *present = opt_rowid != boost::none;
   if (*present) {
     *rowid = *opt_rowid;
@@ -311,9 +321,9 @@ Status CFileSet::CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
   return Status::OK();
 }
 
-Status CFileSet::NewKeyIterator(CFileIterator **key_iter) const {
-  RETURN_NOT_OK(key_index_reader()->Init());
-  return key_index_reader()->NewIterator(key_iter, CFileReader::CACHE_BLOCK);
+Status CFileSet::NewKeyIterator(const IOContext* io_context, CFileIterator** key_iter) const {
+  RETURN_NOT_OK(key_index_reader()->Init(io_context));
+  return key_index_reader()->NewIterator(key_iter, CFileReader::CACHE_BLOCK, io_context);
 }
 
 ////////////////////////////////////////////////////////////
@@ -351,7 +361,7 @@ Status CFileSet::Iterator::CreateColumnIterators(const ScanSpec* spec) {
       continue;
     }
     CFileIterator *iter;
-    RETURN_NOT_OK_PREPEND(base_data_->NewColumnIterator(col_id, cache_blocks, &iter),
+    RETURN_NOT_OK_PREPEND(base_data_->NewColumnIterator(col_id, cache_blocks, io_context_, &iter),
                           Substitute("could not create iterator for column $0",
                                      projection_->column(proj_col_idx).ToString()));
     ret_iters.emplace_back(iter);
@@ -364,11 +374,11 @@ Status CFileSet::Iterator::CreateColumnIterators(const ScanSpec* spec) {
 Status CFileSet::Iterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
 
-  RETURN_NOT_OK(base_data_->CountRows(&row_count_));
+  RETURN_NOT_OK(base_data_->CountRows(io_context_, &row_count_));
 
   // Setup Key Iterator
   CFileIterator *tmp;
-  RETURN_NOT_OK(base_data_->NewKeyIterator(&tmp));
+  RETURN_NOT_OK(base_data_->NewKeyIterator(io_context_, &tmp));
   key_iter_.reset(tmp);
 
   // Setup column iterators.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/cfile_set.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h
index 5c533ad..89743d9 100644
--- a/src/kudu/tablet/cfile_set.h
+++ b/src/kudu/tablet/cfile_set.h
@@ -43,7 +43,7 @@
 namespace boost {
 template <class T>
 class optional;
-}
+}  // namespace boost
 
 namespace kudu {
 
@@ -55,7 +55,11 @@ struct IteratorStats;
 
 namespace cfile {
 class BloomFileReader;
-}
+}  // namespace cfile
+
+namespace fs {
+struct IOContext;
+}  // namespace fs
 
 namespace tablet {
 
@@ -72,13 +76,15 @@ class CFileSet : public std::enable_shared_from_this<CFileSet> {
 
   static Status Open(std::shared_ptr<RowSetMetadata> rowset_metadata,
                      std::shared_ptr<MemTracker> parent_mem_tracker,
+                     const fs::IOContext* io_context,
                      std::shared_ptr<CFileSet>* cfile_set);
 
   // Create an iterator with the given projection. 'projection' must remain valid
   // for the lifetime of the returned iterator.
-  virtual Iterator *NewIterator(const Schema *projection) const;
+  virtual Iterator* NewIterator(const Schema* projection,
+                                const fs::IOContext* io_context) const;
 
-  Status CountRows(rowid_t *count) const;
+  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const;
 
   // See RowSet::GetBounds
   virtual Status GetBounds(std::string* min_encoded_key,
@@ -99,6 +105,7 @@ class CFileSet : public std::enable_shared_from_this<CFileSet> {
   // Determine the index of the given row key.
   // Sets *idx to boost::none if the row is not found.
   Status FindRow(const RowSetKeyProbe& probe,
+                 const fs::IOContext* io_context,
                  boost::optional<rowid_t>* idx,
                  ProbeStats* stats) const;
 
@@ -108,8 +115,8 @@ class CFileSet : public std::enable_shared_from_this<CFileSet> {
 
   // Check if the given row is present. If it is, sets *rowid to the
   // row's index.
-  Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                         rowid_t *rowid, ProbeStats* stats) const;
+  Status CheckRowPresent(const RowSetKeyProbe& probe, const fs::IOContext* io_context,
+                         bool* present, rowid_t* rowid, ProbeStats* stats) const;
 
   // Return true if there exists a CFile for the given column ID.
   bool has_data_for_column_id(ColumnId col_id) const {
@@ -126,15 +133,15 @@ class CFileSet : public std::enable_shared_from_this<CFileSet> {
   CFileSet(std::shared_ptr<RowSetMetadata> rowset_metadata,
            std::shared_ptr<MemTracker> parent_mem_tracker);
 
-  Status DoOpen();
-  Status OpenBloomReader();
-  Status OpenAdHocIndexReader();
-  Status LoadMinMaxKeys();
+  Status DoOpen(const fs::IOContext* io_context);
+  Status OpenBloomReader(const fs::IOContext* io_context);
+  Status LoadMinMaxKeys(const fs::IOContext* io_context);
 
   Status NewColumnIterator(ColumnId col_id,
                            cfile::CFileReader::CacheControl cache_blocks,
+                           const fs::IOContext* io_context,
                            cfile::CFileIterator **iter) const;
-  Status NewKeyIterator(cfile::CFileIterator** key_iter) const;
+  Status NewKeyIterator(const fs::IOContext* io_context, cfile::CFileIterator** key_iter) const;
 
   // Return the CFileReader responsible for reading the key index.
   // (the ad-hoc reader for composite keys, otherwise the key column reader)
@@ -210,12 +217,14 @@ class CFileSet::Iterator : public ColumnwiseIterator {
   friend class CFileSet;
 
   // 'projection' must remain valid for the lifetime of this object.
-  Iterator(std::shared_ptr<CFileSet const> base_data, const Schema *projection)
+  Iterator(std::shared_ptr<CFileSet const> base_data, const Schema* projection,
+           const fs::IOContext* io_context)
       : base_data_(std::move(base_data)),
         projection_(projection),
         initted_(false),
         cur_idx_(0),
-        prepared_count_(0) {}
+        prepared_count_(0),
+        io_context_(io_context) {}
 
   // Fill in col_iters_ for each of the requested columns.
   Status CreateColumnIterators(const ScanSpec* spec);
@@ -252,6 +261,7 @@ class CFileSet::Iterator : public ColumnwiseIterator {
   rowid_t lower_bound_idx_;
   rowid_t upper_bound_idx_;
 
+  const fs::IOContext* io_context_;
 
   // The underlying columns are prepared lazily, so that if a column is never
   // materialized, it doesn't need to be read off disk.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/compaction-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index e18d966..6c83169 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -225,6 +225,7 @@ class TestCompaction : public KuduRowSetTest {
                                 probe,
                                 RowChangeList(update_buf),
                                 op_id_,
+                                nullptr,
                                 &stats,
                                 &result));
   }
@@ -261,6 +262,7 @@ class TestCompaction : public KuduRowSetTest {
                                 probe,
                                 RowChangeList(update_buf),
                                 op_id_,
+                                nullptr,
                                 &stats,
                                 &result));
   }
@@ -297,7 +299,7 @@ class TestCompaction : public KuduRowSetTest {
       for (const shared_ptr<RowSetMetadata>& meta : metas) {
         shared_ptr<DiskRowSet> rs;
         ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
-                                   mem_trackers_, &rs));
+                                   mem_trackers_, nullptr, &rs));
         result_rowsets->push_back(rs);
       }
     }
@@ -310,7 +312,7 @@ class TestCompaction : public KuduRowSetTest {
     vector<shared_ptr<CompactionInput> > merge_inputs;
     for (const shared_ptr<DiskRowSet> &rs : rowsets) {
       gscoped_ptr<CompactionInput> input;
-      RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, &input));
+      RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, nullptr, &input));
       merge_inputs.push_back(shared_ptr<CompactionInput>(input.release()));
     }
     out->reset(CompactionInput::Merge(merge_inputs, &projection));
@@ -396,7 +398,7 @@ class TestCompaction : public KuduRowSetTest {
     // Verify the resulting compaction output has the right number
     // of rows.
     rowid_t count = 0;
-    ASSERT_OK(result_rs->CountRows(&count));
+    ASSERT_OK(result_rs->CountRows(nullptr, &count));
     ASSERT_EQ(1000 * schemas.size(), count);
   }
 
@@ -442,7 +444,7 @@ class TestCompaction : public KuduRowSetTest {
       for (const shared_ptr<RowSetMetadata>& meta : input_meta->rowsets()) {
         shared_ptr<DiskRowSet> rs;
         CHECK_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
-                                  mem_trackers_, &rs));
+                                  mem_trackers_, nullptr, &rs));
         rowsets.push_back(rs);
       }
 
@@ -556,14 +558,14 @@ TEST_F(TestCompaction, TestRowSetInput) {
   UpdateRows(rs.get(), 10, 0, 1);
   UpdateRows(rs.get(), 10, 0, 2);
   // Flush DMS, update some more.
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   UpdateRows(rs.get(), 10, 0, 3);
   UpdateRows(rs.get(), 10, 0, 4);
 
   // Check compaction input
   vector<string> out;
   gscoped_ptr<CompactionInput> input;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), &input));
+  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
   EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=0, )"
@@ -635,6 +637,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   ASSERT_OK(CompactionInput::Create(*result,
                                     &schema_,
                                     MvccSnapshot::CreateSnapshotIncludingAllTransactions(),
+                                    nullptr,
                                     &input));
   vector<string> out;
   IterateInput(input.get(), &out);
@@ -795,7 +798,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   vector<shared_ptr<CompactionInput>> inputs;
   for (auto& row_set : row_sets) {
     gscoped_ptr<CompactionInput> ci;
-    CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, &ci));
+    CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
     inputs.push_back(shared_ptr<CompactionInput>(ci.release()));
   }
 
@@ -816,7 +819,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
 
   vector<string> out;
   gscoped_ptr<CompactionInput> ci;
-  CHECK_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, &ci));
+  CHECK_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
   IterateInput(ci.get(), &out);
 
   // Finally go through the final compaction input and through the expected one and make sure
@@ -865,10 +868,10 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) {
   MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
 
   gscoped_ptr<CompactionInput> rs1_input;
-  ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, &rs1_input));
+  ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, nullptr, &rs1_input));
 
   gscoped_ptr<CompactionInput> rs2_input;
-  ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, &rs2_input));
+  ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, nullptr, &rs2_input));
 
   vector<shared_ptr<CompactionInput>> to_merge;
   to_merge.push_back(shared_ptr<CompactionInput>(rs1_input.release()));
@@ -917,12 +920,12 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   string dummy_name = "";
 
-  ASSERT_OK(ReupdateMissedDeltas(dummy_name, input.get(), HistoryGcOpts::Disabled(), snap, snap2,
+  ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), HistoryGcOpts::Disabled(), snap, snap2,
                                  { rs }));
 
   // If we look at the contents of the DiskRowSet now, we should see the "re-updated" data.
   vector<string> out;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), &input));
+  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(1000, out.size());
   EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=1, )"
@@ -933,7 +936,7 @@ TEST_F(TestCompaction, TestOneToOne) {
   // And compact (1 input to 1 output)
   MvccSnapshot snap3(mvcc_);
   gscoped_ptr<CompactionInput> compact_input;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, snap3, &compact_input));
+  ASSERT_OK(CompactionInput::Create(*rs, &schema_, snap3, nullptr, &compact_input));
   DoFlushAndReopen(compact_input.get(), schema_, snap3, kLargeRollThreshold, nullptr);
 }
 
@@ -975,7 +978,7 @@ TEST_F(TestCompaction, TestKUDU102) {
   string dummy_name = "";
 
   // This would fail without KUDU-102
-  ASSERT_OK(ReupdateMissedDeltas(dummy_name, input.get(), HistoryGcOpts::Disabled(), snap, snap2,
+  ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), HistoryGcOpts::Disabled(), snap, snap2,
                                  { rs, rs_b }));
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index c754e26..fc0ade5 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -58,6 +58,7 @@
 #include "kudu/util/memory/arena.h"
 
 using kudu::clock::HybridClock;
+using kudu::fs::IOContext;
 using std::deque;
 using std::shared_ptr;
 using std::string;
@@ -852,16 +853,18 @@ string CompactionInputRowToString(const CompactionInputRow& input_row) {
 Status CompactionInput::Create(const DiskRowSet &rowset,
                                const Schema* projection,
                                const MvccSnapshot &snap,
+                               const IOContext* io_context,
                                gscoped_ptr<CompactionInput>* out) {
   CHECK(projection->has_column_ids());
 
-  shared_ptr<ColumnwiseIterator> base_cwise(rowset.base_data_->NewIterator(projection));
+  shared_ptr<ColumnwiseIterator> base_cwise(rowset.base_data_->NewIterator(projection, io_context));
   gscoped_ptr<RowwiseIterator> base_iter(new MaterializingIterator(base_cwise));
 
   // Creates a DeltaIteratorMerger that will only include the relevant REDO deltas.
   RowIteratorOptions redo_opts;
   redo_opts.projection = projection;
   redo_opts.snap_to_include = snap;
+  redo_opts.io_context = io_context;
   unique_ptr<DeltaIterator> redo_deltas;
   RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator(
       redo_opts, DeltaTracker::REDOS_ONLY, &redo_deltas), "Could not open REDOs");
@@ -870,6 +873,7 @@ Status CompactionInput::Create(const DiskRowSet &rowset,
   RowIteratorOptions undo_opts;
   undo_opts.projection = projection;
   undo_opts.snap_to_include = MvccSnapshot::CreateSnapshotIncludingNoTransactions();
+  undo_opts.io_context = io_context;
   unique_ptr<DeltaIterator> undo_deltas;
   RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator(
       undo_opts, DeltaTracker::UNDOS_ONLY, &undo_deltas), "Could not open UNDOs");
@@ -896,13 +900,14 @@ CompactionInput *CompactionInput::Merge(const vector<shared_ptr<CompactionInput>
 
 Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot &snap,
                                                   const Schema* schema,
+                                                  const IOContext* io_context,
                                                   shared_ptr<CompactionInput> *out) const {
   CHECK(schema->has_column_ids());
 
   vector<shared_ptr<CompactionInput> > inputs;
   for (const shared_ptr<RowSet> &rs : rowsets_) {
     gscoped_ptr<CompactionInput> input;
-    RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, &input),
+    RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, &input),
                           Substitute("Could not create compaction input for rowset $0",
                                      rs->ToString()));
     inputs.push_back(shared_ptr<CompactionInput>(input.release()));
@@ -1183,7 +1188,7 @@ Status FlushCompactionInput(CompactionInput* input,
   return Status::OK();
 }
 
-Status ReupdateMissedDeltas(const string &tablet_name,
+Status ReupdateMissedDeltas(const IOContext* io_context,
                             CompactionInput *input,
                             const HistoryGcOpts& history_gc_opts,
                             const MvccSnapshot &snap_to_exclude,
@@ -1294,7 +1299,7 @@ Status ReupdateMissedDeltas(const string &tablet_name,
 
         rowid_t num_rows;
         DiskRowSet* cur_drs = diskrowsets.front();
-        RETURN_NOT_OK(cur_drs->CountRows(&num_rows));
+        RETURN_NOT_OK(cur_drs->CountRows(io_context, &num_rows));
 
         // The index on the input side isn't necessarily the index on the output side:
         // we may have output several small DiskRowSets, so we need to find the index
@@ -1310,7 +1315,7 @@ Status ReupdateMissedDeltas(const string &tablet_name,
           DCHECK_GE(idx_in_delta_tracker, 0);
           diskrowsets.pop_front();
           cur_drs = diskrowsets.front();
-          RETURN_NOT_OK(cur_drs->CountRows(&num_rows));
+          RETURN_NOT_OK(cur_drs->CountRows(io_context, &num_rows));
         }
 
         DeltaTracker* cur_tracker = cur_drs->delta_tracker();
@@ -1355,7 +1360,7 @@ Status ReupdateMissedDeltas(const string &tablet_name,
     TRACE_EVENT0("tablet", "Flushing missed deltas");
     for (DeltaTracker* tracker : updated_trackers) {
       VLOG(1) << "Flushing DeltaTracker updated with missed deltas...";
-      RETURN_NOT_OK_PREPEND(tracker->Flush(DeltaTracker::NO_FLUSH_METADATA),
+      RETURN_NOT_OK_PREPEND(tracker->Flush(io_context, DeltaTracker::NO_FLUSH_METADATA),
                             "Could not flush delta tracker after missed delta update");
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/compaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index ae4e7d3..e6fc30a 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -37,6 +37,10 @@ namespace kudu {
 class Arena;
 class Schema;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace tablet {
 
 class DiskRowSet;
@@ -104,6 +108,7 @@ class CompactionInput {
   static Status Create(const DiskRowSet &rowset,
                        const Schema* projection,
                        const MvccSnapshot &snap,
+                       const fs::IOContext* io_context,
                        gscoped_ptr<CompactionInput>* out);
 
   // Create an input which reads from the given memrowset, yielding base rows and updates
@@ -151,6 +156,7 @@ class RowSetsInCompaction {
   // for the lifetime of the returned CompactionInput.
   Status CreateCompactionInput(const MvccSnapshot &snap,
                                const Schema* schema,
+                               const fs::IOContext* io_context,
                                std::shared_ptr<CompactionInput> *out) const;
 
   // Dump a log message indicating the chosen rowsets.
@@ -237,7 +243,7 @@ Status FlushCompactionInput(CompactionInput *input,
 //
 // After return of this function, this CompactionInput object is "used up" and will
 // yield no further rows.
-Status ReupdateMissedDeltas(const std::string &tablet_name,
+Status ReupdateMissedDeltas(const fs::IOContext* io_context,
                             CompactionInput *input,
                             const HistoryGcOpts& history_gc_opts,
                             const MvccSnapshot &snap_to_exclude,

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 969cf89..d41f565 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -56,6 +56,7 @@ namespace kudu {
 using fs::BlockCreationTransaction;
 using fs::BlockManager;
 using fs::CreateBlockOptions;
+using fs::IOContext;
 using fs::WritableBlock;
 using std::string;
 using std::unique_ptr;
@@ -109,10 +110,11 @@ string MajorDeltaCompaction::ColumnNamesToString() const {
   return result;
 }
 
-Status MajorDeltaCompaction::FlushRowSetAndDeltas() {
+Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   CHECK_EQ(state_, kInitialized);
 
-  shared_ptr<ColumnwiseIterator> old_base_data_cwise(base_data_->NewIterator(&partial_schema_));
+  shared_ptr<ColumnwiseIterator> old_base_data_cwise(base_data_->NewIterator(&partial_schema_,
+                                                                             io_context));
   gscoped_ptr<RowwiseIterator> old_base_data_rwise(new MaterializingIterator(old_base_data_cwise));
 
   ScanSpec spec;
@@ -288,7 +290,7 @@ Status MajorDeltaCompaction::OpenUndoDeltaFileWriter() {
   return new_undo_delta_writer_->Start();
 }
 
-Status MajorDeltaCompaction::Compact() {
+Status MajorDeltaCompaction::Compact(const IOContext* io_context) {
   CHECK_EQ(state_, kInitialized);
 
   LOG(INFO) << "Starting major delta compaction for columns " << ColumnNamesToString();
@@ -300,7 +302,7 @@ Status MajorDeltaCompaction::Compact() {
 
   // We defer calling OpenRedoDeltaFileWriter() since we might not need to flush.
   RETURN_NOT_OK(OpenBaseDataWriter());
-  RETURN_NOT_OK(FlushRowSetAndDeltas());
+  RETURN_NOT_OK(FlushRowSetAndDeltas(io_context));
   LOG(INFO) << "Finished major delta compaction of columns " <<
       ColumnNamesToString();
   return Status::OK();
@@ -360,7 +362,8 @@ void MajorDeltaCompaction::CreateMetadataUpdate(
 // We're called under diskrowset's component_lock_ and delta_tracker's compact_flush_lock_
 // so both AtomicUpdateStores calls can be done separately and still be seen as one atomic
 // operation.
-Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker) {
+Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker,
+                                                const IOContext* io_context) {
   CHECK_EQ(state_, kFinished);
 
   // 1. Get all the necessary I/O out of the way. It's OK to fail here
@@ -376,14 +379,14 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker) {
     new_redo_blocks.push_back(new_redo_delta_block_);
   }
   SharedDeltaStoreVector new_redo_stores;
-  RETURN_NOT_OK(tracker->OpenDeltaReaders(new_redo_blocks, &new_redo_stores, REDO));
+  RETURN_NOT_OK(tracker->OpenDeltaReaders(new_redo_blocks, io_context, &new_redo_stores, REDO));
 
   // Create blocks for the new undo deltas.
   SharedDeltaStoreVector new_undo_stores;
   if (undo_delta_mutations_written_ > 0) {
     vector<BlockId> new_undo_blocks;
     new_undo_blocks.push_back(new_undo_delta_block_);
-    RETURN_NOT_OK(tracker->OpenDeltaReaders(new_undo_blocks, &new_undo_stores, UNDO));
+    RETURN_NOT_OK(tracker->OpenDeltaReaders(new_undo_blocks, io_context, &new_undo_stores, UNDO));
   }
 
   // 2. Only now that we cannot fail do we update the in-memory state.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/delta_compaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.h b/src/kudu/tablet/delta_compaction.h
index 6c99d4b..f790504 100644
--- a/src/kudu/tablet/delta_compaction.h
+++ b/src/kudu/tablet/delta_compaction.h
@@ -33,6 +33,10 @@ namespace kudu {
 
 class FsManager;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace tablet {
 
 class CFileSet;
@@ -65,7 +69,7 @@ class MajorDeltaCompaction {
 
   // Executes the compaction.
   // This has no effect on the metadata of the tablet, etc.
-  Status Compact();
+  Status Compact(const fs::IOContext* io_context);
 
   // After a compaction is successful, prepares a metadata update which:
   // 1) swaps out the old columns for the new ones
@@ -74,7 +78,7 @@ class MajorDeltaCompaction {
   void CreateMetadataUpdate(RowSetMetadataUpdate* update);
 
   // Apply the changes to the given delta tracker.
-  Status UpdateDeltaTracker(DeltaTracker* tracker);
+  Status UpdateDeltaTracker(DeltaTracker* tracker, const fs::IOContext* io_context);
 
  private:
   std::string ColumnNamesToString() const;
@@ -93,7 +97,7 @@ class MajorDeltaCompaction {
   // Reads the current base data, applies the deltas, and then writes the new base data.
   // A new delta file is written if not all columns were selected for compaction and some
   // deltas need to be written back into a delta file.
-  Status FlushRowSetAndDeltas();
+  Status FlushRowSetAndDeltas(const fs::IOContext* io_context);
 
   FsManager* const fs_manager_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index aea8a39..d4ea390 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -37,6 +37,10 @@ class Schema;
 class SelectionVector;
 struct ColumnId;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace tablet {
 
 class DeltaFileWriter;
@@ -52,7 +56,7 @@ class DeltaStore {
  public:
   // Performs any post-construction work for the DeltaStore, which may
   // include additional I/O.
-  virtual Status Init() = 0;
+  virtual Status Init(const fs::IOContext* io_context) = 0;
 
   // Whether this delta store was initialized or not.
   virtual bool Initted() = 0;
@@ -72,7 +76,8 @@ class DeltaStore {
                                   DeltaIterator** iterator) const = 0;
 
   // Set *deleted to true if the latest update for the given row is a deletion.
-  virtual Status CheckRowDeleted(rowid_t row_idx, bool *deleted) const = 0;
+  virtual Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
+                                 bool *deleted) const = 0;
 
   // Get the store's estimated size in bytes.
   virtual uint64_t EstimateSize() const = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 0ab956f..0e41203 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -62,6 +62,7 @@ namespace tablet {
 
 using cfile::ReaderOptions;
 using fs::CreateBlockOptions;
+using fs::IOContext;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 using log::LogAnchorRegistry;
@@ -75,11 +76,12 @@ using strings::Substitute;
 Status DeltaTracker::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
                           LogAnchorRegistry* log_anchor_registry,
                           const TabletMemTrackers& mem_trackers,
+                          const IOContext* io_context,
                           gscoped_ptr<DeltaTracker>* delta_tracker) {
   gscoped_ptr<DeltaTracker> local_dt(
       new DeltaTracker(rowset_metadata, log_anchor_registry,
                        mem_trackers));
-  RETURN_NOT_OK(local_dt->DoOpen());
+  RETURN_NOT_OK(local_dt->DoOpen(io_context));
 
   delta_tracker->swap(local_dt);
   return Status::OK();
@@ -96,6 +98,7 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
       dms_empty_(true) {}
 
 Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
+                                      const IOContext* io_context,
                                       vector<shared_ptr<DeltaStore> >* stores,
                                       DeltaType type) {
   FsManager* fs = rowset_metadata_->fs_manager();
@@ -112,6 +115,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     shared_ptr<DeltaFileReader> dfr;
     ReaderOptions options;
     options.parent_mem_tracker = mem_trackers_.tablet_tracker;
+    options.io_context = io_context;
     s = DeltaFileReader::OpenNoInit(std::move(block),
                                     type,
                                     std::move(options),
@@ -132,15 +136,17 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
 
 
 // Open any previously flushed DeltaFiles in this rowset
-Status DeltaTracker::DoOpen() {
+Status DeltaTracker::DoOpen(const IOContext* io_context) {
   CHECK(redo_delta_stores_.empty()) << "should call before opening any readers";
   CHECK(undo_delta_stores_.empty()) << "should call before opening any readers";
   CHECK(!open_);
 
   RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->redo_delta_blocks(),
+                                 io_context,
                                  &redo_delta_stores_,
                                  REDO));
   RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->undo_delta_blocks(),
+                                 io_context,
                                  &undo_delta_stores_,
                                  UNDO));
 
@@ -150,13 +156,14 @@ Status DeltaTracker::DoOpen() {
                                       log_anchor_registry_,
                                       mem_trackers_.dms_tracker,
                                       &dms_));
-  RETURN_NOT_OK(dms_->Init());
+  RETURN_NOT_OK(dms_->Init(io_context));
 
   open_ = true;
   return Status::OK();
 }
 
-Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(size_t start_idx, size_t end_idx,
+Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context,
+                                                     size_t start_idx, size_t end_idx,
                                                      const Schema* projection,
                                                      vector<shared_ptr<DeltaStore> > *target_stores,
                                                      vector<BlockId> *target_blocks,
@@ -181,6 +188,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(size_t start_idx, size_t en
   }
   RowIteratorOptions opts;
   opts.projection = projection;
+  opts.io_context = io_context;
   RETURN_NOT_OK(DeltaIteratorMerger::Create(inputs, opts, out));
   return Status::OK();
 }
@@ -209,14 +217,14 @@ Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first
     shared_ptr<DeltaFileReader> first_clone;
     RETURN_NOT_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &first_clone));
-    RETURN_NOT_OK(first_clone->Init());
+    RETURN_NOT_OK(first_clone->Init(nullptr));
     first_copy = first_clone;
   }
   if (!second_copy->Initted()) {
     shared_ptr<DeltaFileReader> second_clone;
     RETURN_NOT_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &second_clone));
-    RETURN_NOT_OK(second_clone->Init());
+    RETURN_NOT_OK(second_clone->Init(nullptr));
     second_copy = second_clone;
   }
 
@@ -309,13 +317,14 @@ void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_re
                       << JoinDeltaStoreStrings(*stores_to_update);
 }
 
-Status DeltaTracker::Compact() {
-  return CompactStores(0, -1);
+Status DeltaTracker::Compact(const IOContext* io_context) {
+  return CompactStores(io_context, 0, -1);
 }
 
 Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                                     const SharedDeltaStoreVector& to_remove,
                                                     const vector<BlockId>& new_delta_blocks,
+                                                    const IOContext* io_context,
                                                     DeltaType type,
                                                     MetadataFlushType flush_type) {
   compact_flush_lock_.AssertAcquired();
@@ -326,7 +335,8 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   DCHECK(!to_remove.empty());
 
   SharedDeltaStoreVector new_stores;
-  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, &new_stores, type),
+  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, io_context,
+                                         &new_stores, type),
                         "Unable to open delta blocks");
 
   vector<BlockId> removed_blocks;
@@ -356,7 +366,7 @@ Status DeltaTracker::CheckWritableUnlocked() const {
   return Status::OK();
 }
 
-Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
+Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, int end_idx) {
   // Prevent concurrent compactions or a compaction concurrent with a flush
   //
   // TODO(perf): this could be more fine grained
@@ -388,7 +398,7 @@ Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
   // Merge and compact the stores.
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
-  RETURN_NOT_OK(DoCompactStores(start_idx, end_idx, std::move(block),
+  RETURN_NOT_OK(DoCompactStores(io_context, start_idx, end_idx, std::move(block),
                                 &compacted_stores, &compacted_blocks));
 
   vector<BlockId> new_blocks = { new_block_id };
@@ -397,8 +407,8 @@ Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
 
   LOG_WITH_PREFIX(INFO) << "Flushing compaction of redo delta blocks { " << compacted_blocks
                         << " } into block " << new_block_id;
-  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks, REDO,
-                                                       FLUSH_METADATA),
+  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks,
+                                                       io_context, REDO, FLUSH_METADATA),
                         "DeltaTracker: CompactStores: Unable to commit delta update");
   return Status::OK();
 }
@@ -426,6 +436,7 @@ Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancie
 
 Status DeltaTracker::InitUndoDeltas(Timestamp ancient_history_mark,
                                     MonoTime deadline,
+                                    const IOContext* io_context,
                                     int64_t* delta_blocks_initialized,
                                     int64_t* bytes_in_ancient_undos) {
   SharedDeltaStoreVector undos_newest_first;
@@ -438,7 +449,7 @@ Status DeltaTracker::InitUndoDeltas(Timestamp ancient_history_mark,
     if (deadline.Initialized() && MonoTime::Now() >= deadline) break;
 
     if (!undo->Initted()) {
-      RETURN_NOT_OK(undo->Init());
+      RETURN_NOT_OK(undo->Init(io_context));
       tmp_blocks_initialized++;
     }
 
@@ -458,6 +469,7 @@ Status DeltaTracker::InitUndoDeltas(Timestamp ancient_history_mark,
 }
 
 Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                             const IOContext* io_context,
                                              int64_t* blocks_deleted, int64_t* bytes_deleted) {
   DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
 
@@ -503,7 +515,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
     RowSetMetadataUpdate update;
     update.RemoveUndoDeltaBlocks(block_ids_to_remove);
     // We do not flush the tablet metadata - that is the caller's responsibility.
-    RETURN_NOT_OK(CommitDeltaStoreMetadataUpdate(update, undos_to_remove, {}, UNDO,
+    RETURN_NOT_OK(CommitDeltaStoreMetadataUpdate(update, undos_to_remove, {}, io_context, UNDO,
                                                  NO_FLUSH_METADATA));
   }
 
@@ -512,10 +524,11 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
   return Status::OK();
 }
 
-Status DeltaTracker::DoCompactStores(size_t start_idx, size_t end_idx,
-         unique_ptr<WritableBlock> block,
-         vector<shared_ptr<DeltaStore> > *compacted_stores,
-         vector<BlockId> *compacted_blocks) {
+Status DeltaTracker::DoCompactStores(const IOContext* io_context,
+                                     size_t start_idx, size_t end_idx,
+                                     unique_ptr<WritableBlock> block,
+                                     vector<shared_ptr<DeltaStore> > *compacted_stores,
+                                     vector<BlockId> *compacted_blocks) {
   unique_ptr<DeltaIterator> inputs_merge;
 
   // Currently, DeltaFile iterators ignore the passed-in projection in
@@ -523,7 +536,8 @@ Status DeltaTracker::DoCompactStores(size_t start_idx, size_t end_idx,
   // If this changes in the future, we'll have to pass in the current tablet
   // schema here.
   Schema empty_schema;
-  RETURN_NOT_OK(MakeDeltaIteratorMergerUnlocked(start_idx, end_idx, &empty_schema, compacted_stores,
+  RETURN_NOT_OK(MakeDeltaIteratorMergerUnlocked(io_context, start_idx, end_idx,
+                                                &empty_schema, compacted_stores,
                                                 compacted_blocks, &inputs_merge));
   LOG_WITH_PREFIX(INFO) << "Compacting " << (end_idx - start_idx + 1) << " delta files.";
   DeltaFileWriter dfw(std::move(block));
@@ -614,14 +628,14 @@ Status DeltaTracker::Update(Timestamp timestamp,
   return s;
 }
 
-Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, bool *deleted,
-                                     ProbeStats* stats) const {
+Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_context,
+                                     bool *deleted, ProbeStats* stats) const {
   shared_lock<rw_spinlock> lock(component_lock_);
 
 
   *deleted = false;
   // Check if the row has a deletion in DeltaMemStore.
-  RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, deleted));
+  RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
   if (*deleted) {
     return Status::OK();
   }
@@ -629,7 +643,7 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, bool *deleted,
   // Then check backwards through the list of trackers.
   for (auto ds = redo_delta_stores_.crbegin(); ds != redo_delta_stores_.crend(); ds++) {
     stats->deltas_consulted++;
-    RETURN_NOT_OK((*ds)->CheckRowDeleted(row_idx, deleted));
+    RETURN_NOT_OK((*ds)->CheckRowDeleted(row_idx, io_context, deleted));
     if (*deleted) {
       return Status::OK();
     }
@@ -639,6 +653,7 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, bool *deleted,
 }
 
 Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
+                              const IOContext* io_context,
                               shared_ptr<DeltaFileReader>* dfr,
                               MetadataFlushType flush_type) {
   // Open file for write.
@@ -666,6 +681,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
   RETURN_NOT_OK(fs->OpenBlock(block_id, &readable_block));
   ReaderOptions options;
   options.parent_mem_tracker = mem_trackers_.tablet_tracker;
+  options.io_context = io_context;
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block),
                                             REDO,
                                             std::move(options),
@@ -681,7 +697,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
   return Status::OK();
 }
 
-Status DeltaTracker::Flush(MetadataFlushType flush_type) {
+Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_type) {
   std::lock_guard<Mutex> l(compact_flush_lock_);
   RETURN_NOT_OK(CheckWritableUnlocked());
 
@@ -704,7 +720,7 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
                                         log_anchor_registry_,
                                         mem_trackers_.dms_tracker,
                                         &dms_));
-    RETURN_NOT_OK(dms_->Init());
+    RETURN_NOT_OK(dms_->Init(nullptr));
     dms_empty_.Store(true);
 
     if (count == 0) {
@@ -724,7 +740,7 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
   // TODO(todd): need another lock to prevent concurrent flushers
   // at some point.
   shared_ptr<DeltaFileReader> dfr;
-  Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
+  Status s = FlushDMS(old_dms.get(), io_context, &dfr, flush_type);
   if (PREDICT_FALSE(!s.ok())) {
     // A failure here leaves a DeltaMemStore permanently in the store list.
     // This isn't allowed, and rolling back the store is difficult, so we leave
@@ -805,12 +821,12 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
   shared_lock<rw_spinlock> lock(component_lock_);
   if (stores == UNDOS_AND_REDOS || stores == UNDOS_ONLY) {
     for (const shared_ptr<DeltaStore>& ds : undo_delta_stores_) {
-      RETURN_NOT_OK(ds->Init());
+      RETURN_NOT_OK(ds->Init(nullptr));
     }
   }
   if (stores == UNDOS_AND_REDOS || stores == REDOS_ONLY) {
     for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
-      RETURN_NOT_OK(ds->Init());
+      RETURN_NOT_OK(ds->Init(nullptr));
     }
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index ed32c4b..7a1e76e 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -52,6 +52,7 @@ class OpId;
 }
 
 namespace fs {
+struct IOContext;
 class WritableBlock;
 }
 
@@ -85,6 +86,7 @@ class DeltaTracker {
   static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata,
                      log::LogAnchorRegistry* log_anchor_registry,
                      const TabletMemTrackers& mem_trackers,
+                     const fs::IOContext* io_context,
                      gscoped_ptr<DeltaTracker>* delta_tracker);
 
   Status WrapIterator(const std::shared_ptr<CFileSet::Iterator> &base,
@@ -129,9 +131,8 @@ class DeltaTracker {
   // the TabletMetadata) flushed.
   //
   // NOTE: 'flush_type' should almost always be set to 'FLUSH_METADATA', or else
-  // delta stores might become unrecoverable. TODO: see KUDU-204 to clean this up
-  // a bit.
-  Status Flush(MetadataFlushType flush_type);
+  // delta stores might become unrecoverable.
+  Status Flush(const fs::IOContext* io_context, MetadataFlushType flush_type);
 
   // Update the given row in the database.
   // Copies the data, as well as any referenced values into a local arena.
@@ -147,14 +148,11 @@ class DeltaTracker {
   // delta for this row is a deletion.
   //
   // Sets *deleted to true if so; otherwise sets it to false.
-  Status CheckRowDeleted(rowid_t row_idx, bool *deleted, ProbeStats* stats) const;
+  Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
+                         bool *deleted, ProbeStats* stats) const;
 
   // Compacts all REDO delta files.
-  //
-  // TODO keep metadata in the delta stores to indicate whether or not
-  // a minor (or -- when implemented -- major) compaction is warranted
-  // and if so, compact the stores.
-  Status Compact();
+  Status Compact(const fs::IOContext* io_context);
 
   // Updates the in-memory list of delta stores and then persists the updated
   // metadata. This should only be used for compactions or ancient history
@@ -165,6 +163,7 @@ class DeltaTracker {
   Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                         const SharedDeltaStoreVector& to_remove,
                                         const std::vector<BlockId>& new_delta_blocks,
+                                        const fs::IOContext* io_context,
                                         DeltaType type,
                                         MetadataFlushType flush_type);
 
@@ -172,7 +171,7 @@ class DeltaTracker {
   // "start_idx" and "end_idx" (inclusive) and writes this to a
   // new REDO delta block. If "end_idx" is set to -1, then delta files at
   // all indexes starting with "start_idx" will be compacted.
-  Status CompactStores(int start_idx, int end_idx);
+  Status CompactStores(const fs::IOContext* io_context, int start_idx, int end_idx);
 
   // See RowSet::EstimateBytesInPotentiallyAncientUndoDeltas().
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
@@ -181,17 +180,19 @@ class DeltaTracker {
   // See RowSet::InitUndoDeltas().
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
+                        const fs::IOContext* io_context,
                         int64_t* delta_blocks_initialized,
                         int64_t* bytes_in_ancient_undos);
 
   // See RowSet::DeleteAncientUndoDeltas().
-  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark, const fs::IOContext* io_context,
                                  int64_t* blocks_deleted, int64_t* bytes_deleted);
 
   // Opens the input 'blocks' of type 'type' and returns the opened delta file
   // readers in 'stores'.
   Status OpenDeltaReaders(const std::vector<BlockId>& blocks,
-                          std::vector<std::shared_ptr<DeltaStore> >* stores,
+                          const fs::IOContext* io_context,
+                          std::vector<std::shared_ptr<DeltaStore>>* stores,
                           DeltaType type);
 
   // Validates that 'first' may precede 'second' in an ordered list of deltas,
@@ -268,9 +269,10 @@ class DeltaTracker {
                log::LogAnchorRegistry* log_anchor_registry,
                TabletMemTrackers mem_trackers);
 
-  Status DoOpen();
+  Status DoOpen(const fs::IOContext* io_context);
 
   Status FlushDMS(DeltaMemStore* dms,
+                  const fs::IOContext* io_context,
                   std::shared_ptr<DeltaFileReader>* dfr,
                   MetadataFlushType flush_type);
 
@@ -285,7 +287,8 @@ class DeltaTracker {
   // NOTE: the caller of this method should acquire or already hold an
   // exclusive lock on 'compact_flush_lock_' before calling this
   // method in order to protect 'redo_delta_stores_'.
-  Status DoCompactStores(size_t start_idx, size_t end_idx,
+  Status DoCompactStores(const fs::IOContext* io_context,
+                         size_t start_idx, size_t end_idx,
                          std::unique_ptr<fs::WritableBlock> block,
                          std::vector<std::shared_ptr<DeltaStore>>* compacted_stores,
                          std::vector<BlockId>* compacted_blocks);
@@ -299,11 +302,11 @@ class DeltaTracker {
   // NOTE: the caller of this method must first acquire or already
   // hold a lock on 'compact_flush_lock_'in order to guard against a
   // race on 'redo_delta_stores_'.
-  Status MakeDeltaIteratorMergerUnlocked(size_t start_idx, size_t end_idx,
-                                         const Schema* schema,
-                                         std::vector<std::shared_ptr<DeltaStore > > *target_stores,
-                                         std::vector<BlockId> *target_blocks,
-                                         std::unique_ptr<DeltaIterator> *out);
+  Status MakeDeltaIteratorMergerUnlocked(const fs::IOContext* io_context,
+                                         size_t start_idx, size_t end_idx, const Schema* projection,
+                                         std::vector<std::shared_ptr<DeltaStore>>* target_stores,
+                                         std::vector<BlockId>* target_blocks,
+                                         std::unique_ptr<DeltaIterator>* out);
 
   std::string LogPrefix() const;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltafile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 19d08f0..0096e28 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -380,10 +380,10 @@ TEST_F(TestDeltaFile, TestLazyInit) {
   ASSERT_EQ(0, bytes_read);
 
   // But initializing it should (only the first time).
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_GT(bytes_read, 0);
   size_t bytes_read_after_init = bytes_read;
-  ASSERT_OK(reader->Init());
+  ASSERT_OK(reader->Init(nullptr));
   ASSERT_EQ(bytes_read_after_init, bytes_read);
 
   // And let's test non-lazy open for good measure; it should yield the

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 805483c..c52e65c 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -82,6 +82,7 @@ using cfile::IndexTreeIterator;
 using cfile::ReaderOptions;
 using fs::BlockCreationTransaction;
 using fs::BlockManager;
+using fs::IOContext;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 
@@ -220,11 +221,12 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
                              ReaderOptions options,
                              shared_ptr<DeltaFileReader>* reader_out) {
   shared_ptr<DeltaFileReader> df_reader;
+  const IOContext* io_context = options.io_context;
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
                                             delta_type,
                                             std::move(options),
                                             &df_reader));
-  RETURN_NOT_OK(df_reader->Init());
+  RETURN_NOT_OK(df_reader->Init(io_context));
 
   *reader_out = df_reader;
   return Status::OK();
@@ -235,13 +237,14 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                    ReaderOptions options,
                                    shared_ptr<DeltaFileReader>* reader_out) {
   unique_ptr<CFileReader> cf_reader;
+  const IOContext* io_context = options.io_context;
   RETURN_NOT_OK(CFileReader::OpenNoInit(std::move(block),
                                         std::move(options),
                                         &cf_reader));
   gscoped_ptr<DeltaFileReader> df_reader(
       new DeltaFileReader(std::move(cf_reader), delta_type));
   if (!FLAGS_cfile_lazy_open) {
-    RETURN_NOT_OK(df_reader->Init());
+    RETURN_NOT_OK(df_reader->Init(io_context));
   }
 
   reader_out->reset(df_reader.release());
@@ -254,15 +257,15 @@ DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader,
     : reader_(cf_reader.release()),
       delta_type_(delta_type) {}
 
-Status DeltaFileReader::Init() {
-  return init_once_.Init([this] { return InitOnce(); });
+Status DeltaFileReader::Init(const IOContext* io_context) {
+  return init_once_.Init([this, io_context] { return InitOnce(io_context); });
 }
 
-Status DeltaFileReader::InitOnce() {
+Status DeltaFileReader::InitOnce(const IOContext* io_context) {
   // Fully open the CFileReader if it was lazily opened earlier.
   //
   // If it's already initialized, this is a no-op.
-  RETURN_NOT_OK(reader_->Init());
+  RETURN_NOT_OK(reader_->Init(io_context));
 
   if (!reader_->has_validx()) {
     return Status::Corruption("file does not have a value index!");
@@ -349,9 +352,9 @@ Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
   return Status::NotFound("MvccSnapshot outside the range of this delta.");
 }
 
-Status DeltaFileReader::CheckRowDeleted(rowid_t row_idx, bool *deleted) const {
-  RETURN_NOT_OK(const_cast<DeltaFileReader*>(this)->Init());
-
+Status DeltaFileReader::CheckRowDeleted(rowid_t row_idx, const IOContext* io_context,
+                                        bool* deleted) const {
+  RETURN_NOT_OK(const_cast<DeltaFileReader*>(this)->Init(io_context));
   // If there are no deletes in the delta file at all, we can short-circuit
   // the seek.
   if (delta_stats_->delete_count() == 0) {
@@ -365,6 +368,7 @@ Status DeltaFileReader::CheckRowDeleted(rowid_t row_idx, bool *deleted) const {
   Schema empty_schema;
   RowIteratorOptions opts;
   opts.projection = &empty_schema;
+  opts.io_context = io_context;
   DeltaIterator* raw_iter;
   Status s = NewDeltaIterator(opts, &raw_iter);
   if (s.IsNotFound()) {
@@ -427,7 +431,7 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
   DCHECK(initted_) << "Must call Init()";
 
   // Finish the initialization of any lazily-initialized state.
-  RETURN_NOT_OK(dfr_->Init());
+  RETURN_NOT_OK(dfr_->Init(opts_.io_context));
 
   // Check again whether this delta file is relevant given the snapshot
   // that we are querying. We did this already before creating the

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index a7203c3..811db5b 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -70,6 +70,7 @@ namespace fs {
 class BlockCreationTransaction;
 class ReadableBlock;
 class WritableBlock;
+struct IOContext;
 } // namespace fs
 
 namespace tablet {
@@ -158,7 +159,7 @@ class DeltaFileReader : public DeltaStore,
                            cfile::ReaderOptions options,
                            std::shared_ptr<DeltaFileReader>* reader_out);
 
-  virtual Status Init() OVERRIDE;
+  virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
 
   virtual bool Initted() OVERRIDE {
     return init_once_.init_succeeded();
@@ -169,7 +170,9 @@ class DeltaFileReader : public DeltaStore,
                           DeltaIterator** iterator) const OVERRIDE;
 
   // See DeltaStore::CheckRowDeleted
-  virtual Status CheckRowDeleted(rowid_t row_idx, bool *deleted) const OVERRIDE;
+  virtual Status CheckRowDeleted(rowid_t row_idx,
+                                 const fs::IOContext* io_context,
+                                 bool *deleted) const OVERRIDE;
 
   virtual uint64_t EstimateSize() const OVERRIDE;
 
@@ -209,7 +212,7 @@ class DeltaFileReader : public DeltaStore,
                   DeltaType delta_type);
 
   // Callback used in 'init_once_' to initialize this delta file.
-  Status InitOnce();
+  Status InitOnce(const fs::IOContext* io_context);
 
   Status ReadDeltaStats();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltamemstore-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index 0c3930c..ae7e1c3 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -87,7 +87,7 @@ class TestDeltaMemStore : public KuduTest {
     CHECK_OK(DeltaMemStore::Create(0, 0,
                                    new log::LogAnchorRegistry(),
                                    MemTracker::GetRootTracker(), &dms_));
-    CHECK_OK(dms_->Init());
+    CHECK_OK(dms_->Init(nullptr));
   }
 
   void SetUp() OVERRIDE {

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index ec07372..842a538 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -45,6 +45,7 @@
 namespace kudu {
 namespace tablet {
 
+using fs::IOContext;
 using log::LogAnchorRegistry;
 using std::string;
 using std::shared_ptr;
@@ -85,7 +86,7 @@ DeltaMemStore::DeltaMemStore(int64_t id,
     disambiguator_sequence_number_(0) {
 }
 
-Status DeltaMemStore::Init() {
+Status DeltaMemStore::Init(const IOContext* /*io_context*/) {
   return Status::OK();
 }
 
@@ -151,7 +152,9 @@ Status DeltaMemStore::NewDeltaIterator(const RowIteratorOptions& opts,
   return Status::OK();
 }
 
-Status DeltaMemStore::CheckRowDeleted(rowid_t row_idx, bool *deleted) const {
+Status DeltaMemStore::CheckRowDeleted(rowid_t row_idx,
+                                      const IOContext* /*io_context*/,
+                                      bool *deleted) const {
   *deleted = false;
 
   DeltaKey key(row_idx, Timestamp(0));