You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/08/23 19:25:19 UTC

[impala] branch master updated: IMPALA-8691: Query option to disable data cache

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 9874ce3  IMPALA-8691: Query option to disable data cache
9874ce3 is described below

commit 9874ce37a989240571e2473dce3153357a0e417f
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Fri Jul 12 17:09:02 2019 -0700

    IMPALA-8691: Query option to disable data cache
    
    This change adds a query option to disable the data cache for
    a given session. By default, this option is set to false. When
    it's set to true, all queries will bypass the data cache. This
    allows users to avoid polluting the cache with accesses to tables
    which they don't want to cache. A follow-up change will add
    a per-table query hint to allow caching to be disabled for a
    given table only.
    
    There is some small refactoring in the code to make it clearer
    which type of caching is being referred to. As the code
    stands now, we have both HDFS caching (for local reads) and the
    data cache (for remote reads). BufferOpts has been extended to
    allow users to explicitly state their intention to use either or
    both of the caches.
    
    Change-Id: I39122ac38435cedf94b2b39145863764d0b5b6c8
    Reviewed-on: http://gerrit.cloudera.org:8080/14015
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc          |  7 ++-
 be/src/exec/hdfs-orc-scanner.cc               |  3 +-
 be/src/exec/hdfs-scan-node-base.cc            | 22 +++++++---
 be/src/exec/hdfs-scan-node-base.h             | 13 +++---
 be/src/exec/hdfs-scanner.cc                   | 15 ++++---
 be/src/exec/hdfs-text-scanner.cc              |  2 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc   |  5 ++-
 be/src/exec/parquet/parquet-column-readers.cc |  2 +-
 be/src/exec/parquet/parquet-page-index.cc     |  5 ++-
 be/src/exec/scan-node.cc                      |  4 ++
 be/src/exec/scan-node.h                       |  3 ++
 be/src/exec/scanner-context.cc                |  7 ++-
 be/src/runtime/io/disk-io-mgr-test.cc         | 13 +++---
 be/src/runtime/io/hdfs-file-reader.cc         | 12 ++----
 be/src/runtime/io/request-context.cc          |  6 +--
 be/src/runtime/io/request-ranges.h            | 62 ++++++++++++++++++---------
 be/src/runtime/io/scan-range.cc               |  5 ++-
 be/src/runtime/tmp-file-mgr.cc                |  2 +-
 be/src/scheduling/scheduler-test-util.cc      |  6 +--
 be/src/scheduling/scheduler.cc                | 10 ++---
 be/src/service/query-options.cc               |  4 ++
 be/src/service/query-options.h                |  6 ++-
 common/thrift/ImpalaInternalService.thrift    |  5 ++-
 common/thrift/ImpalaService.thrift            |  3 ++
 tests/custom_cluster/test_data_cache.py       | 29 +++++++++++++
 25 files changed, 171 insertions(+), 80 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index dd7fc63..40234e3 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -62,9 +62,12 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     // it is not cached.
     // TODO: add remote disk id and plumb that through to the io mgr.  It should have
     // 1 queue for each NIC as well?
+    bool expected_local = false;
+    int cache_options = !scan_node->IsDataCacheDisabled() ? BufferOpts::USE_DATA_CACHE :
+        BufferOpts::NO_CACHING;
     ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, header_metadata, -1, false,
-        files[i]->is_erasure_coded, files[i]->mtime, BufferOpts::Uncached());
+        files[i]->filename.c_str(), header_size, 0, header_metadata, -1, expected_local,
+        files[i]->is_erasure_coded, files[i]->mtime, BufferOpts(cache_options));
     header_ranges.push_back(header_range);
   }
   // When the header is parsed, we will issue more AddDiskIoRanges in
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 37e08d6..0c5739b 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -106,11 +106,12 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
 
   // Set expected_local to false to avoid cache on stale data (IMPALA-6830)
   bool expected_local = false;
