You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/10/18 21:32:42 UTC

[3/4] impala git commit: IMPALA-7543: Enhance scan ranges to support sub-ranges

IMPALA-7543: Enhance scan ranges to support sub-ranges

This commit enhances the ScanRange class so that it can read only
some smaller parts of the whole ScanRange. This functionality is
needed by IMPALA-5843.

A sub-range is an offset and length that lie within the scan range.
Sub-ranges can be added to a scan range when calling
ScanRange::Reset(). If any sub-ranges are given, the ScanRange class
will only read the parts defined by them.

If a cached read has sub-ranges, the ScanRange won't enqueue the
whole cache buffer (which contains the whole ScanRange); instead it
will memcpy() the sub-ranges into I/O or client buffers.

Smaller refactorings that were needed:
 * removed scan_range_offset_ from BufferDescriptor
 * the number of bytes read is tracked by ScanRange again

Testing:
 * introduced CacheReaderTestStub to fake cache reads during testing
 * extended disk-io-mgr-test.cc with sub-ranges

Change-Id: Iea26ba386713990f7671aab5a372cf449b8d51e4
Reviewed-on: http://gerrit.cloudera.org:8080/11520
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/48fb4902
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/48fb4902
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/48fb4902

Branch: refs/heads/master
Commit: 48fb4902d4f28c9e4d327b80b00b33962c118c22
Parents: 2fb8eba
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Wed Sep 26 13:58:36 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 20:33:36 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/CMakeLists.txt           |   1 -
 be/src/runtime/io/cache-reader-test-stub.h |  62 +++++++++
 be/src/runtime/io/disk-io-mgr-stress.cc    |   3 +-
 be/src/runtime/io/disk-io-mgr-test.cc      | 177 +++++++++++++++++++----
 be/src/runtime/io/disk-io-mgr.cc           |   2 +-
 be/src/runtime/io/file-reader.cc           |  31 -----
 be/src/runtime/io/file-reader.h            |  23 ++-
 be/src/runtime/io/hdfs-file-reader.cc      |  32 ++---
 be/src/runtime/io/hdfs-file-reader.h       |   4 +-
 be/src/runtime/io/local-file-reader.cc     |  15 +-
 be/src/runtime/io/local-file-reader.h      |   5 +-
 be/src/runtime/io/request-context.cc       |  41 +++++-
 be/src/runtime/io/request-context.h        |   8 ++
 be/src/runtime/io/request-ranges.h         |  87 ++++++++++--
 be/src/runtime/io/scan-range.cc            | 178 +++++++++++++++++++-----
 15 files changed, 511 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index bb26ef0..675bd6e 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -29,7 +29,6 @@ add_library(Io
   error-converter.cc
   request-context.cc
   scan-range.cc
-  file-reader.cc
   hdfs-file-reader.cc
   local-file-reader.cc
 )

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/cache-reader-test-stub.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
new file mode 100644
index 0000000..b53b30b
--- /dev/null
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/io/file-reader.h"
+#include "runtime/io/request-ranges.h"
+
+namespace impala {
+namespace io {
+
+/// Test-only FileReader that simulates a successful read from the cache.
+/// It wraps a buffer 'cache' of 'length' bytes that it does not own; CachedFile()
+/// returns exactly that pointer and length. ReadFromPos() is not supported: it hits
+/// DCHECK(false) and returns a "Not implemented" error status.
+class CacheReaderTestStub : public FileReader {
+public:
+  CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) :
+    FileReader(scan_range),
+    cache_(cache),
+    length_(length) {
+  }
+
+  ~CacheReaderTestStub() {}
+
+  virtual Status Open(bool use_file_handle_cache) override {
+    return Status::OK();  // Nothing to open: all data comes from 'cache_'.
+  }
+
+  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
+    DCHECK(false);  // Must not be called: the stub only models a cache hit.
+    return Status("Not implemented");
+  }
+
+  virtual void CachedFile(uint8_t** data, int64_t* length) override {
+    *length = length_;
+    *data = cache_;
+  }
+
+  virtual void Close() override {}  // Nothing to release; 'cache_' is not owned.
+private:
+  uint8_t* cache_ = nullptr;  // Not owned by the stub.
+  int64_t length_ = 0;
+};
+
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 16cfbf2..4335aa4 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -119,6 +119,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       ScanRange* range;
+      int64_t scan_range_offset = 0;
       bool needs_buffers;
       Status status = client->reader->GetNextUnstartedRange(&range, &needs_buffers);
       CHECK(status.ok() || status.IsCancelled());
@@ -135,7 +136,6 @@ void DiskIoMgrStress::ClientThread(int client_id) {
         CHECK(status.ok() || status.IsCancelled());
         if (buffer == NULL) break;
 
-        int64_t scan_range_offset = buffer->scan_range_offset();
         int len = buffer->len();
         CHECK_GE(scan_range_offset, 0);
         CHECK_LT(scan_range_offset, expected.size());
@@ -154,6 +154,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
         range->ReturnBuffer(move(buffer));
         bytes_read += len;
+        scan_range_offset += len;
 
         CHECK_GE(bytes_read, 0);
         CHECK_LE(bytes_read, expected.size());

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index a6689a8..7a9bc23 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -24,6 +24,7 @@
 #include "common/init.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/io/cache-reader-test-stub.h"
 #include "runtime/io/local-file-system-with-fault-injection.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -179,7 +180,7 @@ class DiskIoMgrTest : public testing::Test {
     }
     ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
-    EXPECT_EQ(buffer->len(), range->len());
+    EXPECT_EQ(buffer->len(), range->bytes_to_read());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
     EXPECT_TRUE(cmp == 0);
