Posted to commits@impala.apache.org by jo...@apache.org on 2023/03/23 16:20:42 UTC

[impala] 03/03: IMPALA-11886: Data cache should support asynchronous writes

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1cfd41e8b10f6e91fc79d50ab58e671d63b65eec
Author: Eyizoha <18...@163.com>
AuthorDate: Thu Feb 16 10:48:39 2023 +0800

    IMPALA-11886: Data cache should support asynchronous writes
    
    This patch implements asynchronous writes to the data cache to improve
    scan performance when a cache miss happens.
    Previously, writes to the data cache were synchronous with HDFS file
    reads, and both were handled by remote HDFS IO threads. In other words,
    if a cache miss occurred, the IO thread had to take on the additional
    responsibility of writing to the cache, which degraded scan
    performance.
    This patch uses a thread pool for asynchronous writes; the number of
    threads in the pool is determined by the new configuration
    'data_cache_num_async_write_threads'. In asynchronous write mode, the
    IO thread only needs to copy the data into a temporary buffer when
    storing it in the data cache. The additional memory consumed by these
    temporary buffers is bounded by the new configuration
    'data_cache_async_write_buffer_limit'.
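
    As a sketch of the byte-budget idea (using standard C++ atomics here;
    the patch itself uses Impala's AtomicInt64, and the names below are
    illustrative only), the IO thread charges the entry size against the
    limit before copying, and the storer thread releases the charge when
    the task completes:

        #include <atomic>
        #include <cstdint>

        std::atomic<int64_t> current_buffer_size{0};
        const int64_t buffer_limit = 1LL << 30;  // e.g. a 1GB write buffer limit

        // Charge 'len' bytes against the budget before copying into a temporary
        // buffer; give up (and let the caller drop the cache write) if it would
        // push the total over the limit.
        bool TryCharge(int64_t len) {
          int64_t current = current_buffer_size.load();
          while (true) {
            int64_t next = current + len;
            if (next > buffer_limit) return false;  // would exceed the limit
            // On failure, compare_exchange_weak reloads 'current' and we retry.
            if (current_buffer_size.compare_exchange_weak(current, next)) return true;
          }
        }

        // Release the charge once the asynchronous store task has completed.
        void Release(int64_t len) { current_buffer_size.fetch_sub(len); }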
    
    Testing:
    - Add test cases for asynchronous data writing to the original
    DataCacheTest, using different numbers of write threads.
    - Add DataCacheTest#OutOfWriteBufferLimit, which tests the limit on the
    memory consumed by temporary buffers in the case of asynchronous
    writes.
    - Add a timer to the MultiThreadedReadWrite function to measure the
    average time of multithreaded writes. Here are the test cases whose
    times differ significantly between synchronous and asynchronous mode:
    Test case                | Policy | Sync/Async | write time in ms
    MultiThreadedNoMisses    | LRU    | Sync       |   12.20
    MultiThreadedNoMisses    | LRU    | Async      |   20.74
    MultiThreadedNoMisses    | LIRS   | Sync       |    9.42
    MultiThreadedNoMisses    | LIRS   | Async      |   16.75
    MultiThreadedWithMisses  | LRU    | Sync       |  510.87
    MultiThreadedWithMisses  | LRU    | Async      |   10.06
    MultiThreadedWithMisses  | LIRS   | Sync       | 1872.11
    MultiThreadedWithMisses  | LIRS   | Async      |   11.02
    MultiPartitions          | LRU    | Sync       |    1.20
    MultiPartitions          | LRU    | Async      |    5.23
    MultiPartitions          | LIRS   | Sync       |    1.26
    MultiPartitions          | LIRS   | Async      |    7.91
    AccessTraceAnonymization | LRU    | Sync       | 1963.89
    AccessTraceAnonymization | LRU    | Sync       | 2073.62
    AccessTraceAnonymization | LRU    | Async      |    9.43
    AccessTraceAnonymization | LRU    | Async      |   13.13
    AccessTraceAnonymization | LIRS   | Sync       | 1663.93
    AccessTraceAnonymization | LIRS   | Sync       | 1501.86
    AccessTraceAnonymization | LIRS   | Async      |   12.83
    AccessTraceAnonymization | LIRS   | Async      |   12.74
    
    Change-Id: I878f7486d485b6288de1a9145f49576b7155d312
    Reviewed-on: http://gerrit.cloudera.org:8080/19475
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/data-cache-test.cc  | 166 +++++++++++++++++++++++++++------
 be/src/runtime/io/data-cache-trace.cc |   3 +-
 be/src/runtime/io/data-cache.cc       | 168 ++++++++++++++++++++++++++++++----
 be/src/runtime/io/data-cache.h        |  39 +++++++-
 be/src/runtime/io/disk-io-mgr.cc      |   9 +-
 be/src/util/impalad-metrics.cc        |  22 +++++
 be/src/util/impalad-metrics.h         |  18 ++++
 bin/run-all-tests.sh                  |   6 ++
 bin/start-impala-cluster.py           |   8 ++
 common/thrift/metrics.json            |  40 ++++++++
 10 files changed, 427 insertions(+), 52 deletions(-)

diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc
index f025b32a4..b1540cf59 100644
--- a/be/src/runtime/io/data-cache-test.cc
+++ b/be/src/runtime/io/data-cache-test.cc
@@ -70,12 +70,22 @@ DECLARE_string(data_cache_eviction_policy);
 DECLARE_string(data_cache_trace_dir);
 DECLARE_int32(max_data_cache_trace_file_size);
 DECLARE_int32(data_cache_trace_percentage);
+DECLARE_int32(data_cache_num_async_write_threads);
+DECLARE_string(data_cache_async_write_buffer_limit);
 
 namespace impala {
 namespace io {
 
 using boost::filesystem::path;
 
+struct DataCacheTestParam {
+  DataCacheTestParam(const std::string& eviction_policy, int32 num_write_threads)
+    : eviction_policy(eviction_policy),
+      num_write_threads(num_write_threads) { }
+  std::string eviction_policy;
+  int32 num_write_threads;
+};
+
 class DataCacheBaseTest : public testing::Test {
  public:
   const uint8_t* test_buffer() {
@@ -90,6 +100,14 @@ class DataCacheBaseTest : public testing::Test {
     return data_cache_trace_dir_;
   }
 
+  /// Test helper: we need to make sure that all the store tasks have completed before
+  /// any lookup when running test cases in async write mode.
+  static void WaitForAsyncWrite(const DataCache& cache) {
+    while (cache.current_buffer_size_.Load() != 0) {
+      usleep(500);
+    }
+  }
+
   //
   // Use multiple threads to insert and read back a set of ranges from test_buffer().
   // Depending on the setting, the working set may or may not fit in the cache.
@@ -104,13 +122,14 @@ class DataCacheBaseTest : public testing::Test {
   //                             from the cache
   //
   void MultiThreadedReadWrite(DataCache* cache, int64_t max_start_offset,
-      bool use_per_thread_filename, bool expect_misses) {
+      bool use_per_thread_filename, bool expect_misses, double* write_time_ms) {
     // Barrier to synchronize all threads so no thread will start probing the cache until
     // all insertions are done.
     CountingBarrier barrier(NUM_THREADS);
 
     vector<unique_ptr<Thread>> threads;
     int num_misses[NUM_THREADS];
+    int64_t write_times_us[NUM_THREADS];
     for (int i = 0; i < NUM_THREADS; ++i) {
       unique_ptr<Thread> thread;
       num_misses[i] = 0;
@@ -118,14 +137,17 @@ class DataCacheBaseTest : public testing::Test {
       ASSERT_OK(Thread::Create("data-cache-test", thread_name,
           boost::bind(&DataCacheBaseTest::ThreadFn, this,
              use_per_thread_filename ? thread_name : "", cache, max_start_offset,
-             &barrier, &num_misses[i]), &thread));
+             &barrier, &num_misses[i], &write_times_us[i]), &thread));
       threads.emplace_back(std::move(thread));
     }
     int cache_misses = 0;
+    int64_t avg_write_time_us = 0;
     for (int i = 0; i < NUM_THREADS; ++i) {
       threads[i]->Join();
       cache_misses += num_misses[i];
+      avg_write_time_us += write_times_us[i];
     }
+    *write_time_ms = static_cast<double>(avg_write_time_us / NUM_THREADS) / 1000;
     if (expect_misses) {
       ASSERT_GT(cache_misses, 0);
     } else {
@@ -168,7 +190,7 @@ class DataCacheBaseTest : public testing::Test {
   }
 
   // Create a bunch of test directories in which the data cache will reside.
-  void SetupWithParameters(std::string eviction_policy) {
+  void SetupWithParameters(DataCacheTestParam param) {
     test_env_.reset(new TestEnv());
     flag_saver_.reset(new google::FlagSaver());
     ASSERT_OK(test_env_->Init());
@@ -189,7 +211,11 @@ class DataCacheBaseTest : public testing::Test {
     FLAGS_data_cache_write_concurrency = NUM_THREADS;
 
     // This is parameterized to allow testing LRU and LIRS
-    FLAGS_data_cache_eviction_policy = eviction_policy;
+    FLAGS_data_cache_eviction_policy = param.eviction_policy;
+
+    // This is parameterized to allow testing asynchronous write.
+    FLAGS_data_cache_num_async_write_threads = param.num_write_threads;
+    FLAGS_data_cache_async_write_buffer_limit = "1G";
   }
 
   // Delete all the test directories created.
@@ -234,23 +260,29 @@ class DataCacheBaseTest : public testing::Test {
   // of cache keys which indirectly control the cache footprint. 'num_misses' records
   // the number of cache misses.
   void ThreadFn(const string& fname_prefix, DataCache* cache, int64_t max_start_offset,
-      CountingBarrier* store_barrier, int* num_misses) {
+      CountingBarrier* store_barrier, int* num_misses, int64_t* write_time_us) {
     const string& custom_fname = Substitute("$0file", fname_prefix);
     vector<int64_t> offsets;
     for (int64_t offset = 0; offset < max_start_offset; ++offset) {
       offsets.push_back(offset);
     }
     random_shuffle(offsets.begin(), offsets.end());
+    int64_t write_start_time = UnixMicros();
     for (int64_t offset : offsets) {
       cache->Store(custom_fname, MTIME, offset, test_buffer() + offset,
           TEMP_BUFFER_SIZE);
     }
+    *write_time_us = UnixMicros() - write_start_time;
     // Wait until all threads have finished inserting. Since different threads may be
     // inserting the same cache key and collide, only one thread which wins the race will
     // insert the cache entry. Make sure other threads which lose out on the race will
     // wait for the insertion to complete first before proceeding.
     store_barrier->Notify();
     store_barrier->Wait();
+
+    // Before continuing, we should wait for all asynchronous store tasks (if any)
+    // to finish.
+    WaitForAsyncWrite(*cache);
     for (int64_t offset : offsets) {
       uint8_t buffer[TEMP_BUFFER_SIZE];
       memset(buffer, 0, TEMP_BUFFER_SIZE);
@@ -268,7 +300,7 @@ class DataCacheBaseTest : public testing::Test {
 
 class DataCacheTest :
     public DataCacheBaseTest,
-    public ::testing::WithParamInterface<std::string> {
+    public ::testing::WithParamInterface<DataCacheTestParam> {
  public:
   DataCacheTest()
       : DataCacheBaseTest() {
@@ -280,15 +312,19 @@ class DataCacheTest :
   }
 };
 
-INSTANTIATE_TEST_CASE_P(DataCacheTestTypes, DataCacheTest,
-    ::testing::Values("LRU", "LIRS"));
+INSTANTIATE_TEST_CASE_P(DataCacheTestTypes, DataCacheTest, ::testing::Values(
+    DataCacheTestParam("LRU", 0),
+    DataCacheTestParam("LRU", NUM_THREADS),
+    DataCacheTestParam("LIRS", 0),
+    DataCacheTestParam("LIRS", NUM_THREADS)));
 
 // This test exercises the basic insertion and lookup paths by inserting a known set of
 // offsets which fit in the cache entirely. Also tries reading entries which are never
 // inserted into the cache.
 TEST_P(DataCacheTest, TestBasics) {
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   // Temporary buffer for holding results read from the cache.
@@ -308,6 +344,9 @@ TEST_P(DataCacheTest, TestBasics) {
         ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
       }
     }
+    // Before continuing, we should wait for all asynchronous store tasks (if any)
+    // to finish.
+    WaitForAsyncWrite(cache);
   }
 
   // Read the same range inserted previously but with a different filename.
@@ -345,6 +384,7 @@ TEST_P(DataCacheTest, TestBasics) {
   uint8_t buffer2[TEMP_BUFFER_SIZE + 10];
   const int64_t larger_entry_size = TEMP_BUFFER_SIZE + 10;
   ASSERT_TRUE(cache.Store(FNAME, MTIME, 0, test_buffer(), larger_entry_size));
+  WaitForAsyncWrite(cache);
   memset(buffer2, 0, larger_entry_size);
   ASSERT_EQ(larger_entry_size,
       cache.Lookup(FNAME, MTIME, 0, larger_entry_size, buffer2));
@@ -373,13 +413,15 @@ TEST_P(DataCacheTest, TestBasics) {
 }
 
 TEST_P(DataCacheTest, NonCanonicalPath) {
-  DataCache cache("/noncanonical/../file/./system/path:1M");
+  DataCache cache("/noncanonical/../file/./system/path:1M",
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_ERROR_MSG(cache.Init(), "/noncanonical/../file/./system/path is not a canonical"
       " path");
 }
 
 TEST_P(DataCacheTest, NonExistantPath) {
-  DataCache cache("/invalid/file/system/path:1M");
+  DataCache cache("/invalid/file/system/path:1M",
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_ERROR_MSG(cache.Init(), "Encountered exception while verifying existence of "
       "directory path /invalid/file/system/path: No such file or directory");
 }
@@ -390,7 +432,8 @@ TEST_P(DataCacheTest, RotateFiles) {
   // Set the maximum size of backing files to be 1/4 of the cache size.
   FLAGS_data_cache_file_max_size_bytes = DEFAULT_CACHE_SIZE / 4;
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   // Read and then insert a range of offsets. Expected all misses in the first iteration
@@ -409,6 +452,7 @@ TEST_P(DataCacheTest, RotateFiles) {
         ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
       }
     }
+    WaitForAsyncWrite(cache);
   }
 
   // Make sure the cache's destructor removes all backing files.
@@ -432,7 +476,8 @@ TEST_P(DataCacheTest, RotateAndDeleteFiles) {
   FLAGS_data_cache_max_opened_files = 1;
 
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   // Read and insert a working set the same size of the cache. Expected all misses
@@ -444,6 +489,7 @@ TEST_P(DataCacheTest, RotateAndDeleteFiles) {
       ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
       ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
           TEMP_BUFFER_SIZE));
+      WaitForAsyncWrite(cache);
     }
   }
 
@@ -486,7 +532,8 @@ TEST_P(DataCacheTest, LRUEviction) {
   // This test is specific to LRU
   if (FLAGS_data_cache_eviction_policy != "LRU") return;
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   // Read and then insert range chunks of size TEMP_BUFFER_SIZE from the test buffer.
@@ -498,6 +545,7 @@ TEST_P(DataCacheTest, LRUEviction) {
       ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
       ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
           TEMP_BUFFER_SIZE));
+      WaitForAsyncWrite(cache);
     }
   }
   // Verifies that the cache is full.
@@ -518,6 +566,7 @@ TEST_P(DataCacheTest, LRUEviction) {
   const string& alt_fname = "random";
   offset = 0;
   ASSERT_TRUE(cache.Store(alt_fname, MTIME, offset, large_buffer.get(), cache_size));
+  WaitForAsyncWrite(cache);
 
   // Verifies that all previous entries are all evicted.
   for (offset = 0; offset < 1028; ++offset) {
@@ -539,28 +588,40 @@ TEST_P(DataCacheTest, LRUEviction) {
 // collision during insertion, all entries in the working set should be found.
 TEST_P(DataCacheTest, MultiThreadedNoMisses) {
   int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   int64_t max_start_offset = NUM_CACHE_ENTRIES_NO_EVICT;
   bool use_per_thread_filename = false;
   bool expect_misses = false;
+  double wait_time_ms;
   MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
-      expect_misses);
+      expect_misses, &wait_time_ms);
+  LOG(INFO) << "MultiThreadedNoMisses, "
+            << FLAGS_data_cache_eviction_policy << ", "
+            << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync")
+            << ", write_time_ms: " << wait_time_ms;
 }
 
 // Inserts a working set which is known to be larger than the cache's capacity.
 // Expect some cache misses in lookups.
 TEST_P(DataCacheTest, MultiThreadedWithMisses) {
   int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   int64_t max_start_offset = 1024;
   bool use_per_thread_filename = true;
   bool expect_misses = true;
+  double wait_time_ms;
   MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
-      expect_misses);
+      expect_misses, &wait_time_ms);
+  LOG(INFO) << "MultiThreadedWithMisses, "
+            << FLAGS_data_cache_eviction_policy << ", "
+            << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync")
+            << ", write_time_ms: " << wait_time_ms;
 }
 
 // Test insertion and lookup with a cache configured with multiple partitions.
@@ -568,14 +629,20 @@ TEST_P(DataCacheTest, MultiPartitions) {
   StringPiece delimiter(",");
   string cache_base = JoinStrings(data_cache_dirs(), delimiter);
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", cache_base, std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", cache_base, std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   int64_t max_start_offset = 512;
   bool use_per_thread_filename = false;
   bool expect_misses = false;
+  double wait_time_ms;
   MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
-      expect_misses);
+      expect_misses, &wait_time_ms);
+  LOG(INFO) << "MultiPartitions, "
+            << FLAGS_data_cache_eviction_policy << ", "
+            << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync")
+            << ", write_time_ms: " << wait_time_ms;
 }
 
 // Tests insertion of a working set whose size is 1/8 of the total memory size.
@@ -585,8 +652,11 @@ TEST_P(DataCacheTest, LargeFootprint) {
   struct sysinfo info;
   ASSERT_EQ(0, sysinfo(&info));
   ASSERT_GT(info.totalram, 0);
+  // Set a sufficient buffer size limit to prevent async store failures.
+  FLAGS_data_cache_async_write_buffer_limit = std::to_string(info.totalram);
   DataCache cache(
-      Substitute("$0:$1", data_cache_dirs()[0], std::to_string(info.totalram)));
+      Substitute("$0:$1", data_cache_dirs()[0], std::to_string(info.totalram)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
 
   const int64_t footprint = info.totalram / 8;
@@ -594,6 +664,7 @@ TEST_P(DataCacheTest, LargeFootprint) {
     int64_t offset = i * TEST_BUFFER_SIZE;
     ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer(), TEST_BUFFER_SIZE));
   }
+  WaitForAsyncWrite(cache);
   uint8_t buffer[TEST_BUFFER_SIZE];
   for (int64_t i = 0; i < footprint / TEST_BUFFER_SIZE; ++i) {
     int64_t offset = i * TEST_BUFFER_SIZE;
@@ -620,13 +691,19 @@ TEST_P(DataCacheTest, AccessTraceAnonymization) {
     ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(FLAGS_data_cache_trace_dir));
     {
       DataCache cache(Substitute(
-          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+          FLAGS_data_cache_num_async_write_threads);
       ASSERT_OK(cache.Init());
 
       bool use_per_thread_filename = true;
       bool expect_misses = true;
+      double wait_time_ms;
       MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
-                             expect_misses);
+          expect_misses, &wait_time_ms);
+      LOG(INFO) << "AccessTraceAnonymization, "
+                << FLAGS_data_cache_eviction_policy << ", "
+                << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync")
+                << ", write_time_ms: " << wait_time_ms;
     }
 
     // Part 1: Read the trace files and ensure that the JSON entries are valid
@@ -686,13 +763,19 @@ TEST_P(DataCacheTest, AccessTraceSubsetPercentage) {
     ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(FLAGS_data_cache_trace_dir));
     {
       DataCache cache(Substitute(
-          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+          "$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+          FLAGS_data_cache_num_async_write_threads);
       ASSERT_OK(cache.Init());
 
       bool use_per_thread_filename = true;
       bool expect_misses = true;
+      double wait_time_ms;
       MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
-                             expect_misses);
+          expect_misses, &wait_time_ms);
+      LOG(INFO) << "AccessTraceSubsetPercentage, "
+                << FLAGS_data_cache_eviction_policy << ", "
+                << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync")
+                << ", write_time_ms: " << wait_time_ms;
     }
 
     // Replay the trace and record the counts
@@ -732,7 +815,8 @@ TEST_P(DataCacheTest, RotationalDisk) {
   MockDisk("sda", /*is_rotational*/ true);
 
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
   ASSERT_EQ(1, cache.partitions_[0]->data_cache_write_concurrency_);
 }
@@ -743,7 +827,8 @@ TEST_P(DataCacheTest, NonRotationalDisk) {
   MockDisk("nvme0n1", /*is_rotational*/ false);
 
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
   ASSERT_EQ(8, cache.partitions_[0]->data_cache_write_concurrency_);
 }
@@ -754,11 +839,36 @@ TEST_P(DataCacheTest, InvalidDisk) {
   MockDisk("won't parse", /*is_rotational*/ false);
 
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
-  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
   ASSERT_OK(cache.Init());
   ASSERT_EQ(1, cache.partitions_[0]->data_cache_write_concurrency_);
 }
 
+TEST_P(DataCacheTest, OutOfWriteBufferLimit) {
+  // This test is specific to asynchronous write
+  if (FLAGS_data_cache_num_async_write_threads == 0) return;
+
+  // With a low buffer size limit, inserting more data than the limit at once will
+  // fail.
+  FLAGS_data_cache_async_write_buffer_limit = Substitute("$0", TEMP_BUFFER_SIZE << 1);
+
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
+  DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)),
+      FLAGS_data_cache_num_async_write_threads);
+  ASSERT_OK(cache.Init());
+
+  // When continuously inserting a large number of entries, a certain number of store
+  // tasks will fail due to the write buffer limit.
+  int32_t count = 0;
+  for (int64_t offset = 0; offset < NUM_CACHE_ENTRIES_NO_EVICT; ++offset) {
+    if (cache.Store(FNAME, MTIME, offset, test_buffer() + offset, TEMP_BUFFER_SIZE)) {
+      ++count;
+    }
+  }
+  EXPECT_LT(count, NUM_CACHE_ENTRIES_NO_EVICT);
+}
+
 } // namespace io
 } // namespace impala
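
The tests above make asynchronous stores observable by polling the cache's
outstanding-buffer counter down to zero before issuing lookups. The same polling
pattern, sketched in standard C++ with an added timeout so a stuck writer fails the
wait instead of hanging (the timeout is an assumption here, not part of the patch):

    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <thread>

    // Politely spin until 'outstanding_bytes' drains to zero, or give up after
    // 'timeout'. Returns true if the asynchronous writers caught up in time.
    bool WaitForDrain(const std::atomic<int64_t>& outstanding_bytes,
                      std::chrono::milliseconds timeout) {
      const auto deadline = std::chrono::steady_clock::now() + timeout;
      while (outstanding_bytes.load() != 0) {
        if (std::chrono::steady_clock::now() > deadline) return false;
        std::this_thread::sleep_for(std::chrono::microseconds(500));
      }
      return true;
    }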
 
diff --git a/be/src/runtime/io/data-cache-trace.cc b/be/src/runtime/io/data-cache-trace.cc
index 68bf31deb..f571c52e4 100644
--- a/be/src/runtime/io/data-cache-trace.cc
+++ b/be/src/runtime/io/data-cache-trace.cc
@@ -266,7 +266,8 @@ TraceReplayer::TraceReplayer(string trace_configuration)
 TraceReplayer::~TraceReplayer() {}
 
 Status TraceReplayer::Init() {
-  data_cache_.reset(new DataCache(trace_configuration_, /* trace_replay */ true));
+  data_cache_.reset(new DataCache(trace_configuration_, /* num_async_write_threads */ 0,
+      /* trace_replay */ true));
   RETURN_IF_ERROR(data_cache_->Init());
   initialized_ = true;
   return Status::OK();
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 71ea2452a..2cf45041d 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -116,12 +116,20 @@ DEFINE_string(data_cache_eviction_policy, "LRU",
     "(Advanced) The cache eviction policy to use for the data cache. "
     "Either 'LRU' (default) or 'LIRS' (experimental)");
 
+DEFINE_string(data_cache_async_write_buffer_limit, "1GB",
+    "(Experimental) Limit of the total buffer size used by asynchronous store tasks.");
+
 namespace impala {
 namespace io {
 
 static const int64_t PAGE_SIZE = 1L << 12;
 const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-";
 const int MAX_FILE_DELETER_QUEUE_SIZE = 500;
+
+// This large value for the queue size is harmless because the total size of the entries
+// on the queue is bounded by --data_cache_async_write_buffer_limit.
+const int MAX_STORE_TASK_QUEUE_SIZE = 1 << 20;
+
 static const char* PARTITION_PATH_METRIC_KEY_TEMPLATE =
     "impala-server.io-mgr.remote-data-cache-partition-$0.path";
 static const char* PARTITION_READ_LATENCY_METRIC_KEY_TEMPLATE =
@@ -378,6 +386,32 @@ struct DataCache::CacheKey {
   faststring key_;
 };
 
+/// A class that abstracts the store behavior; it holds a temporary buffer copied
+/// from the source buffer until the store completes.
+class DataCache::StoreTask {
+ public:
+  explicit StoreTask(const std::string& filename, int64_t mtime, int64_t offset,
+      uint8_t* buffer, int64_t buffer_len, DataCache* cache)
+    : key_(filename, mtime, offset),
+      buffer_(buffer),
+      buffer_len_(buffer_len),
+      cache_(cache) { }
+
+  ~StoreTask() { cache_->CompleteStoreTask(*this); }
+
+  const CacheKey& key() const { return key_; }
+  const uint8_t* buffer() const { return buffer_.get(); }
+  int64_t buffer_len() const { return buffer_len_; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(StoreTask);
+
+  CacheKey key_;
+  std::unique_ptr<uint8_t[]> buffer_;
+  int64_t buffer_len_;
+  DataCache* cache_;
+};
+
 static Cache::EvictionPolicy GetCacheEvictionPolicy(const std::string& policy_string) {
   Cache::EvictionPolicy policy = Cache::ParseEvictionPolicy(policy_string);
   if (policy != Cache::EvictionPolicy::LRU && policy != Cache::EvictionPolicy::LIRS) {
@@ -742,9 +776,6 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
     // Limit the write concurrency to avoid blocking the caller (which could be calling
     // from the critical path of an IO read) when the cache becomes IO bound due to either
     // limited memory for page cache or the cache is undersized which leads to eviction.
-    //
-    // TODO: defer the writes to another thread which writes asynchronously. Need to bound
-    // the extra memory consumption for holding the temporary buffer though.
     const bool exceed_concurrency =
         pending_insert_set_.size() >= data_cache_write_concurrency_;
     if (exceed_concurrency ||
@@ -840,6 +871,14 @@ bool DataCache::Partition::VerifyChecksum(const string& ops_name, const CacheEnt
   return true;
 }
 
+DataCache::DataCache(const std::string config, int32_t num_async_write_threads,
+    bool trace_replay)
+  : config_(config),
+    trace_replay_(trace_replay),
+    num_async_write_threads_(num_async_write_threads) { }
+
+DataCache::~DataCache() { ReleaseResources(); }
+
 Status DataCache::Init() {
   // Verifies all the configured flags are sane.
   if (FLAGS_data_cache_file_max_size_bytes < PAGE_SIZE) {
@@ -861,12 +900,12 @@ Status DataCache::Init() {
 
   // Parse the capacity string to make sure it's well-formed.
   bool is_percent;
-  int64_t capacity = ParseUtil::ParseMemSpec(all_cache_configs[1], &is_percent, 0);
+  per_partition_capacity_ = ParseUtil::ParseMemSpec(all_cache_configs[1], &is_percent, 0);
   if (is_percent) {
     return Status(Substitute("Malformed data cache capacity configuration $0",
         all_cache_configs[1]));
   }
-  if (capacity < PAGE_SIZE) {
+  if (per_partition_capacity_ < PAGE_SIZE) {
     return Status(Substitute("Configured data cache capacity $0 is too small",
         all_cache_configs[1]));
   }
@@ -882,9 +921,9 @@ Status DataCache::Init() {
   int32_t partition_idx = 0;
   for (const string& dir_path : cache_dirs) {
     LOG(INFO) << "Adding partition " << dir_path << " with capacity "
-              << PrettyPrinter::PrintBytes(capacity);
+              << PrettyPrinter::PrintBytes(per_partition_capacity_);
     std::unique_ptr<Partition> partition =
-        make_unique<Partition>(partition_idx, dir_path, capacity,
+        make_unique<Partition>(partition_idx, dir_path, per_partition_capacity_,
             max_opened_files_per_partition, trace_replay_);
     RETURN_IF_ERROR(partition->Init());
     partitions_.emplace_back(move(partition));
@@ -897,16 +936,46 @@ Status DataCache::Init() {
     // will enqueue a request (i.e. a partition index) when it notices the number of files
     // in a partition exceeds the per-partition limit. The files in a partition will be
     // closed in the order they are created until it's within the per-partition limit.
-    file_deleter_pool_.reset(new ThreadPool<int>("impala-server",
+    file_deleter_pool_.reset(new ThreadPool<int>("data-cache",
         "data-cache-file-deleter", 1, MAX_FILE_DELETER_QUEUE_SIZE,
         bind<void>(&DataCache::DeleteOldFiles, this, _1, _2)));
     RETURN_IF_ERROR(file_deleter_pool_->Init());
   }
 
+  // If --data_cache_num_async_write_threads is set above 0, the store behavior is
+  // asynchronous; a task queue and thread pool are started for that.
+  if (num_async_write_threads_ > 0) {
+    if (UNLIKELY(trace_replay_)) {
+      return Status("Data cache does not support asynchronous writes when doing trace "
+          "replay. Please set 'data_cache_num_async_write_threads' to 0.");
+    }
+    // Parse the buffer limit string to make sure it's well-formed.
+    bool is_percent;
+    int64_t buffer_limit = ParseUtil::ParseMemSpec(
+        FLAGS_data_cache_async_write_buffer_limit, &is_percent, 0);
+    if (is_percent) {
+      return Status(Substitute("Malformed data cache write buffer limit configuration $0",
+          FLAGS_data_cache_async_write_buffer_limit));
+    }
+    if (!TestInfo::is_be_test() && buffer_limit < (1 << 23 /* 8MB */)) {
+      return Status(Substitute("Configured data cache write buffer limit $0 is too small "
+          "(less than 8MB)", FLAGS_data_cache_async_write_buffer_limit));
+    }
+    store_buffer_capacity_ = buffer_limit;
+    storer_pool_.reset(new ThreadPool<StoreTaskHandle>("data-cache", "data-cache-storer",
+        num_async_write_threads_, MAX_STORE_TASK_QUEUE_SIZE,
+        bind<void>(&DataCache::HandleStoreTask, this, _1, _2)));
+    RETURN_IF_ERROR(storer_pool_->Init());
+  }
+
   return Status::OK();
 }
 
 void DataCache::ReleaseResources() {
+  if (storer_pool_) {
+    storer_pool_->Shutdown();
+    storer_pool_->Join();
+  }
   if (file_deleter_pool_) file_deleter_pool_->Shutdown();
   for (auto& partition : partitions_) partition->ReleaseResources();
 }
@@ -945,19 +1014,14 @@ bool DataCache::Store(const string& filename, int64_t mtime, int64_t offset,
     return false;
   }
 
+  // If the storer thread pool is available, data will be stored asynchronously.
+  if (num_async_write_threads_ > 0) {
+    return SubmitStoreTask(filename, mtime, offset, buffer, buffer_len);
+  }
+
   // Construct a cache key. The cache key is also hashed to compute the partition index.
   const CacheKey key(filename, mtime, offset);
-  int idx = key.Hash() % partitions_.size();
-  bool start_reclaim;
-  bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim);
-  if (VLOG_IS_ON(3)) {
-    stringstream ss;
-    ss << std::hex << reinterpret_cast<int64_t>(buffer);
-    LOG(INFO) << Substitute("Storing $0 mtime: $1 offset: $2 bytes_to_read: $3 "
-        "buffer: 0x$4 stored: $5", filename, mtime, offset, buffer_len, ss.str(), stored);
-  }
-  if (start_reclaim) file_deleter_pool_->Offer(idx);
-  return stored;
+  return StoreInternal(key, buffer, buffer_len);
 }
 
 Status DataCache::CloseFilesAndVerifySizes() {
@@ -972,6 +1036,72 @@ void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) {
   partitions_[partition_idx]->DeleteOldFiles();
 }
 
+bool DataCache::SubmitStoreTask(const std::string& filename, int64_t mtime,
+    int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
+  const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
+  if (UNLIKELY(charge_len > per_partition_capacity_)) return false;
+
+  // Try to increase current_buffer_size_ by buffer_len before allocating the buffer.
+  // If the new size would exceed store_buffer_capacity_, return false and leave
+  // current_buffer_size_ unchanged.
+  while (true) {
+    int64_t current_size = current_buffer_size_.Load();
+    int64_t new_size = current_size + buffer_len;
+    if (UNLIKELY(new_size > store_buffer_capacity_)) {
+      VLOG(2) << Substitute("Failed to create store task due to buffer size limitation, "
+          "current buffer size: $0 size limitation: $1 require: $2",
+          current_size, store_buffer_capacity_, buffer_len);
+      ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES->
+          Increment(buffer_len);
+      ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES->Increment(1);
+      return false;
+    }
+    if (LIKELY(current_buffer_size_.CompareAndSwap(current_size, new_size))) {
+      ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES->SetValue(
+          current_buffer_size_.Load());
+      break;
+    }
+  }
+
+  DCHECK(buffer != nullptr);
+  // TODO: Should we use the buffer pool instead of piecemeal memory allocation?
+  uint8_t* task_buffer = new uint8_t[buffer_len];
+  memcpy(task_buffer, buffer, buffer_len);
+
+  const StoreTask* task =
+      new StoreTask(filename, mtime, offset, task_buffer, buffer_len, this);
+  storer_pool_->Offer(StoreTaskHandle(task));
+  ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED->Increment(1);
+  return true;
+}
+
+void DataCache::CompleteStoreTask(const StoreTask& task) {
+  current_buffer_size_.Add(-task.buffer_len());
+  DCHECK_GE(current_buffer_size_.Load(), 0);
+  ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES->SetValue(
+      current_buffer_size_.Load());
+}
+
+void DataCache::HandleStoreTask(uint32_t thread_id, const StoreTaskHandle& task) {
+  StoreInternal(task->key(), task->buffer(), task->buffer_len());
+}
+
+bool DataCache::StoreInternal(const CacheKey& key, const uint8_t* buffer,
+    int64_t buffer_len) {
+  int idx = key.Hash() % partitions_.size();
+  bool start_reclaim;
+  bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim);
+  if (VLOG_IS_ON(3)) {
+    stringstream ss;
+    ss << std::hex << reinterpret_cast<int64_t>(buffer);
+    LOG(INFO) << Substitute("Storing $0 mtime: $1 offset: $2 bytes_to_read: $3 "
+        "buffer: 0x$4 stored: $5", key.filename().ToString(), key.mtime(), key.offset(),
+        buffer_len, ss.str(), stored);
+  }
+  if (start_reclaim) file_deleter_pool_->Offer(idx);
+  return stored;
+}
+
 void DataCache::Partition::Trace(
     const trace::EventType& type, const DataCache::CacheKey& key,
     int64_t lookup_len, int64_t entry_len) {
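
One detail worth noting above: CompleteStoreTask() is called from StoreTask's
destructor, so the buffer charge is released exactly once whether or not the store
itself succeeded. A generic RAII sketch of that idea, in standard C++ with illustrative
names:

    #include <atomic>
    #include <cstdint>
    #include <utility>

    std::atomic<int64_t> outstanding_bytes{0};

    // Owns a charged byte count; the charge is returned exactly once, when the
    // guard is destroyed, regardless of whether the write itself succeeded.
    class ChargeGuard {
     public:
      explicit ChargeGuard(int64_t len) : len_(len) { outstanding_bytes.fetch_add(len_); }
      ~ChargeGuard() { if (len_ > 0) outstanding_bytes.fetch_sub(len_); }
      ChargeGuard(ChargeGuard&& other) noexcept : len_(std::exchange(other.len_, 0)) {}
      ChargeGuard(const ChargeGuard&) = delete;
      ChargeGuard& operator=(const ChargeGuard&) = delete;
      ChargeGuard& operator=(ChargeGuard&&) = delete;
     private:
      int64_t len_;
    };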
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index f7a580e7a..1f5cd870b 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -123,10 +123,10 @@ class DataCache {
   /// an optimized mode that skips all file operations and only does the metadata
   /// operations. This is used to replay the access trace and compare different cache
   /// configurations. See data-cache-trace.h
-  explicit DataCache(const std::string config, bool trace_replay = false)
-    : config_(config), trace_replay_(trace_replay) { }
+  explicit DataCache(const std::string config, int32_t num_async_write_threads = 0,
+      bool trace_replay = false);
 
-  ~DataCache() { ReleaseResources(); }
+  ~DataCache();
 
   /// Parses the configuration string, initializes all partitions in the cache by
   /// checking for storage space available and creates a backing file for caching.
@@ -200,6 +200,7 @@ class DataCache {
   class CacheFile;
   struct CacheKey;
   class CacheEntry;
+  class StoreTask;
 
   /// An implementation of a cache partition. Each partition maintains its own set of
   /// cache keys in a LRU cache.
@@ -390,6 +391,9 @@ class DataCache {
   /// The configuration string for the data cache.
   const std::string config_;
 
+  /// The capacity in bytes of one partition.
+  int64_t per_partition_capacity_;
+
   /// Set to true if this is only doing trace replay. Trace replay does only metadata
   /// operations, and no filesystem operations are required.
   bool trace_replay_;
@@ -406,6 +410,35 @@ class DataCache {
   /// in partitions_[partition_idx].
   void DeleteOldFiles(uint32_t thread_id, int partition_idx);
 
+  /// Creates a new store task, copies the data to a temporary buffer, and submits it
+  /// to the asynchronous write thread pool for handling. May abort due to the buffer
+  /// size limit. Returns true on success.
+  bool SubmitStoreTask(const std::string& filename, int64_t mtime, int64_t offset,
+      const uint8_t* buffer, int64_t buffer_len);
+
+  /// Called by StoreTask's destructor; decreases current_buffer_size_ by buffer_len.
+  void CompleteStoreTask(const StoreTask& task);
+
+  /// Thread pool for storing cache entries asynchronously. It is initialized only if
+  /// 'data_cache_num_async_write_threads' is set above 0, with a corresponding number
+  /// of worker threads.
+  int32_t num_async_write_threads_;
+  using StoreTaskHandle = std::unique_ptr<const StoreTask>;
+  std::unique_ptr<ThreadPool<StoreTaskHandle>> storer_pool_;
+
+  /// Thread function called by threads in 'storer_pool_' for handling store task.
+  void HandleStoreTask(uint32_t thread_id, const StoreTaskHandle& task);
+
+  /// Limit of the total buffer size used by asynchronous store tasks. Once the current
+  /// buffer size reaches the limit, subsequent store tasks are abandoned.
+  int64_t store_buffer_capacity_;
+
+  /// Total buffer size currently used by all asynchronous store tasks.
+  AtomicInt64 current_buffer_size_{0};
+
+  /// Calls the corresponding cache partition to store the entry.
+  bool StoreInternal(const CacheKey& key, const uint8_t* buffer, int64_t buffer_len);
+
 };
 
 } // namespace io
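
With the constructor change above, callers opt into asynchronous writes simply by
passing a thread count. A hypothetical caller is sketched below; the path, capacity,
thread count, and function name are examples only, while the include path, Init(),
and Store() are as they appear in this patch:

    #include <cstdint>
    #include <string>

    #include "runtime/io/data-cache.h"  // Impala's DataCache declaration

    using impala::Status;
    using impala::io::DataCache;

    Status StoreOneRange(const std::string& filename, int64_t mtime, int64_t offset,
        const uint8_t* buffer, int64_t buffer_len) {
      // Two cache directories, 1GB per partition, four async write threads.
      DataCache cache("/data/0,/data/1:1GB", /* num_async_write_threads */ 4);
      RETURN_IF_ERROR(cache.Init());
      // With async writes enabled, Store() returns once the data has been copied
      // into a temporary buffer and queued; a data-cache-storer thread performs
      // the actual write.
      cache.Store(filename, mtime, offset, buffer, buffer_len);
      return Status::OK();
    }

In practice the cache object is long-lived (owned by DiskIoMgr), so queued writes are
not racing the destructor the way they might in this toy function.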
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 43b744f69..29babb3e0 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -74,6 +74,12 @@ DEFINE_string(data_cache, "", "The configuration string for IO data cache. "
     "a capacity quota per directory. For example /data/0,/data/1:1TB means the cache "
     "may use up to 2TB, with 1TB max in /data/0 and /data/1 respectively. Please note "
     "that each Impala daemon on a host must have a unique caching directory.");
+DEFINE_int32(data_cache_num_async_write_threads, 0,
+    "(Experimental) Number of data cache async write threads. Write threads will write "
+    "the cache asynchronously after IO thread read data, so IO thread will return more "
+    "quickly. The extra memory for temporary buffers is limited by "
+    "--data_cache_async_write_buffer_limit. If this's 0, then write will be "
+    "synchronous.");
 
 // Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
 // don't have this penalty and benefit from multiple concurrent IO requests.
@@ -682,7 +688,8 @@ Status DiskIoMgr::Init() {
   DCHECK_EQ(ret, 0);
 
   if (!FLAGS_data_cache.empty()) {
-    remote_data_cache_.reset(new DataCache(FLAGS_data_cache));
+    remote_data_cache_.reset(
+        new DataCache(FLAGS_data_cache, FLAGS_data_cache_num_async_write_threads));
     RETURN_IF_ERROR(remote_data_cache_->Init());
   }
   return Status::OK();
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index cb88adca5..b5be78330 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -83,6 +83,14 @@ const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_DROPPED_ENTRIES =
     "impala-server.io-mgr.remote-data-cache-dropped-entries";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS =
     "impala-server.io-mgr.remote-data-cache-instant-evictions";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES =
+    "impala-server.io-mgr.remote-data-cache-async-writes-outstanding-bytes";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED =
+    "impala-server.io-mgr.remote-data-cache-num-async-writes-submitted";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES =
+    "impala-server.io-mgr.remote-data-cache-async-writes-dropped-bytes";
+const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES =
+    "impala-server.io-mgr.remote-data-cache-async-writes-dropped-entries";
 const char* ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN =
     "impala-server.io-mgr.bytes-written";
 const char* ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES =
@@ -181,6 +189,12 @@ IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_WRITES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_ENTRIES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES =
+    nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES =
+    nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = nullptr;
 IntCounter* ImpaladMetrics::HEDGED_READ_OPS = nullptr;
@@ -376,6 +390,14 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_DROPPED_ENTRIES, 0);
   IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS, 0);
+  IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES, 0);
+  IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED, 0);
+  IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES, 0);
+  IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES, 0);
 
   IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
       StatsMetric<uint64_t, StatsType::MEAN>::CreateAndRegister(IO_MGR_METRICS,
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 08131da30..3c1f0e17f 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -114,6 +114,20 @@ class ImpaladMetricKeys {
   /// Total number of entries evicted immediately from the remote data cache.
   static const char* IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS;
 
+  /// Total number of bytes of async writes outstanding in the remote data cache.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES;
+
+  /// Total number of async writes submitted in the remote data cache.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED;
+
+  /// Total number of bytes not inserted in the remote data cache due to async writes
+  /// buffer size limit.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES;
+
+  /// Total number of entries not inserted in the remote data cache due to async writes
+  /// buffer size limit.
+  static const char* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES;
+
   /// Total number of bytes written to disk by the io mgr (for spilling)
   static const char* IO_MGR_BYTES_WRITTEN;
 
@@ -274,6 +288,10 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_DROPPED_ENTRIES;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_INSTANT_EVICTIONS;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_OUTSTANDING_BYTES;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_NUM_ASYNC_WRITES_SUBMITTED;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES;
+  static IntCounter* IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES;
   static IntCounter* IO_MGR_SHORT_CIRCUIT_BYTES_READ;
   static IntCounter* IO_MGR_BYTES_WRITTEN;
   static IntCounter* IO_MGR_CACHED_FILE_HANDLES_REOPENED;
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index ca45f6a61..e4381293c 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -65,6 +65,8 @@ fi
 : ${DATA_CACHE_SIZE:=}
 # Data cache eviction policy
 : ${DATA_CACHE_EVICTION_POLICY:=}
+# Number of data cache async write threads.
+: ${DATA_CACHE_NUM_ASYNC_WRITE_THREADS:=}
 if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   # TODO: Remove abort_on_config_error flag from here and create-load-data.sh once
   # checkConfiguration() accepts the local filesystem (see IMPALA-1850).
@@ -83,6 +85,10 @@ if [[ -n "${DATA_CACHE_DIR}" && -n "${DATA_CACHE_SIZE}" ]]; then
        TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "`
            `"--data_cache_eviction_policy=${DATA_CACHE_EVICTION_POLICY}"
    fi
+   if [[ -n "${DATA_CACHE_NUM_ASYNC_WRITE_THREADS}" ]]; then
+       TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "`
+           `"--data_cache_num_async_write_threads=${DATA_CACHE_NUM_ASYNC_WRITE_THREADS}"
+   fi
    # Force use of data cache for HDFS. Data cache is only enabled for remote reads.
    if [[ "${TARGET_FILESYSTEM}" == "hdfs" ]]; then
       TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "`
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 40ccd3d36..5f93f8170 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -132,6 +132,10 @@ parser.add_option("--data_cache_size", dest="data_cache_size", default=0,
 parser.add_option("--data_cache_eviction_policy", dest="data_cache_eviction_policy",
                   default="LRU", help="This specifies the cache eviction policy to use "
                   "for the data cache")
+parser.add_option("--data_cache_num_async_write_threads",
+                  dest="data_cache_num_async_write_threads", default=0,
+                  help="This specifies the number of asynchronous write threads for the "
+                  "data cache, with 0 set means synchronous writes.")
 parser.add_option("--data_cache_enable_tracing", dest="data_cache_enable_tracing",
                   action="store_true", default=False,
                   help="If the data cache is enabled, this enables tracing accesses.")
@@ -409,6 +413,10 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
       args = "-data_cache_eviction_policy={policy} {args}".format(
           policy=options.data_cache_eviction_policy, args=args)
 
+      # Add the number of async write threads.
+      args = "-data_cache_num_async_write_threads={num_threads} {args}".format(
+          num_threads=options.data_cache_num_async_write_threads, args=args)
+
       # Add access tracing arguments if requested
       if options.data_cache_enable_tracing:
         tracing_args = ""
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 403724480..c24d49e7d 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -659,6 +659,46 @@
     "kind": "HISTOGRAM",
     "key": "impala-server.io-mgr.remote-data-cache-partition-$0.eviction-latency"
   },
+  {
+    "description": "Total number of bytes of async writes outstanding in the remote data cache.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Async Writes Outstanding In Bytes",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-async-writes-outstanding-bytes"
+  },
+  {
+    "description": "Total number of async writes submitted in the remote data cache.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Num Async Writes Submitted",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-num-async-writes-submitted"
+  },
+  {
+    "description": "Total number of bytes not inserted in the remote data cache due to async writes buffer size limit.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Bytes Not Inserted Due To Async Writes Buffer Size Limit",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-async-writes-dropped-bytes"
+  },
+  {
+    "description": "Total number of entries not inserted in the remote data cache due to async writes buffer size limit.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Remote Data Cache Entries Not Inserted Due To Async Writes Buffer Size Limit",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.remote-data-cache-async-writes-dropped-entries"
+  },
   {
     "description": "The number of allocated IO buffers. IO buffers are shared by all queries.",
     "contexts": [