+  int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
       metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
       split_range->disk_id(), expected_local, split_range->is_erasure_coded(),
       split_range->mtime(),
-      BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length));
+      BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
 
   unique_ptr<BufferDescriptor> io_buffer;
   Status status;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index a43e9eb..0286d30 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -57,6 +57,10 @@
 DECLARE_bool(skip_file_runtime_filtering);
 #endif
 
+DEFINE_bool(always_use_data_cache, false, "(Advanced) Always uses the IO data cache "
+    "for all reads, regardless of whether the read is local or remote. By default, the "
+    "IO data cache is only used if the data is expected to be remote. Used by tests.");
+
 namespace filesystem = boost::filesystem;
 using namespace impala;
 using namespace impala::io;
@@ -235,11 +239,17 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     bool expected_local = params.__isset.is_remote && !params.is_remote;
     if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id;
 
-    bool try_cache = params.is_cached;
+    int cache_options = BufferOpts::NO_CACHING;
+    if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) {
+      cache_options |= BufferOpts::USE_HDFS_CACHE;
+    }
+    if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) {
+      cache_options |= BufferOpts::USE_DATA_CACHE;
+    }
     file_desc->splits.push_back(
         AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
             split.offset, split.partition_id, params.volume_id, expected_local,
-            file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(try_cache)));
+            file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options)));
   }
 
   // Update server wide metrics for number of scan ranges and ranges that have
@@ -633,11 +643,11 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
-    int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-    bool expected_local, int64_t mtime, bool is_erasure_coded,
-    const ScanRange* original_split) {
+    int64_t len, int64_t offset, int64_t partition_id, int disk_id,
+    int cache_options, bool expected_local, int64_t mtime,
+    bool is_erasure_coded, const ScanRange* original_split) {
   return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
-      is_erasure_coded, mtime, BufferOpts(try_cache), original_split);
+      is_erasure_coded, mtime, BufferOpts(cache_options), original_split);
 }
 
 Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges,
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 4cb8d0f..12b4748 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -254,7 +254,7 @@ class HdfsScanNodeBase : public ScanNode {
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
       bool is_erasure_coded, int64_t mtime,
       const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = NULL);
+      const io::ScanRange* original_split = nullptr);
 
   /// Same as above, but it takes a pointer to a ScanRangeMetadata object which contains
   /// the partition_id, original_splits, and other information about the scan range.
@@ -266,8 +266,8 @@ class HdfsScanNodeBase : public ScanNode {
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
-      int64_t mtime,
-      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = NULL);
+      int64_t mtime, const io::BufferOpts& buffer_opts,
+      const io::ScanRange* original_split = nullptr);
 
   /// Same as above, but it takes both sub-ranges and metadata.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
@@ -277,10 +277,9 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Old API for compatibility with text scanners (e.g. LZO text scanner).
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
-      int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-      bool expected_local, int64_t mtime,
-      bool is_erasure_coded = false,
-      const io::ScanRange* original_split = NULL);
+      int64_t offset, int64_t partition_id, int disk_id, int cache_options,
+      bool expected_local, int64_t mtime, bool is_erasure_coded = false,
+      const io::ScanRange* original_split = nullptr);
 
   /// Adds ranges to the io mgr queue. Can be overridden to add scan-node specific
   /// actions like starting scanner threads. Must not be called once
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index a177748..345a140 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -812,18 +812,19 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(), files[i]->is_erasure_coded,
-              files[i]->mtime, BufferOpts(footer_split->try_cache()),
-              split);
+              footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
+              BufferOpts(footer_split->cache_options()), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
+          bool expected_local = false;
+          int cache_options = !scan_node->IsDataCacheDisabled() ?
+              BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
           footer_range =
               scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