@@ -190,6 +191,7 @@ class DiskIoMgrTest : public testing::Test {
       const char* expected, int expected_len, const Status& expected_status) {
     char result[expected_len + 1];
     memset(result, 0, expected_len + 1);
+    int64_t scan_range_offset = 0;
 
     while (true) {
       unique_ptr<BufferDescriptor> buffer;
@@ -200,8 +202,9 @@ class DiskIoMgrTest : public testing::Test {
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
-      memcpy(result + range->offset() + buffer->scan_range_offset(),
+      memcpy(result + range->offset() + scan_range_offset,
           buffer->buffer(), buffer->len());
+      scan_range_offset += buffer->len();
       range->ReturnBuffer(move(buffer));
     }
     ValidateEmptyOrCorrect(expected, result, expected_len);
@@ -229,11 +232,16 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
+  static void SetReaderStub(ScanRange* scan_range, unique_ptr<FileReader> reader_stub) {
+    scan_range->SetFileReader(move(reader_stub));  // Hands the stub to 'scan_range'.
+  }
+
   ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
-      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
+      int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
+      std::vector<ScanRange::SubRange> sub_ranges = {}) {
     ScanRange* range = pool->Add(new ScanRange);
     range->Reset(nullptr, file_path, len, offset, disk_id, true,
-        BufferOpts(is_cached, mtime), meta_data);
+        BufferOpts(is_cached, mtime), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -252,6 +260,12 @@ class DiskIoMgrTest : public testing::Test {
       const string& tmp_file, int offset, RequestContext* writer,
       const string& expected_output);
 
+  void SingleReaderTestBody(const char* data, const char* expected_result,
+      vector<ScanRange::SubRange> sub_ranges = {});
+
+  void CachedReadsTestBody(const char* data, const char* expected,
+      bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {});
+
   /// Convenience function to get a reference to the buffer pool.
   BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
 
@@ -522,13 +536,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   buffer_pool()->DeregisterClient(&read_client);
 }
 
-// Basic test with a single reader, testing multiple threads, disks and a different
-// number of buffers.
-TEST_F(DiskIoMgrTest, SingleReader) {
-  InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::SingleReaderTestBody(const char* data, const char* expected_result,
+    vector<ScanRange::SubRange> sub_ranges) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
-  const char* data = "abcdefghijklm";
-  int len = strlen(data);
+  int data_len = strlen(data);
+  int expected_result_len = strlen(expected_result);
   CreateTempFile(tmp_file, data);
 
   // Get mtime for file
@@ -554,9 +566,10 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
         vector<ScanRange*> ranges;
-        for (int i = 0; i < len; ++i) {
+        for (int i = 0; i < data_len; ++i) {
           int disk_id = i % num_disks;
-          ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
+          ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, data_len, disk_id,
+              stat_val.st_mtime, nullptr, false, sub_ranges));
         }
         ASSERT_OK(reader->AddScanRanges(ranges));
 
@@ -564,7 +577,8 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         thread_group threads;
         for (int i = 0; i < num_read_threads; ++i) {
           threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
-              &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
+              &read_client, expected_result, expected_result_len, Status::OK(), 0,
+              &num_ranges_processed));
         }
         threads.join_all();
 
@@ -578,6 +592,23 @@ TEST_F(DiskIoMgrTest, SingleReader) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Basic test with a single reader, testing multiple threads, disks and a different
+// number of buffers. Without sub-ranges, the whole file is expected back.
+TEST_F(DiskIoMgrTest, SingleReader) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  SingleReaderTestBody(data, data);
+}
+
+TEST_F(DiskIoMgrTest, SingleReaderSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  int64_t data_len = strlen(data);
+  SingleReaderTestBody(data, data, {{0, data_len}});  // One sub-range: whole file.
+  SingleReaderTestBody(data, "abdef", {{0, 2}, {3, 3}});  // "ab" + "def"
+  SingleReaderTestBody(data, "bceflm", {{1, 2}, {4, 2}, {11, 2}});  // "bc"+"ef"+"lm"
+}
+
 // This test issues adding additional scan ranges while there are some still in flight.
 TEST_F(DiskIoMgrTest, AddScanRangeTest) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
@@ -872,14 +903,10 @@ TEST_F(DiskIoMgrTest, MemScarcity) {
   }
 }
 
-// Test when some scan ranges are marked as being cached.
-// Since these files are not in HDFS, the cached path always fails so this
-// only tests the fallback mechanism.
-// TODO: we can fake the cached read path without HDFS
-TEST_F(DiskIoMgrTest, CachedReads) {
-  InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
+    bool fake_cache, vector<ScanRange::SubRange> sub_ranges) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
-  const char* data = "abcdefghijklm";
+  uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
   int len = strlen(data);
   CreateTempFile(tmp_file, data);
 
@@ -898,17 +925,26 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
     ScanRange* complete_range =
-        InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+        InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true,
+            sub_ranges);
+    if (fake_cache) {
+      SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(
+          complete_range, cached_data, len));
+    }
 
     // Issue some reads before the async ones are issued
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
 
     vector<ScanRange*> ranges;
     for (int i = 0; i < len; ++i) {
       int disk_id = i % num_disks;
-      ranges.push_back(
-          InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+      ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime,
+          nullptr, true, sub_ranges);
+      ranges.push_back(range);
+      if (fake_cache) {
+        SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data, len));
+      }
     }
     ASSERT_OK(reader->AddScanRanges(ranges));
 