-                   footer_size, footer_start, split_metadata->partition_id, -1, false,
-                   files[i]->is_erasure_coded, files[i]->mtime, BufferOpts::Uncached(),
-                   split);
+                   footer_size, footer_start, split_metadata->partition_id, -1,
+                   expected_local, files[i]->is_erasure_coded, files[i]->mtime,
+                   BufferOpts(cache_options), split);
         }
-
         footer_ranges.push_back(footer_range);
       } else {
         scan_node->RangeComplete(file_type, THdfsCompression::NONE);
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f1e9d26..ad43cda 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -118,7 +118,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
               files[i]->filename.c_str(), files[i]->file_length, 0,
               metadata->partition_id, split->disk_id(), split->expected_local(),
               files[i]->is_erasure_coded, files[i]->mtime,
-              BufferOpts(split->try_cache()));
+              BufferOpts(split->cache_options()));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 35c2e70..10571b2 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1433,12 +1433,13 @@ Status HdfsParquetScanner::ProcessFooter() {
     }
     metadata_ptr = metadata_buffer.buffer();
 
-    // Read the footer into the metadata buffer.
+    // Read the footer into the metadata buffer. Skip HDFS caching in this case.
+    int cache_options = metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* metadata_range = scan_node_->AllocateScanRange(
         metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
         metadata_range_->disk_id(), metadata_range_->expected_local(),
         metadata_range_->is_erasure_coded(), metadata_range_->mtime(),
-        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size, cache_options));
 
     unique_ptr<BufferDescriptor> io_buffer;
     bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 422c89f..f044b86 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1140,7 +1140,7 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
   scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
       filename(), col_len, col_start, move(sub_ranges), partition_id,
       split_range->disk_id(),col_range_local, split_range->is_erasure_coded(),
-      file_desc.mtime, BufferOpts(split_range->try_cache()));
+      file_desc.mtime, BufferOpts(split_range->cache_options()));
   ClearDictionaryDecoder();
   return Status::OK();
 }
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index ef979cd..6c9b2bf 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -93,12 +93,15 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
         "page index for file '$1'.", buffer_size, scanner_->filename()));
   }
   int64_t partition_id = scanner_->context_->partition_descriptor()->id();
+  int cache_options =
+      scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scanner_->scan_node_->AllocateScanRange(
       scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
       scan_range_start, move(sub_ranges), partition_id,
       scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
       scanner_->metadata_range_->is_erasure_coded(), scanner_->metadata_range_->mtime(),
-      BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size()));
+      BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
+          cache_options));
 
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 710d1ea..cad80ac 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -78,6 +78,10 @@ const string ScanNode::AVERAGE_HDFS_READ_THREAD_CONCURRENCY =
 const string ScanNode::NUM_SCANNER_THREADS_STARTED =
     "NumScannerThreadsStarted";
 
+bool ScanNode::IsDataCacheDisabled() const {
+  return runtime_state()->query_options().disable_data_cache;
+}
+
 Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   const TQueryOptions& query_options = state->query_options();
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 7c3a800..b2294bf 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -113,6 +113,9 @@ class ScanNode : public ExecNode {
 
   virtual bool IsScanNode() const { return true; }
 
+  /// Returns true iff the data cache in IoMgr is disabled by query options.
+  bool IsDataCacheDisabled() const;
+
   RuntimeState* runtime_state() const { return runtime_state_; }
   RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
   RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 7b44200..a7fbf7a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -154,10 +154,13 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
       return Status::OK();
     }
     int64_t partition_id = parent_->partition_descriptor()->id();
+    bool expected_local = false;
+    // Disable HDFS caching as we are reading past the end.
+    int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), false, scan_range_->is_erasure_coded(),
-        scan_range_->mtime(), BufferOpts::Uncached());
+        scan_range_->disk_id(), expected_local, scan_range_->is_erasure_coded(),
+        scan_range_->mtime(), BufferOpts(cache_options));
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index bff89f6..871b671 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -239,11 +239,13 @@ class DiskIoMgrTest : public testing::Test {
   }
 
   ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
-      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
+      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_hdfs_cached = false,
       std::vector<ScanRange::SubRange> sub_ranges = {}) {
     ScanRange* range = pool->Add(new ScanRange);
+    int cache_options =
+        is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
     range->Reset(nullptr, file_path, len, offset, disk_id, true, false, mtime,
-        BufferOpts(is_cached), move(sub_ranges), meta_data);
+        BufferOpts(cache_options), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -1447,7 +1449,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
     range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false, ScanRange::INVALID_MTIME,
-        BufferOpts::ReadInto(client_buffer.data(), buffer_len));
+        BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
     ASSERT_FALSE(needs_buffers);
@@ -1492,8 +1494,9 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     int result_len = strlen(expected_result);
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
+    int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
     range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false, stat_val.st_mtime,
-        BufferOpts::ReadInto(fake_cache, client_buffer.data(), result_len),
+        BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
         move(sub_ranges));
     if (fake_cache) {
       SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
@@ -1543,7 +1546,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
     ScanRange* range = pool_.Add(new ScanRange);
     range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false, ScanRange::INVALID_MTIME,
-        BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
+        BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
     ASSERT_FALSE(needs_buffers);
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 7e959ae..629c09c 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -34,9 +34,6 @@
 DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
-DEFINE_bool(always_use_data_cache, false, "(Advanced) Always uses the IO data cache "
-    "for all reads, regardless of whether the read is local or remote. By default, the "
-    "IO data cache is only used if the data is expected to be remote. Used by tests.");
 
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
@@ -119,12 +116,11 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
     ScopedTimer<MonotonicStopWatch> req_context_read_timer(
         scan_range_->reader_->read_timer_);
 
-    // If it's a remote scan range, try reading from the remote data cache.
+    // Try reading from the remote data cache if it's enabled for the scan range.
     DataCache* remote_data_cache = io_mgr->remote_data_cache();
-    bool try_cache = (!expected_local_ || FLAGS_always_use_data_cache) &&
-        remote_data_cache != nullptr;
+    bool try_data_cache = scan_range_->UseDataCache() && remote_data_cache != nullptr;
     int64_t cached_read = 0;
-    if (try_cache) {
+    if (try_data_cache) {
       cached_read = ReadDataCache(remote_data_cache, file_offset, buffer, bytes_to_read);
       DCHECK_GE(cached_read, 0);
       *bytes_read = cached_read;
@@ -179,7 +175,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
     }
 
     int64_t cached_bytes_missed = *bytes_read - cached_read;
-    if (try_cache && status.ok() && cached_bytes_missed > 0) {
+    if (try_data_cache && status.ok() && cached_bytes_missed > 0) {
       DCHECK_LE(*bytes_read, bytes_to_read);
       WriteDataCache(remote_data_cache, file_offset, buffer, *bytes_read,
           cached_bytes_missed);
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 590b566..c7066e9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -398,7 +398,7 @@ Status RequestContext::AddScanRanges(
     // Don't add empty ranges.
     DCHECK_NE(range->len(), 0);
     AddActiveScanRangeLocked(lock, range);
-    if (range->try_cache()) {
+    if (range->UseHdfsCache()) {
       cached_ranges_.Enqueue(range);
     } else {
       AddRangeToDisk(lock, range, (enqueue_location == EnqueueLocation::HEAD) ?
@@ -432,7 +432,7 @@ Status RequestContext::GetNextUnstartedRange(ScanRange** range, bool* needs_buff
     if (!cached_ranges_.empty()) {
       // We have a cached range.
       *range = cached_ranges_.Dequeue();
-      DCHECK((*range)->try_cache());
+      DCHECK((*range)->UseHdfsCache());
       bool cached_read_succeeded;
       RETURN_IF_ERROR(TryReadFromCache(lock, *range, &cached_read_succeeded,
           needs_buffers));
@@ -480,7 +480,7 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
   if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;
 
   DCHECK_NE(range->len(), 0);
-  if (range->try_cache()) {
+  if (range->UseHdfsCache()) {
     bool cached_read_succeeded;
     RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
         needs_buffers));
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 8f50be9..8567da3 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -166,32 +166,53 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
 /// Param struct for different combinations of buffering.
 struct BufferOpts {
  public:
-  // Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is
-  // enabled if 'try_cache' is true and the file is in the HDFS cache.
-  BufferOpts(bool try_cache)
-    : try_cache_(try_cache),
+
+  /// Different caching options available for a scan range.
+  ///
+  /// If USE_HDFS_CACHE is set, a read will first be probed against the HDFS cache for
+  /// any hits. If there is a miss, it will fall back to reading from the underlying
+  /// storage. Please note that the HDFS cache is only used for local reads. Reads from
+  /// remote locations (e.g. another HDFS data node) will not be cached in the HDFS cache.
+  ///
+  /// If USE_DATA_CACHE is set, any read from the underlying storage will first be probed
+  /// against the data cache. If there is a cache miss in data cache, data will be
+  /// inserted into the data cache upon IO completion. The data cache is usually used for
+  /// caching non-local HDFS data (e.g. remote HDFS data or S3).
+  enum {
+    NO_CACHING  = 0,
+    USE_HDFS_CACHE = 1 << 0,
+    USE_DATA_CACHE = 1 << 2
+  };
+
+  /// Set options for a read into an IoMgr-allocated or HDFS-cached buffer.
+  /// 'cache_options' specifies the caching options used. Please see comments
+  /// of 'USE_HDFS_CACHE' and 'USE_DATA_CACHE' for details of the caching options.
+  BufferOpts(int cache_options)
+    : cache_options_(cache_options),
       client_buffer_(nullptr),
       client_buffer_len_(-1) {}
 
   /// Set options for an uncached read into an IoMgr-allocated buffer.
   static BufferOpts Uncached() {
-    return BufferOpts(false, nullptr, -1);
+    return BufferOpts(NO_CACHING, nullptr, -1);
   }
 
   /// Set options to read the entire scan range into 'client_buffer'. The length of the
-  /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not
-  /// enabled in this case.
-  static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
-    return BufferOpts(false, client_buffer, client_buffer_len);
+  /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching shouldn't
+  /// be enabled in this case.
+  static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len,
+      int cache_options) {
+    DCHECK_EQ(cache_options & USE_HDFS_CACHE, 0);
+    return BufferOpts(cache_options, client_buffer, client_buffer_len);
   }
 
   /// Use only when you don't want to to read the entire scan range, but only sub-ranges
   /// in it. In this case you can copy the relevant parts from the HDFS cache into the
   /// client buffer. The length of the buffer, 'client_buffer_len' must fit the
   /// concatenation of all the sub-ranges.
-  static BufferOpts ReadInto(bool try_cache, uint8_t* client_buffer,
+  static BufferOpts ReadInto(int cache_options, uint8_t* client_buffer,
       int64_t client_buffer_len) {
-    return BufferOpts(try_cache, client_buffer, client_buffer_len);
+    return BufferOpts(cache_options, client_buffer, client_buffer_len);
   }
 
  private:
@@ -199,13 +220,14 @@ struct BufferOpts {
   friend class HdfsFileReader;
   FRIEND_TEST(DataCacheTest, TestBasics);
 
-  BufferOpts(bool try_cache, uint8_t* client_buffer, int64_t client_buffer_len)
-    : try_cache_(try_cache),
+  BufferOpts(int cache_options, uint8_t* client_buffer,
+      int64_t client_buffer_len)
+    : cache_options_(cache_options),
       client_buffer_(client_buffer),
       client_buffer_len_(client_buffer_len) {}
 
-  /// If true, read from HDFS cache if possible.
-  const bool try_cache_;
+  /// Specify options to enable HDFS and data caches.
+  const int cache_options_;
 
   /// A destination buffer provided by the client, nullptr and -1 if no buffer.
   uint8_t* const client_buffer_;
@@ -254,7 +276,9 @@ class ScanRange : public RequestRange {
       void* meta_data = nullptr);
 
   void* meta_data() const { return meta_data_; }
-  bool try_cache() const { return try_cache_; }
+  int cache_options() const { return cache_options_; }
+  bool UseHdfsCache() const { return (cache_options_ & BufferOpts::USE_HDFS_CACHE) != 0; }
+  bool UseDataCache() const { return (cache_options_ & BufferOpts::USE_DATA_CACHE) != 0; }
   bool read_in_flight() const { return read_in_flight_; }
   bool expected_local() const { return expected_local_; }
   bool is_erasure_coded() const { return is_erasure_coded_; }
@@ -436,10 +460,8 @@ class ScanRange : public RequestRange {
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
 
-  /// If true, this scan range is expected to be cached. Note that this might be wrong
-  /// since the block could have been uncached. In that case, the cached path
-  /// will fail and we'll just put the scan range on the normal read path.
-  bool try_cache_ = false;
+  /// Options for enabling HDFS caching and data cache.
+  int cache_options_;
 
   /// If true, we expect this scan range to be a local read. Note that if this is false,
   /// it does not necessarily mean we expect the read to be remote, and that we never
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index ec48097..c118c90 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -475,7 +475,8 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
-  try_cache_ = buffer_opts.try_cache_;
+  cache_options_ = buffer_opts.cache_options_;
+
   // HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
   if (fs_) DCHECK_GT(mtime, 0);
   mtime_ = mtime;
@@ -593,7 +594,7 @@ int64_t ScanRange::MaxReadChunkSize() const {
 Status ScanRange::ReadFromCache(
     const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  DCHECK(try_cache_);
+  DCHECK(UseHdfsCache());
   DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
   Status status = file_reader_->Open(false);
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 7954a62..a27ae2e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -451,7 +451,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
   handle->read_range_->Reset(nullptr, handle->write_range_->file(),
       handle->write_range_->len(), handle->write_range_->offset(),
       handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
-      BufferOpts::ReadInto(buffer.data(), buffer.len()));
+      BufferOpts::ReadInto(buffer.data(), buffer.len(), BufferOpts::NO_CACHING));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
   bool needs_buffers;
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index e0cd7f3..37ad463 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -560,11 +560,11 @@ void Result::ProcessAssignments(const AssignmentCallback& cb) const {
           const TScanRange& scan_range = scan_range_params.scan_range;
           DCHECK(scan_range.__isset.hdfs_file_split);
           const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
-          bool is_cached =
-              scan_range_params.__isset.is_cached ? scan_range_params.is_cached : false;
+          bool try_hdfs_cache = scan_range_params.__isset.try_hdfs_cache ?
+              scan_range_params.try_hdfs_cache : false;
           bool is_remote =
               scan_range_params.__isset.is_remote ? scan_range_params.is_remote : false;
-          cb({addr, hdfs_file_split, is_cached, is_remote});
+          cb({addr, hdfs_file_split, try_hdfs_cache, is_remote});
         }
       }
     }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 7e15bc7..16c471c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -914,10 +914,10 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   // See if the read will be remote. This is not the case if the impalad runs on one of
   // the replica's datanodes.
   bool remote_read = true;
-  // For local reads we can set volume_id and is_cached. For remote reads HDFS will
+  // For local reads we can set volume_id and try_hdfs_cache. For remote reads HDFS will
   // decide which replica to use so we keep those at default values.
   int volume_id = -1;
-  bool is_cached = false;
+  bool try_hdfs_cache = false;
   for (const TScanRangeLocation& location : scan_range_locations.locations) {
     const TNetworkAddress& replica_host = host_list[location.host_idx];
     IpAddr replica_ip;
@@ -925,7 +925,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
         && executor_ip == replica_ip) {
       remote_read = false;
       volume_id = location.volume_id;
-      is_cached = location.is_cached;
+      try_hdfs_cache = location.is_cached;
       break;
     }
   }
@@ -934,7 +934,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
     assignment_byte_counters_.remote_bytes += scan_range_length;
   } else {
     assignment_byte_counters_.local_bytes += scan_range_length;
-    if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
+    if (try_hdfs_cache) assignment_byte_counters_.cached_bytes += scan_range_length;
   }
 
   if (total_assignments_ != nullptr) {
@@ -951,7 +951,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   TScanRangeParams scan_range_params;
   scan_range_params.scan_range = scan_range_locations.scan_range;
   scan_range_params.__set_volume_id(volume_id);
-  scan_range_params.__set_is_cached(is_cached);
+  scan_range_params.__set_try_hdfs_cache(try_hdfs_cache);
   scan_range_params.__set_is_remote(remote_read);
   scan_range_params_list->push_back(scan_range_params);
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d48d0ec..99b7b27 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -841,6 +841,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_statement_length_bytes(max_statement_length_bytes);
         break;
       }
+      case TImpalaQueryOptions::DISABLE_DATA_CACHE: {
+        query_options->__set_disable_data_cache(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index ff408dd..e388f78 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES + 1);\
+      TImpalaQueryOptions::DISABLE_DATA_CACHE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -178,7 +178,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT,\
       TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,\
-      TQueryOptionLevel::REGULAR)
+      TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE,\
+      TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index be79822..1d1cd21 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -379,6 +379,9 @@ struct TQueryOptions {
   // the cost of parsing and analyzing the statement, which is required to enforce the
   // statement expression limit.
   89: optional i32 max_statement_length_bytes = 16777216
+
+  // If true, skip using the data cache for this query session.
+  90: optional bool disable_data_cache = false;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
@@ -575,7 +578,7 @@ struct TPlanFragmentCtx {
 struct TScanRangeParams {
   1: required PlanNodes.TScanRange scan_range
   2: optional i32 volume_id = -1
-  3: optional bool is_cached = false
+  3: optional bool try_hdfs_cache = false
   4: optional bool is_remote
 }
 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index d5c57aa..5ef5a4b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -429,6 +429,9 @@ enum TImpalaQueryOptions {
   // likely to violate the statement expression limit. Rejecting them early avoids
   // the cost of parsing/analysis.
   MAX_STATEMENT_LENGTH_BYTES = 88
+
+  // If true, all queries in this session bypass the data cache.
+  DISABLE_DATA_CACHE = 89
 }
 
 // The summary of a DML statement.
diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py
index f9056c5..8814963 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -34,6 +34,12 @@ class TestDataCache(CustomClusterTestSuite):
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestDataCache, cls).setup_class()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
@@ -70,3 +76,26 @@ class TestDataCache(CustomClusterTestSuite):
       # Do a second run. Expect some hits.
       self.execute_query(QUERY)
       assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
+      " --cache_force_single_shard=true",
+      start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
+  def test_data_cache_disablement(self, vector):
+    # Verifies that the cache metrics are all zero before any query has run.
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') == 0
+
+    # Runs queries against multiple file formats, first with the cache disabled and then
+    # enabled. Miss/total metrics must be zero while disabled and non-zero once enabled.
+    for disable_cache in [True, False]:
+      vector.get_value('exec_option')['disable_data_cache'] = int(disable_cache)
+      for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']:
+        QUERY = "select * from functional_{0}.alltypes".format(file_format)
+        self.execute_query(QUERY, vector.get_value('exec_option'))
+        assert disable_cache ==\
+            (self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0)
+        assert disable_cache ==\
+            (self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') == 0)