@@ -916,19 +952,19 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     thread_group threads;
     for (int i = 0; i < 5; ++i) {
       threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
-          data, strlen(data), Status::OK(), 0, &num_ranges_processed));
+          expected, strlen(expected), Status::OK(), 0, &num_ranges_processed));
     }
 
     // Issue some more sync ranges
     for (int i = 0; i < 5; ++i) {
       sched_yield();
-      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
     }
 
     threads.join_all();
 
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
 
     EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
     io_mgr.UnregisterContext(reader.get());
@@ -938,6 +974,32 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Test cached scan ranges on both the fallback path and the cache-hit path.
+TEST_F(DiskIoMgrTest, CachedReads) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  // Don't fake the cache: the file isn't in HDFS, so only the fallback path runs.
+  CachedReadsTestBody(data, data, false);
+  // Fake a successful cache read by installing a CacheReaderTestStub.
+  CachedReadsTestBody(data, data, true);
+}
+
+// Test cached scan ranges that also have sub-ranges, on both the fallback path and
+// the cache-hit path (where sub-ranges are copied out of the cache buffer).
+TEST_F(DiskIoMgrTest, CachedReadsSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* data = "abcdefghijklm";
+  int64_t data_len = strlen(data);
+
+  // The first iteration tests the fallback path with sub-ranges; the second one
+  // fakes a successful cache read with a CacheReaderTestStub.
+  for (bool fake_cache : {false, true}) {
+    CachedReadsTestBody(data, data, fake_cache, {{0, data_len}});
+    CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}});
+    CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11, 2}});
+  }
+}
+
 TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int ITERATIONS = 1;
@@ -1401,6 +1463,63 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
+// Test reading sub-ranges into a client buffer, both from the file and a fake cache.
+TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  uint8_t* cache = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
+  int data_len = strlen(data);
+  int read_len = 4; // I/O buffer size; deliberately smaller than the client buffer.
+  CreateTempFile(tmp_file, data);
+
+  // Get the file's mtime, which is passed to each scan range via BufferOpts.
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+  ASSERT_OK(io_mgr->Init());
+  // No buffer pool client is needed because the reader provides its own buffers.
+  unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+
+  auto test_case = [&](bool fake_cache, const char* expected_result,
+      vector<ScanRange::SubRange> sub_ranges) {
+    int result_len = strlen(expected_result);
+    vector<uint8_t> client_buffer(result_len);
+    ScanRange* range = pool_.Add(new ScanRange);
+    range->Reset(nullptr, tmp_file, data_len, 0, 0, true,
+        BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(),
+            result_len), move(sub_ranges));
+    if (fake_cache) {
+      SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
+    }
+    bool needs_buffers;
+    ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
+    ASSERT_FALSE(needs_buffers);
+
+    unique_ptr<BufferDescriptor> io_buffer;
+    ASSERT_OK(range->GetNext(&io_buffer));
+    ASSERT_TRUE(io_buffer->eosr());
+    ASSERT_EQ(result_len, io_buffer->len());
+    ASSERT_EQ(client_buffer.data(), io_buffer->buffer());
+    ASSERT_EQ(memcmp(io_buffer->buffer(), expected_result, result_len), 0);
+
+    // DiskIoMgr should not have allocated memory: the data went to 'client_buffer'.
+    EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+    range->ReturnBuffer(move(io_buffer));
+  };
+
+  for (bool fake_cache : {false, true}) {  // Real file read, then faked cache hit.
+    test_case(fake_cache, data, {{0, data_len}});
+    test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}});
+    test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}});  // "quick" + " fox"
+    test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}});
+  }
+
+  io_mgr->UnregisterContext(reader.get());
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+}
+
 // Test reading into a client-allocated buffer where the read fails.
 TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 75136ec..fa871d1 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -302,7 +302,7 @@ Status DiskIoMgr::AllocateBuffersForRange(
   BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
   Status status;
   vector<unique_ptr<BufferDescriptor>> buffers;
-  for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+  for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) {
     BufferPool::BufferHandle handle;
     status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
     if (!status.ok()) goto error;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.cc b/be/src/runtime/io/file-reader.cc
deleted file mode 100644
index 81ef090..0000000
--- a/be/src/runtime/io/file-reader.cc
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gutil/strings/substitute.h"
-#include "runtime/io/file-reader.h"
-
-#include "common/names.h"
-
-namespace impala {
-namespace io {
-
-string FileReader::DebugString() const {
-  return Substitute("bytes_read=$0", bytes_read_);
-}
-
-}
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 40f2a5c..9dbcc31 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -40,33 +40,31 @@ public:
   FileReader(ScanRange* scan_range) : scan_range_(scan_range) {}
   virtual ~FileReader() {}
 
-  /// Returns number of bytes read by this file reader.
-  int bytes_read() const { return bytes_read_; }
-
   /// Opens file that is associated with 'scan_range_'.
   /// 'use_file_handle_cache' currently only used by HdfsFileReader.
   virtual Status Open(bool use_file_handle_cache) = 0;
 
   /// Reads bytes from given position ('file_offset'). Tries to read
   /// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
-  /// bytes actually read. 'eosr' is set to true when end of file has reached,
-  /// or the file reader has read all the bytes needed by 'scan_range_'.
+  /// bytes actually read. 'eof' is set to true when end of file has reached.
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) = 0;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
 
   /// ***Currently only for HDFS***
-  /// Returns a pointer to a cached buffer that contains the contents of the file.
-  virtual void* CachedFile() = 0;
+  /// When successful, sets 'data' to a buffer that contains the contents of a file,
+  /// and 'length' is set to the length of the data.
+  /// When unsuccessful, 'data' is set to nullptr.
+  virtual void CachedFile(uint8_t** data, int64_t* length) = 0;
 
   /// Closes the file associated with 'scan_range_'. It doesn't have effect on other
   /// scan ranges.
   virtual void Close() = 0;
 
-  /// Reset internal bookkeeping, e.g. how many bytes have been read.
-  virtual void ResetState() { bytes_read_ = 0; }
+  /// Resets internal bookkeeping
+  virtual void ResetState() {}
 
   // Debug string of this file reader.
-  virtual std::string DebugString() const;
+  virtual std::string DebugString() const { return ""; }
 
   SpinLock& lock() { return lock_; }
 protected:
@@ -81,9 +79,6 @@ protected:
 
   /// The scan range this file reader serves.
   ScanRange* const scan_range_;
-
-  /// Number of bytes read by this reader.
-  int bytes_read_ = 0;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index ea61b6a..c6a6cec 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -40,6 +40,7 @@ namespace io {
 
 HdfsFileReader::~HdfsFileReader() {
   DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
+  DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
 }
 
 Status HdfsFileReader::Open(bool use_file_handle_cache) {
@@ -73,7 +74,7 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -87,7 +88,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
 
   auto io_mgr = scan_range_->io_mgr_;
   auto request_context = scan_range_->reader_;
-  *eosr = false;
+  *eof = false;
   *bytes_read = 0;
 
   CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
@@ -151,7 +152,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
       DCHECK_GT(current_bytes_read, -1);
       if (current_bytes_read == 0) {
         // No more bytes in the file. The scan range went past the end.
-        *eosr = true;
+        *eof = true;
         break;
       }
       *bytes_read += current_bytes_read;
@@ -164,12 +165,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
   if (borrowed_hdfs_fh != nullptr) {
     io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh);
   }
-  if (!status.ok())
-    return status;
-  bytes_read_ += *bytes_read;
-  DCHECK_LE(bytes_read_, scan_range_->len());
-  if (bytes_read_ == scan_range_->len()) *eosr = true;
-  return Status::OK();
+  return status;
 }
 
 Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
@@ -203,16 +199,22 @@ Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_
   return Status::OK();
 }
 
-void* HdfsFileReader::CachedFile() {
+void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) {
   {
     unique_lock<SpinLock> hdfs_lock(lock_);
+    DCHECK(cached_buffer_ == nullptr);  // Any previous buffer is freed by Close().
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
         scan_range_->io_mgr_->cached_read_options(), scan_range_->len());
   }
-  if (cached_buffer_ == nullptr) return nullptr;
-  bytes_read_ = hadoopRzBufferLength(cached_buffer_);
-  return const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
+  if (cached_buffer_ == nullptr) {  // Zero-copy read failed, e.g. data not cached.
+    *data = nullptr;
+    *length = 0;
+    return;
+  }
+  *data = reinterpret_cast<uint8_t*>(  // Owned by cached_buffer_; freed in Close().
+      const_cast<void*>(hadoopRzBufferGet(cached_buffer_)));
+  *length = hadoopRzBufferLength(cached_buffer_);
 }
 
 void HdfsFileReader::Close() {
@@ -220,11 +222,9 @@ void HdfsFileReader::Close() {
   if (exclusive_hdfs_fh_ != nullptr) {
     GetHdfsStatistics(exclusive_hdfs_fh_->file());
 
-    if (scan_range_->external_buffer_tag_ ==
-        ScanRange::ExternalBufferTag::CACHED_BUFFER) {
+    if (cached_buffer_ != nullptr) {
       hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
       cached_buffer_ = nullptr;
-      scan_range_->external_buffer_tag_ = ScanRange::ExternalBufferTag::NO_BUFFER;
     }
 
     // Destroy the file handle.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 3b3642b..ba95095 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -34,11 +34,11 @@ public:
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// Reads from the DN cache. On success, sets cached_buffer_ to the DN
   /// buffer and returns a pointer to the underlying raw buffer.
   /// Returns nullptr if the data is not cached.
-  virtual void* CachedFile() override;
+  virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
   virtual void ResetState() override;
   virtual std::string DebugString() const override;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 47c025b..3f88106 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -49,7 +49,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -61,7 +61,7 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
   unique_lock<SpinLock> fs_lock(lock_);
   RETURN_IF_ERROR(scan_range_->cancel_status_);
 
-  *eosr = false;
+  *eof = false;
   *bytes_read = 0;
 
   DCHECK(file_ != nullptr);
@@ -85,17 +85,14 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
     }
     // On Linux, we should only get partial reads from block devices on error or eof.
     DCHECK(feof(file_) != 0);
-    *eosr = true;
+    *eof = true;
   }
-  bytes_read_ += *bytes_read;
-  DCHECK_LE(bytes_read_, scan_range_->len());
-  if (bytes_read_ == scan_range_->len()) *eosr = true;
   return Status::OK();
 }
 
-void* LocalFileReader::CachedFile() {
-  DCHECK(false);
-  return nullptr;
+void LocalFileReader::CachedFile(uint8_t** data, int64_t* length) {
+  *data = nullptr;  // Local files are never cached: always report a cache miss.
+  *length = 0;
 }
 
 void LocalFileReader::Close() {

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index d8eedd1..14cff3c 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "common/hdfs.h"
 #include "runtime/io/file-reader.h"
 
 namespace impala {
@@ -32,9 +31,9 @@ public:
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// We don't cache files of the local file system.
-  virtual void* CachedFile() override;
+  virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
 private:
   /// Points to a C FILE object between calls to Open() and Close(), otherwise nullptr.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 56f6704..24213f9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -425,7 +425,8 @@ Status RequestContext::GetNextUnstartedRange(ScanRange** range, bool* needs_buff
       *range = cached_ranges_.Dequeue();
       DCHECK((*range)->try_cache());
       bool cached_read_succeeded;
-      RETURN_IF_ERROR((*range)->ReadFromCache(lock, &cached_read_succeeded));
+      RETURN_IF_ERROR(TryReadFromCache(lock, *range, &cached_read_succeeded,
+          needs_buffers));
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
@@ -472,12 +473,9 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
   DCHECK_NE(range->len(), 0);
   if (range->try_cache()) {
     bool cached_read_succeeded;
-    RETURN_IF_ERROR(range->ReadFromCache(lock, &cached_read_succeeded));
-    if (cached_read_succeeded) {
-      DCHECK(Validate()) << endl << DebugString();
-      *needs_buffers = false;
-      return Status::OK();
-    }
+    RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
+        needs_buffers));
+    if (cached_read_succeeded) return Status::OK();
     // Cached read failed, fall back to normal read path.
   }
   // If we don't have a buffer yet, the caller must allocate buffers for the range.
@@ -491,6 +489,35 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
   return Status::OK();
 }
 
+Status RequestContext::TryReadFromCache(const unique_lock<mutex>& lock,
+    ScanRange* range, bool* read_succeeded, bool* needs_buffers) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  RETURN_IF_ERROR(range->ReadFromCache(lock, read_succeeded));
+  if (!*read_succeeded) return Status::OK();
+  DCHECK(Validate()) << endl << DebugString();
+  // What is left to do after a successful cache read depends on the buffer tag:
+  // * CACHED_BUFFER (no sub-ranges): the buffer is already available to the reader.
+  // * NO_BUFFER (sub-ranges): the range is scheduled once the client adds buffers.
+  // * CLIENT_BUFFER (sub-ranges): data is copied into the client-provided buffer.
+  const ScanRange::ExternalBufferTag buffer_tag = range->external_buffer_tag();
+  *needs_buffers = false;
+  switch (buffer_tag) {
+    case ScanRange::ExternalBufferTag::NO_BUFFER:
+      DCHECK(range->HasSubRanges());
+      *needs_buffers = true;
+      range->SetBlockedOnBuffer();
+      AddRangeToDisk(lock, range, ScheduleMode::BY_CALLER);
+      break;
+    case ScanRange::ExternalBufferTag::CLIENT_BUFFER:
+      DCHECK(range->HasSubRanges());
+      AddRangeToDisk(lock, range, ScheduleMode::IMMEDIATELY);
+      break;
+    default:
+      break;
+  }
+  return Status::OK();
+}
+
 Status RequestContext::AddWriteRange(WriteRange* write_range) {
   unique_lock<mutex> lock(lock_);
   if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 0fcbca3..24b1bb5 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -281,6 +281,14 @@ class RequestContext {
   void RemoveActiveScanRangeLocked(
       const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
 
+  /// Try to read 'range' from the HDFS cache. '*read_succeeded' is set to true if the
+  /// range was found in the cache, otherwise false. '*needs_buffers' is only written when
+  /// '*read_succeeded' is true; if it is then true, the caller must call
+  /// AllocateBuffersForRange() to add buffers for the sub-ranges to be copied into before
+  /// the range can be scheduled. 'lock' must be held on this context's 'lock_'.
+  Status TryReadFromCache(const boost::unique_lock<boost::mutex>& lock, ScanRange* range,
+      bool* read_succeeded, bool* needs_buffers);
+
   // Counters are updated by other classes - expose to other io:: classes for convenience.
 
   /// Total bytes read for this reader

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 3bf55d6..1dc451a 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -60,9 +60,6 @@ class BufferDescriptor {
   int64_t len() { return len_; }
   bool eosr() { return eosr_; }
 
-  /// Returns the offset within the scan range that this buffer starts at
-  int64_t scan_range_offset() const { return scan_range_offset_; }
-
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferDescriptor);
   /// This class is tightly coupled with ScanRange. Making them friends is easiest.
@@ -101,8 +98,6 @@ class BufferDescriptor {
   /// true if the current scan range is complete
   bool eosr_ = false;
 
-  int64_t scan_range_offset_ = 0;
-
   // Handle to an allocated buffer and the client used to allocate it buffer. Only used
   // for non-external buffers.
   BufferPool::ClientHandle* bp_client_ = nullptr;
@@ -191,6 +186,15 @@ struct BufferOpts {
     return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
   }
 
+  /// Use only when you don't want to read the entire scan range, but only sub-ranges
+  /// in it. In this case the relevant parts can be copied from the HDFS cache into the
+  /// client buffer. 'client_buffer_len' must be large enough to hold the concatenation
+  /// of all the sub-ranges.
+  static BufferOpts ReadInto(bool try_cache, int64_t mtime, uint8_t* client_buffer,
+      int64_t client_buffer_len) {
+    return BufferOpts(try_cache, mtime, client_buffer, client_buffer_len);
+  }
+
  private:
   friend class ScanRange;
   friend class HdfsFileReader;
@@ -227,6 +231,12 @@ class ScanRange : public RequestRange {
 
   virtual ~ScanRange();
 
+  /// Defines an internal range [offset, offset + length) within this ScanRange.
+  struct SubRange {
+    int64_t offset;  // Absolute file offset, not relative to the scan range's offset.
+    int64_t length;  // Number of bytes; must be positive.
+  };
+
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
   /// local filesystem). The scan range must be non-empty and fall within the file bounds
@@ -238,10 +248,17 @@ class ScanRange : public RequestRange {
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
       bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
 
+  /// Same as above, but if non-empty, only 'sub_ranges' are read (sorted by offset,
+  /// non-overlapping, inside the scan range). Contiguous ones need not be pre-merged.
+  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts,
+      std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
+
   void* meta_data() const { return meta_data_; }
   bool try_cache() const { return try_cache_; }
   bool read_in_flight() const { return read_in_flight_; }
   bool expected_local() const { return expected_local_; }
+  int64_t bytes_to_read() const { return bytes_to_read_; }
 
   /// Returns the next buffer for this scan range. buffer is an output parameter.
   /// This function blocks until a buffer is ready or an error occurred. If this is
@@ -269,7 +286,7 @@ class ScanRange : public RequestRange {
 
   int64_t mtime() const { return mtime_; }
 
-  int BytesRead() const;
+  bool HasSubRanges() const { return !sub_ranges_.empty(); }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(ScanRange);
@@ -279,6 +296,7 @@ class ScanRange : public RequestRange {
   friend class BufferDescriptor;
   friend class DiskQueue;
   friend class DiskIoMgr;
+  friend class DiskIoMgrTest;
   friend class RequestContext;
   friend class HdfsFileReader;
   friend class LocalFileReader;
@@ -344,6 +362,9 @@ class ScanRange : public RequestRange {
   /// while any thread is inside a critical section.
   Status cancel_status_;
 
+  /// Test-only hook that replaces 'file_reader_' with 'file_reader'.
+  void SetFileReader(std::unique_ptr<FileReader> file_reader);
+
   /// END: private members that are accessed by other io:: classes
   /////////////////////////////////////////
 
@@ -383,10 +404,28 @@ class ScanRange : public RequestRange {
     return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
   }
 
+  /// Sets the sub-ranges of this ScanRange. If 'sub_ranges' is not empty, ScanRange won't
+  /// read everything from its range, but only these sub-ranges.
+  /// Sub-ranges need to be ordered by 'offset' and cannot overlap with each other.
+  /// Contiguous sub-ranges don't need to be merged in advance; this method merges them.
+  void InitSubRanges(std::vector<SubRange>&& sub_ranges);
+
+  /// Reads the next sub-range bytes into 'buffer_desc' (up to its capacity) and advances
+  /// 'sub_range_pos_'. If cached data is available, memcpy()s from it instead of reading
+  /// the file. Returns SCANNER_INCOMPLETE_READ if the file ends before a sub-range does.
+  Status ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof);
+
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
   bool Validate();
 
+  /// Validates the sub-ranges. All sub-ranges must be inside of this ScanRange, have a
+  /// positive length, be ordered by offset and not overlap.
+  bool ValidateSubRanges();
+
+  /// Merges contiguous sub-ranges, i.e. ones that start exactly where the previous ends.
+  void MergeSubRanges();
+
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -418,18 +457,26 @@ class ScanRange : public RequestRange {
   /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
   struct {
     /// Client-provided buffer to read the whole scan range into.
-    uint8_t* data;
+    uint8_t* data = nullptr;
 
     /// Length of the client-provided buffer.
-    int64_t len;
+    int64_t len = 0;
   } client_buffer_;
 
+  /// Valid if reading the scan range from the HDFS cache was successful.
+  struct {
+    /// Cached contents of the scan range; data[0] is the byte at file offset 'offset_'.
+    uint8_t* data = nullptr;
+    /// Length of the cached contents (at least len() when the cache read succeeded).
+    int64_t len = 0;
+  } cache_;
+
   /// The number of buffers that have been returned to a client via GetNext() that have
   /// not yet been returned with ReturnBuffer().
   AtomicInt32 num_buffers_in_reader_{0};
 
   /// Lock protecting fields below.
-  /// This lock should not be taken during Open()/Read()/Close().
+  /// This lock should not be taken during FileReader::Open()/Read()/Close().
   /// If RequestContext::lock_ and this lock need to be held simultaneously,
   /// RequestContext::lock_ must be taken first.
   boost::mutex lock_;
@@ -475,8 +522,30 @@ class ScanRange : public RequestRange {
   /// cancelled.
   ConditionVariable buffer_ready_cv_;
 
+  /// Number of bytes read so far by this scan range (at most 'bytes_to_read_').
+  int64_t bytes_read_ = 0;
+
   /// Polymorphic object that is responsible for doing file operations.
   std::unique_ptr<FileReader> file_reader_;
+
+  /// If not empty, the ScanRange will only read these parts (sorted, merged) of the file.
+  std::vector<SubRange> sub_ranges_;
+
+  /// Read position in the sub-ranges.
+  struct SubRangePosition {
+    /// Index of the SubRange in 'ScanRange::sub_ranges_' to read next.
+    int64_t index = 0;
+    /// Bytes already read from 'ScanRange::sub_ranges_[index]'.
+    int64_t bytes_read = 0;
+  };
+
+  /// Current read position in the sub-ranges.
+  SubRangePosition sub_range_pos_;
+
+  /// Number of bytes that need to be read by this ScanRange. If there are no sub-ranges
+  /// it equals 'len_'. If there are sub-ranges then it equals the sum of the lengths of
+  /// the sub-ranges (which is less than or equal to 'len_').
+  int64_t bytes_to_read_ = 0;
 };
 
 /// Used to specify data to be written to a file and offset.

http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index aaef6b4..660710e 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -163,13 +163,8 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
   return result;
 }
 
-int ScanRange::BytesRead() const {
-  DCHECK(file_reader_ != nullptr);
-  return file_reader_->bytes_read();
-}
-
 ReadOutcome ScanRange::DoRead(int disk_id) {
-  int64_t bytes_remaining = len_ - BytesRead();
+  int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
 
   unique_ptr<BufferDescriptor> buffer_desc;
@@ -183,8 +178,8 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
           this, client_buffer_.data, client_buffer_.len));
     } else {
       DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
-          << "This code path does not handle other buffer types, i.e. HDFS cache"
-          << static_cast<int>(external_buffer_tag_);
+          << "This code path does not handle other buffer types, i.e. HDFS cache. "
+          << "external_buffer_tag_=" << static_cast<int>(external_buffer_tag_);
       buffer_desc = GetUnusedBuffer(lock);
       if (buffer_desc == nullptr) {
         // No buffer available - the range will be rescheduled when a buffer is added.
@@ -199,21 +194,27 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
   Status read_status = file_reader_->Open(is_file_handle_caching_enabled());
+  bool eof = false;
   if (read_status.ok()) {
     COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
     COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
 
-    read_status = file_reader_->ReadFromPos(offset_ + BytesRead(), buffer_desc->buffer_,
-        min(len() - BytesRead(), buffer_desc->buffer_len_), &buffer_desc->len_,
-        &buffer_desc->eosr_);
-    buffer_desc->scan_range_offset_ = BytesRead() - buffer_desc->len_;
+    if (sub_ranges_.empty()) {
+      DCHECK(cache_.data == nullptr);
+      read_status = file_reader_->ReadFromPos(offset_ + bytes_read_, buffer_desc->buffer_,
+          min(len() - bytes_read_, buffer_desc->buffer_len_),
+          &buffer_desc->len_, &eof);
+    } else {
+      read_status = ReadSubRanges(buffer_desc.get(), &eof);
+    }
 
     COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
     COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
   }
 
   DCHECK(buffer_desc->buffer_ != nullptr);
-  DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path.";
+  DCHECK(!buffer_desc->is_cached()) <<
+      "Pure HDFS cache reads don't go through this code path.";
   if (!read_status.ok()) {
     // Free buffer to release resources before we cancel the range so that all buffers
     // are freed at cancellation.
@@ -228,9 +229,15 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
     return ReadOutcome::CANCELLED;
   }
 
+  bytes_read_ += buffer_desc->len();
+  DCHECK_LE(bytes_read_, bytes_to_read_);
+
+  // It is end of stream if it is end of file, or read all the bytes.
+  buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+
   // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
   // Store the state we need before calling EnqueueReadyBuffer().
-  bool eosr = buffer_desc->eosr_;
+  bool eosr = buffer_desc->eosr();
   // Read successful - enqueue the buffer and return the appropriate outcome.
   if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
   if (eosr) {
@@ -241,6 +248,42 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   return ReadOutcome::SUCCESS_NO_EOSR;
 }
 
+Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) {
+  buffer_desc->len_ = 0;
+  while (buffer_desc->len() < buffer_desc->buffer_len() &&
+      sub_range_pos_.index < sub_ranges_.size()) {
+    SubRange& sub_range = sub_ranges_[sub_range_pos_.index];
+    int64_t offset = sub_range.offset + sub_range_pos_.bytes_read;
+    int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read,
+        buffer_desc->buffer_len() - buffer_desc->len());
+    if (cache_.data != nullptr) {
+      // 'offset' is a file offset; the cached data starts at this range's 'offset_'.
+      memcpy(buffer_desc->buffer_ + buffer_desc->len(),
+          cache_.data + (offset - offset_), bytes_to_read);
+    } else {
+      int64_t current_bytes_read = 0;
+      RETURN_IF_ERROR(file_reader_->ReadFromPos(offset,
+          buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
+          eof));
+      if (current_bytes_read != bytes_to_read) {
+        // A short read only happens at end of file: report the sub-range as incomplete.
+        DCHECK(*eof);
+        DCHECK_LT(current_bytes_read, bytes_to_read);
+        return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read,
+            current_bytes_read, file(), offset);
+      }
+    }
+
+    buffer_desc->len_ += bytes_to_read;
+    sub_range_pos_.bytes_read += bytes_to_read;
+    if (sub_range_pos_.bytes_read == sub_range.length) {
+      sub_range_pos_.index += 1;
+      sub_range_pos_.bytes_read = 0;
+    }
+  }
+  return Status::OK();
+}
+
 void ScanRange::SetBlockedOnBuffer() {
   unique_lock<mutex> lock(lock_);
   blocked_on_buffer_ = true;
@@ -339,9 +382,10 @@ string ScanRange::DebugString() const {
 }
 
 bool ScanRange::Validate() {
-  if (BytesRead() > len_) {
-    LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
-                 << " bytes_read_=" << BytesRead() << " len_=" << len_;
+  if (bytes_read_ > bytes_to_read_) {
+    LOG(ERROR) << "Bytes read tracking is wrong. Too many bytes have been read."
+                 << " bytes_read_=" << bytes_read_
+                 << " bytes_to_read_=" << bytes_to_read_;
     return false;
   }
   if (!cancel_status_.ok() && !ready_buffers_.empty()) {
@@ -377,8 +421,6 @@ ScanRange::ScanRange()
     external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
 
 ScanRange::~ScanRange() {
-  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
-      << "Cached buffer was not released.";
   DCHECK(!read_in_flight_);
   DCHECK_EQ(0, ready_buffers_.size());
   DCHECK_EQ(0, num_buffers_in_reader_.Load());
@@ -386,6 +428,12 @@ ScanRange::~ScanRange() {
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
     int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
+  Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, {}, meta_data);
+}
+
+void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts,
+    vector<SubRange>&& sub_ranges, void* meta_data) {
   DCHECK(ready_buffers_.empty());
   DCHECK(!read_in_flight_);
   DCHECK(file != nullptr);
@@ -401,6 +449,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   }
   file_ = file;
   len_ = len;
+  bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
   try_cache_ = buffer_opts.try_cache_;
@@ -416,6 +465,59 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   expected_local_ = expected_local;
   io_mgr_ = nullptr;
   reader_ = nullptr;
+  sub_ranges_.clear();
+  sub_range_pos_ = {};
+  InitSubRanges(move(sub_ranges));
+}
+
+void ScanRange::InitSubRanges(vector<SubRange>&& sub_ranges) {
+  sub_ranges_ = std::move(sub_ranges);
+  DCHECK(ValidateSubRanges());
+  MergeSubRanges();
+  DCHECK(ValidateSubRanges());
+  sub_range_pos_ = {};
+
+  if (sub_ranges_.empty()) return;
+  // Sub-range lengths are int64_t; sum them in int64_t so large ranges don't overflow.
+  int64_t length_sum = 0;
+  for (const SubRange& sub_range : sub_ranges_) {
+    length_sum += sub_range.length;
+  }
+  bytes_to_read_ = length_sum;
+}
+
+bool ScanRange::ValidateSubRanges() {
+  // Every sub-range must be non-empty, lie inside [offset_, offset_ + len_) and end at
+  // or before the start of the next one, i.e. they are sorted and do not overlap.
+  const int64_t scan_range_end = offset_ + len_;
+  for (size_t idx = 0; idx < sub_ranges_.size(); ++idx) {
+    const SubRange& cur = sub_ranges_[idx];
+    const int64_t cur_end = cur.offset + cur.length;
+    if (cur.length <= 0 || cur.offset < offset_) return false;
+    if (cur_end > scan_range_end) return false;
+    const bool is_last = idx + 1 == sub_ranges_.size();
+    if (!is_last && cur_end > sub_ranges_[idx + 1].offset) return false;
+  }
+  return true;
+}
+
+void ScanRange::MergeSubRanges() {
+  // Single in-place pass: 'last' is the index of the sub-range being grown; each later
+  // sub-range is either appended to it (if contiguous) or copied to the next slot. This
+  // is O(n), unlike repeatedly erasing from the middle of the vector.
+  if (sub_ranges_.empty()) return;
+  size_t last = 0;
+  for (size_t i = 1; i < sub_ranges_.size(); ++i) {
+    SubRange& tail = sub_ranges_[last];
+    const SubRange& next = sub_ranges_[i];
+    if (next.offset == tail.offset + tail.length) {
+      tail.length += next.length;
+    } else {
+      ++last;
+      sub_ranges_[last] = next;
+    }
+  }
+  sub_ranges_.resize(last + 1);
 }
 
 void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
@@ -427,10 +529,16 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   cancel_status_ = Status::OK();
   eosr_queued_ = false;
   blocked_on_buffer_ = false;
+  bytes_read_ = 0;
+  sub_range_pos_ = {};
   file_reader_->ResetState();
   DCHECK(Validate()) << DebugString();
 }
 
+void ScanRange::SetFileReader(unique_ptr<FileReader> file_reader) {
+  file_reader_ = std::move(file_reader);  // Test-only: swap in a stub reader.
+}
+
 int64_t ScanRange::MaxReadChunkSize() const {
   // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
   // interface).  So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
@@ -454,54 +562,54 @@ Status ScanRange::ReadFromCache(
     const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
   DCHECK(try_cache_);
-  DCHECK_EQ(BytesRead(), 0);
+  DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
   Status status = file_reader_->Open(false);
   if (!status.ok()) return status;
 
-  // Cached reads not supported on local filesystem.
-  if (fs_ == nullptr) return Status::OK();
-
   // Check cancel status.
   {
     unique_lock<mutex> lock(lock_);
     RETURN_IF_ERROR(cancel_status_);
   }
 
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
-  void* buffer = file_reader_->CachedFile();
-  if (buffer != nullptr) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+  file_reader_->CachedFile(&cache_.data, &cache_.len);
   // Data was not cached, caller will fall back to normal read path.
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+  if (cache_.data == nullptr) {
     VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
                << ". Switching to disk read path.";
     // Clean up the scan range state before re-issuing it.
     file_reader_->Close();
     return Status::OK();
   }
-  int bytes_read = BytesRead();
   // A partial read can happen when files are truncated.
   // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
   // between errors and partially cached blocks here.
-  if (bytes_read < len()) {
+  if (cache_.len < len()) {
     VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
-      << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
+      << len() << " bytes, but read " << cache_.len << ". Switching to disk read path.";
     // Close the scan range. 'read_succeeded' is still false, so the caller will fall back
     // to non-cached read of this scan range.
     file_reader_->Close();
     return Status::OK();
   }
 
+  *read_succeeded = true;
+  // If there are sub-ranges, then we need to memcpy() them from the cached buffer.
+  if (HasSubRanges()) return Status::OK();
+
+  DCHECK(external_buffer_tag_ != ExternalBufferTag::CLIENT_BUFFER);
+  external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+  bytes_read_ = cache_.len;
+
   // Create a single buffer desc for the entire scan range and enqueue that.
   // The memory is owned by the HDFS java client, not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      this, reinterpret_cast<uint8_t*>(buffer), 0));
-  desc->len_ = bytes_read;
-  desc->scan_range_offset_ = 0;
+      this, cache_.data, 0));
+  desc->len_ = cache_.len;
   desc->eosr_ = true;
   EnqueueReadyBuffer(move(desc));
-  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
-  *read_succeeded = true;
+  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, cache_.len);
   return Status::OK();
 }