You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/10/18 21:32:42 UTC
[3/4] impala git commit: IMPALA-7543: Enhance scan ranges to support
sub-ranges
IMPALA-7543: Enhance scan ranges to support sub-ranges
This commit enhances the ScanRange class to make it possible to only
read some smaller parts of the whole ScanRange. This functionality
is needed by IMPALA-5843.
A sub-range is an offset and a length that together lie within the scan
range. Sub-ranges can be added to a scan range when calling
ScanRange::Reset(). When sub-ranges are given, the ScanRange class only
reads the parts they define.
If a cache read has sub-ranges, the ScanRange doesn't enqueue the whole
cache buffer (which contains the whole ScanRange); instead it memcpy()s
the sub-ranges into IO/client buffers.
Smaller refactorings that were needed:
* remove scan_range_offset_ from BufferDescriptor
* the number of bytes read is tracked by ScanRange again
Testing:
* introduced CacheReaderTestStub to fake cache reads during testing
* extended disk-io-mgr-test.cc with sub-ranges
Change-Id: Iea26ba386713990f7671aab5a372cf449b8d51e4
Reviewed-on: http://gerrit.cloudera.org:8080/11520
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/48fb4902
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/48fb4902
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/48fb4902
Branch: refs/heads/master
Commit: 48fb4902d4f28c9e4d327b80b00b33962c118c22
Parents: 2fb8eba
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Wed Sep 26 13:58:36 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 20:33:36 2018 +0000
----------------------------------------------------------------------
be/src/runtime/io/CMakeLists.txt | 1 -
be/src/runtime/io/cache-reader-test-stub.h | 62 +++++++++
be/src/runtime/io/disk-io-mgr-stress.cc | 3 +-
be/src/runtime/io/disk-io-mgr-test.cc | 177 +++++++++++++++++++----
be/src/runtime/io/disk-io-mgr.cc | 2 +-
be/src/runtime/io/file-reader.cc | 31 -----
be/src/runtime/io/file-reader.h | 23 ++-
be/src/runtime/io/hdfs-file-reader.cc | 32 ++---
be/src/runtime/io/hdfs-file-reader.h | 4 +-
be/src/runtime/io/local-file-reader.cc | 15 +-
be/src/runtime/io/local-file-reader.h | 5 +-
be/src/runtime/io/request-context.cc | 41 +++++-
be/src/runtime/io/request-context.h | 8 ++
be/src/runtime/io/request-ranges.h | 87 ++++++++++--
be/src/runtime/io/scan-range.cc | 178 +++++++++++++++++++-----
15 files changed, 511 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index bb26ef0..675bd6e 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -29,7 +29,6 @@ add_library(Io
error-converter.cc
request-context.cc
scan-range.cc
- file-reader.cc
hdfs-file-reader.cc
local-file-reader.cc
)
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/cache-reader-test-stub.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
new file mode 100644
index 0000000..b53b30b
--- /dev/null
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/io/file-reader.h"
+#include "runtime/io/request-ranges.h"
+
+namespace impala {
+namespace io {
+
+/// Only for testing the code path when reading from the cache is successful.
+/// Takes a pointer to a buffer in its constructor, also the length of this buffer.
+/// CachedFile() simply returns the pointer and length.
+/// Invoking ReadFromPos() on it results in an error.
+class CacheReaderTestStub : public FileReader {
+public:
+ CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) :
+ FileReader(scan_range),
+ cache_(cache),
+ length_(length) {
+ }
+
+ ~CacheReaderTestStub() {}
+
+ virtual Status Open(bool use_file_handle_cache) override {
+ return Status::OK();
+ }
+
+ virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
+ DCHECK(false);
+ return Status("Not implemented");
+ }
+
+ virtual void CachedFile(uint8_t** data, int64_t* length) override {
+ *length = length_;
+ *data = cache_;
+ }
+
+ virtual void Close() override {}
+private:
+ uint8_t* cache_ = nullptr;
+ int64_t length_ = 0;
+};
+
+}
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 16cfbf2..4335aa4 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -119,6 +119,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
while (!eos) {
ScanRange* range;
+ int64_t scan_range_offset = 0;
bool needs_buffers;
Status status = client->reader->GetNextUnstartedRange(&range, &needs_buffers);
CHECK(status.ok() || status.IsCancelled());
@@ -135,7 +136,6 @@ void DiskIoMgrStress::ClientThread(int client_id) {
CHECK(status.ok() || status.IsCancelled());
if (buffer == NULL) break;
- int64_t scan_range_offset = buffer->scan_range_offset();
int len = buffer->len();
CHECK_GE(scan_range_offset, 0);
CHECK_LT(scan_range_offset, expected.size());
@@ -154,6 +154,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
range->ReturnBuffer(move(buffer));
bytes_read += len;
+ scan_range_offset += len;
CHECK_GE(bytes_read, 0);
CHECK_LE(bytes_read, expected.size());
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index a6689a8..7a9bc23 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -24,6 +24,7 @@
#include "common/init.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/io/cache-reader-test-stub.h"
#include "runtime/io/local-file-system-with-fault-injection.h"
#include "runtime/io/disk-io-mgr-stress.h"
#include "runtime/io/disk-io-mgr.h"
@@ -179,7 +180,7 @@ class DiskIoMgrTest : public testing::Test {
}
ASSERT_OK(range->GetNext(&buffer));
ASSERT_TRUE(buffer != nullptr);
- EXPECT_EQ(buffer->len(), range->len());
+ EXPECT_EQ(buffer->len(), range->bytes_to_read());
if (expected_len < 0) expected_len = strlen(expected);
int cmp = memcmp(buffer->buffer(), expected, expected_len);
EXPECT_TRUE(cmp == 0);
@@ -190,6 +191,7 @@ class DiskIoMgrTest : public testing::Test {
const char* expected, int expected_len, const Status& expected_status) {
char result[expected_len + 1];
memset(result, 0, expected_len + 1);
+ int64_t scan_range_offset = 0;
while (true) {
unique_ptr<BufferDescriptor> buffer;
@@ -200,8 +202,9 @@ class DiskIoMgrTest : public testing::Test {
break;
}
ASSERT_LE(buffer->len(), expected_len);
- memcpy(result + range->offset() + buffer->scan_range_offset(),
+ memcpy(result + range->offset() + scan_range_offset,
buffer->buffer(), buffer->len());
+ scan_range_offset += buffer->len();
range->ReturnBuffer(move(buffer));
}
ValidateEmptyOrCorrect(expected, result, expected_len);
@@ -229,11 +232,16 @@ class DiskIoMgrTest : public testing::Test {
}
}
+ static void SetReaderStub(ScanRange* scan_range, unique_ptr<FileReader> reader_stub) {
+ scan_range->SetFileReader(move(reader_stub));
+ }
+
ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
- int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
+ int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
+ std::vector<ScanRange::SubRange> sub_ranges = {}) {
ScanRange* range = pool->Add(new ScanRange);
range->Reset(nullptr, file_path, len, offset, disk_id, true,
- BufferOpts(is_cached, mtime), meta_data);
+ BufferOpts(is_cached, mtime), move(sub_ranges), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
@@ -252,6 +260,12 @@ class DiskIoMgrTest : public testing::Test {
const string& tmp_file, int offset, RequestContext* writer,
const string& expected_output);
+ void SingleReaderTestBody(const char* data, const char* expected_result,
+ vector<ScanRange::SubRange> sub_ranges = {});
+
+ void CachedReadsTestBody(const char* data, const char* expected,
+ bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {});
+
/// Convenience function to get a reference to the buffer pool.
BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
@@ -522,13 +536,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
buffer_pool()->DeregisterClient(&read_client);
}
-// Basic test with a single reader, testing multiple threads, disks and a different
-// number of buffers.
-TEST_F(DiskIoMgrTest, SingleReader) {
- InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::SingleReaderTestBody(const char* data, const char* expected_result,
+ vector<ScanRange::SubRange> sub_ranges) {
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
- const char* data = "abcdefghijklm";
- int len = strlen(data);
+ int data_len = strlen(data);
+ int expected_result_len = strlen(expected_result);
CreateTempFile(tmp_file, data);
// Get mtime for file
@@ -554,9 +566,10 @@ TEST_F(DiskIoMgrTest, SingleReader) {
unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
- for (int i = 0; i < len; ++i) {
+ for (int i = 0; i < data_len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, data_len, disk_id,
+ stat_val.st_mtime, nullptr, false, sub_ranges));
}
ASSERT_OK(reader->AddScanRanges(ranges));
@@ -564,7 +577,8 @@ TEST_F(DiskIoMgrTest, SingleReader) {
thread_group threads;
for (int i = 0; i < num_read_threads; ++i) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
- &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
+ &read_client, expected_result, expected_result_len, Status::OK(), 0,
+ &num_ranges_processed));
}
threads.join_all();
@@ -578,6 +592,23 @@ TEST_F(DiskIoMgrTest, SingleReader) {
EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
+// Basic test with a single reader, testing multiple threads, disks and a different
+// number of buffers.
+TEST_F(DiskIoMgrTest, SingleReader) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* data = "abcdefghijklm";
+ SingleReaderTestBody(data, data);
+}
+
+TEST_F(DiskIoMgrTest, SingleReaderSubRanges) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* data = "abcdefghijklm";
+ int64_t data_len = strlen(data);
+ SingleReaderTestBody(data, data, {{0, data_len}});
+ SingleReaderTestBody(data, "abdef", {{0, 2}, {3, 3}});
+ SingleReaderTestBody(data, "bceflm", {{1, 2}, {4, 2}, {11, 2}});
+}
+
// This test issues adding additional scan ranges while there are some still in flight.
TEST_F(DiskIoMgrTest, AddScanRangeTest) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
@@ -872,14 +903,10 @@ TEST_F(DiskIoMgrTest, MemScarcity) {
}
}
-// Test when some scan ranges are marked as being cached.
-// Since these files are not in HDFS, the cached path always fails so this
-// only tests the fallback mechanism.
-// TODO: we can fake the cached read path without HDFS
-TEST_F(DiskIoMgrTest, CachedReads) {
- InitRootReservation(LARGE_RESERVATION_LIMIT);
+void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
+ bool fake_cache, vector<ScanRange::SubRange> sub_ranges) {
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
- const char* data = "abcdefghijklm";
+ uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
int len = strlen(data);
CreateTempFile(tmp_file, data);
@@ -898,17 +925,26 @@ TEST_F(DiskIoMgrTest, CachedReads) {
unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
ScanRange* complete_range =
- InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+ InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true,
+ sub_ranges);
+ if (fake_cache) {
+ SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(
+ complete_range, cached_data, len));
+ }
// Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(
- InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime,
+ nullptr, true, sub_ranges);
+ ranges.push_back(range);
+ if (fake_cache) {
+ SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data, len));
+ }
}
ASSERT_OK(reader->AddScanRanges(ranges));
@@ -916,19 +952,19 @@ TEST_F(DiskIoMgrTest, CachedReads) {
thread_group threads;
for (int i = 0; i < 5; ++i) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
- data, strlen(data), Status::OK(), 0, &num_ranges_processed));
+ expected, strlen(expected), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
- ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
}
threads.join_all();
- ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected);
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
@@ -938,6 +974,32 @@ TEST_F(DiskIoMgrTest, CachedReads) {
EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
+// Test when some scan ranges are marked as being cached.
+TEST_F(DiskIoMgrTest, CachedReads) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* data = "abcdefghijklm";
+ // Don't fake the cache, i.e. test the fallback mechanism
+ CachedReadsTestBody(data, data, false);
+ // Fake the test with a file reader stub.
+ CachedReadsTestBody(data, data, true);
+}
+
+// Test when some scan ranges are marked as being cached and there
+// are sub-ranges as well.
+TEST_F(DiskIoMgrTest, CachedReadsSubRanges) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* data = "abcdefghijklm";
+ int64_t data_len = strlen(data);
+
+ // first iteration tests the fallback mechanism with sub-ranges
+ // second iteration fakes a cache
+ for (bool fake_cache : {false, true}) {
+ CachedReadsTestBody(data, data, fake_cache, {{0, data_len}});
+ CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}});
+ CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11, 2}});
+ }
+}
+
TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
const int ITERATIONS = 1;
@@ -1401,6 +1463,63 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
+// Test reading into a client-allocated buffer using sub-ranges.
+TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ uint8_t* cache = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
+ int data_len = strlen(data);
+ int read_len = 4; // Make buffer size smaller than client-provided buffer.
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+ ASSERT_OK(io_mgr->Init());
+ // Reader doesn't need to provide client if it's providing buffers.
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+
+ auto test_case = [&](bool fake_cache, const char* expected_result,
+ vector<ScanRange::SubRange> sub_ranges) {
+ int result_len = strlen(expected_result);
+ vector<uint8_t> client_buffer(result_len);
+ ScanRange* range = pool_.Add(new ScanRange);
+ range->Reset(nullptr, tmp_file, data_len, 0, 0, true,
+ BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(),
+ result_len), move(sub_ranges));
+ if (fake_cache) {
+ SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
+ }
+ bool needs_buffers;
+ ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
+ ASSERT_FALSE(needs_buffers);
+
+ unique_ptr<BufferDescriptor> io_buffer;
+ ASSERT_OK(range->GetNext(&io_buffer));
+ ASSERT_TRUE(io_buffer->eosr());
+ ASSERT_EQ(result_len, io_buffer->len());
+ ASSERT_EQ(client_buffer.data(), io_buffer->buffer());
+ ASSERT_EQ(memcmp(io_buffer->buffer(), expected_result, result_len), 0);
+
+ // DiskIoMgr should not have allocated memory.
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+ range->ReturnBuffer(move(io_buffer));
+ };
+
+ for (bool fake_cache : {false, true}) {
+ test_case(fake_cache, data, {{0, data_len}});
+ test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}});
+ test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}});
+ test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}});
+ }
+
+ io_mgr->UnregisterContext(reader.get());
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+}
+
// Test reading into a client-allocated buffer where the read fails.
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 75136ec..fa871d1 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -302,7 +302,7 @@ Status DiskIoMgr::AllocateBuffersForRange(
BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
Status status;
vector<unique_ptr<BufferDescriptor>> buffers;
- for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+ for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) {
BufferPool::BufferHandle handle;
status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
if (!status.ok()) goto error;
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.cc b/be/src/runtime/io/file-reader.cc
deleted file mode 100644
index 81ef090..0000000
--- a/be/src/runtime/io/file-reader.cc
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gutil/strings/substitute.h"
-#include "runtime/io/file-reader.h"
-
-#include "common/names.h"
-
-namespace impala {
-namespace io {
-
-string FileReader::DebugString() const {
- return Substitute("bytes_read=$0", bytes_read_);
-}
-
-}
-}
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 40f2a5c..9dbcc31 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -40,33 +40,31 @@ public:
FileReader(ScanRange* scan_range) : scan_range_(scan_range) {}
virtual ~FileReader() {}
- /// Returns number of bytes read by this file reader.
- int bytes_read() const { return bytes_read_; }
-
/// Opens file that is associated with 'scan_range_'.
/// 'use_file_handle_cache' currently only used by HdfsFileReader.
virtual Status Open(bool use_file_handle_cache) = 0;
/// Reads bytes from given position ('file_offset'). Tries to read
/// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
- /// bytes actually read. 'eosr' is set to true when end of file has reached,
- /// or the file reader has read all the bytes needed by 'scan_range_'.
+ /// bytes actually read. 'eof' is set to true when end of file has reached.
virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) = 0;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
/// ***Currently only for HDFS***
- /// Returns a pointer to a cached buffer that contains the contents of the file.
- virtual void* CachedFile() = 0;
+ /// When successful, sets 'data' to a buffer that contains the contents of a file,
+ /// and 'length' is set to the length of the data.
+ /// When unsuccessful, 'data' is set to nullptr.
+ virtual void CachedFile(uint8_t** data, int64_t* length) = 0;
/// Closes the file associated with 'scan_range_'. It doesn't have effect on other
/// scan ranges.
virtual void Close() = 0;
- /// Reset internal bookkeeping, e.g. how many bytes have been read.
- virtual void ResetState() { bytes_read_ = 0; }
+ /// Resets internal bookkeeping
+ virtual void ResetState() {}
// Debug string of this file reader.
- virtual std::string DebugString() const;
+ virtual std::string DebugString() const { return ""; }
SpinLock& lock() { return lock_; }
protected:
@@ -81,9 +79,6 @@ protected:
/// The scan range this file reader serves.
ScanRange* const scan_range_;
-
- /// Number of bytes read by this reader.
- int bytes_read_ = 0;
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index ea61b6a..c6a6cec 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -40,6 +40,7 @@ namespace io {
HdfsFileReader::~HdfsFileReader() {
DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
+ DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
}
Status HdfsFileReader::Open(bool use_file_handle_cache) {
@@ -73,7 +74,7 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
}
Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
DCHECK(scan_range_->read_in_flight());
DCHECK_GE(bytes_to_read, 0);
// Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -87,7 +88,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
auto io_mgr = scan_range_->io_mgr_;
auto request_context = scan_range_->reader_;
- *eosr = false;
+ *eof = false;
*bytes_read = 0;
CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
@@ -151,7 +152,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
DCHECK_GT(current_bytes_read, -1);
if (current_bytes_read == 0) {
// No more bytes in the file. The scan range went past the end.
- *eosr = true;
+ *eof = true;
break;
}
*bytes_read += current_bytes_read;
@@ -164,12 +165,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
if (borrowed_hdfs_fh != nullptr) {
io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh);
}
- if (!status.ok())
- return status;
- bytes_read_ += *bytes_read;
- DCHECK_LE(bytes_read_, scan_range_->len());
- if (bytes_read_ == scan_range_->len()) *eosr = true;
- return Status::OK();
+ return status;
}
Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
@@ -203,16 +199,22 @@ Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_
return Status::OK();
}
-void* HdfsFileReader::CachedFile() {
+void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) {
{
unique_lock<SpinLock> hdfs_lock(lock_);
+ DCHECK(cached_buffer_ == nullptr);
DCHECK(exclusive_hdfs_fh_ != nullptr);
cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
scan_range_->io_mgr_->cached_read_options(), scan_range_->len());
}
- if (cached_buffer_ == nullptr) return nullptr;
- bytes_read_ = hadoopRzBufferLength(cached_buffer_);
- return const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
+ if (cached_buffer_ == nullptr) {
+ *data = nullptr;
+ *length = 0;
+ return;
+ }
+ *data = reinterpret_cast<uint8_t*>(
+ const_cast<void*>(hadoopRzBufferGet(cached_buffer_)));
+ *length = hadoopRzBufferLength(cached_buffer_);
}
void HdfsFileReader::Close() {
@@ -220,11 +222,9 @@ void HdfsFileReader::Close() {
if (exclusive_hdfs_fh_ != nullptr) {
GetHdfsStatistics(exclusive_hdfs_fh_->file());
- if (scan_range_->external_buffer_tag_ ==
- ScanRange::ExternalBufferTag::CACHED_BUFFER) {
+ if (cached_buffer_ != nullptr) {
hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
cached_buffer_ = nullptr;
- scan_range_->external_buffer_tag_ = ScanRange::ExternalBufferTag::NO_BUFFER;
}
// Destroy the file handle.
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 3b3642b..ba95095 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -34,11 +34,11 @@ public:
virtual Status Open(bool use_file_handle_cache) override;
virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
/// Reads from the DN cache. On success, sets cached_buffer_ to the DN
/// buffer and returns a pointer to the underlying raw buffer.
/// Returns nullptr if the data is not cached.
- virtual void* CachedFile() override;
+ virtual void CachedFile(uint8_t** data, int64_t* length) override;
virtual void Close() override;
virtual void ResetState() override;
virtual std::string DebugString() const override;
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 47c025b..3f88106 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -49,7 +49,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
}
Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) {
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
DCHECK(scan_range_->read_in_flight());
DCHECK_GE(bytes_to_read, 0);
// Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -61,7 +61,7 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
unique_lock<SpinLock> fs_lock(lock_);
RETURN_IF_ERROR(scan_range_->cancel_status_);
- *eosr = false;
+ *eof = false;
*bytes_read = 0;
DCHECK(file_ != nullptr);
@@ -85,17 +85,14 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
}
// On Linux, we should only get partial reads from block devices on error or eof.
DCHECK(feof(file_) != 0);
- *eosr = true;
+ *eof = true;
}
- bytes_read_ += *bytes_read;
- DCHECK_LE(bytes_read_, scan_range_->len());
- if (bytes_read_ == scan_range_->len()) *eosr = true;
return Status::OK();
}
-void* LocalFileReader::CachedFile() {
- DCHECK(false);
- return nullptr;
+void LocalFileReader::CachedFile(uint8_t** data, int64_t* length) {
+ *data = nullptr;
+ *length = 0;
}
void LocalFileReader::Close() {
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index d8eedd1..14cff3c 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -17,7 +17,6 @@
#pragma once
-#include "common/hdfs.h"
#include "runtime/io/file-reader.h"
namespace impala {
@@ -32,9 +31,9 @@ public:
virtual Status Open(bool use_file_handle_cache) override;
virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
/// We don't cache files of the local file system.
- virtual void* CachedFile() override;
+ virtual void CachedFile(uint8_t** data, int64_t* length) override;
virtual void Close() override;
private:
/// Points to a C FILE object between calls to Open() and Close(), otherwise nullptr.
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 56f6704..24213f9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -425,7 +425,8 @@ Status RequestContext::GetNextUnstartedRange(ScanRange** range, bool* needs_buff
*range = cached_ranges_.Dequeue();
DCHECK((*range)->try_cache());
bool cached_read_succeeded;
- RETURN_IF_ERROR((*range)->ReadFromCache(lock, &cached_read_succeeded));
+ RETURN_IF_ERROR(TryReadFromCache(lock, *range, &cached_read_succeeded,
+ needs_buffers));
if (cached_read_succeeded) return Status::OK();
// This range ended up not being cached. Loop again and pick up a new range.
@@ -472,12 +473,9 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
DCHECK_NE(range->len(), 0);
if (range->try_cache()) {
bool cached_read_succeeded;
- RETURN_IF_ERROR(range->ReadFromCache(lock, &cached_read_succeeded));
- if (cached_read_succeeded) {
- DCHECK(Validate()) << endl << DebugString();
- *needs_buffers = false;
- return Status::OK();
- }
+ RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
+ needs_buffers));
+ if (cached_read_succeeded) return Status::OK();
// Cached read failed, fall back to normal read path.
}
// If we don't have a buffer yet, the caller must allocate buffers for the range.
@@ -491,6 +489,35 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) {
return Status::OK();
}
+// Attempts to serve 'range' from the cache. On a cache miss '*read_succeeded' is false
+// and '*needs_buffers' is left untouched. On a hit, '*needs_buffers' tells the caller
+// whether it still has to add buffers to the range. 'lock' must be the held lock_.
+Status RequestContext::TryReadFromCache(const unique_lock<mutex>& lock,
+    ScanRange* range, bool* read_succeeded, bool* needs_buffers) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  RETURN_IF_ERROR(range->ReadFromCache(lock, read_succeeded));
+  if (!*read_succeeded) return Status::OK();
+
+  DCHECK(Validate()) << endl << DebugString();
+  switch (range->external_buffer_tag()) {
+    case ScanRange::ExternalBufferTag::NO_BUFFER:
+      // Sub-ranges with no buffer yet: the client has to add buffers to the scan
+      // range, which will be scheduled once those buffers arrive.
+      *needs_buffers = true;
+      DCHECK(range->HasSubRanges());
+      range->SetBlockedOnBuffer();
+      AddRangeToDisk(lock, range, ScheduleMode::BY_CALLER);
+      break;
+    case ScanRange::ExternalBufferTag::CLIENT_BUFFER:
+      // Sub-ranges with a client-provided buffer to copy the data into: the range can
+      // be scheduled right away.
+      *needs_buffers = false;
+      DCHECK(range->HasSubRanges());
+      AddRangeToDisk(lock, range, ScheduleMode::IMMEDIATELY);
+      break;
+    default:
+      // No sub-ranges: the tag is CACHED_BUFFER and the buffer is already available
+      // to the reader, so there is nothing left to do.
+      *needs_buffers = false;
+      break;
+  }
+  return Status::OK();
+}
+
Status RequestContext::AddWriteRange(WriteRange* write_range) {
unique_lock<mutex> lock(lock_);
if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 0fcbca3..24b1bb5 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -281,6 +281,14 @@ class RequestContext {
void RemoveActiveScanRangeLocked(
const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+ /// Try to read the scan range from the cache. '*read_succeeded' is set to true if the
+ /// scan range can be found in the cache, otherwise false.
+ /// If '*needs_buffers' is returned as true, the caller must call
+ /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+ /// range can be scheduled.
+ Status TryReadFromCache(const boost::unique_lock<boost::mutex>& lock, ScanRange* range,
+ bool* read_succeeded, bool* needs_buffers);
+
// Counters are updated by other classes - expose to other io:: classes for convenience.
/// Total bytes read for this reader
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 3bf55d6..1dc451a 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -60,9 +60,6 @@ class BufferDescriptor {
int64_t len() { return len_; }
bool eosr() { return eosr_; }
- /// Returns the offset within the scan range that this buffer starts at
- int64_t scan_range_offset() const { return scan_range_offset_; }
-
private:
DISALLOW_COPY_AND_ASSIGN(BufferDescriptor);
/// This class is tightly coupled with ScanRange. Making them friends is easiest.
@@ -101,8 +98,6 @@ class BufferDescriptor {
/// true if the current scan range is complete
bool eosr_ = false;
- int64_t scan_range_offset_ = 0;
-
// Handle to an allocated buffer and the client used to allocate it buffer. Only used
// for non-external buffers.
BufferPool::ClientHandle* bp_client_ = nullptr;
@@ -191,6 +186,15 @@ struct BufferOpts {
return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
}
+  /// Use this only when sub-ranges of the scan range are read instead of the entire
+  /// scan range. In that case the relevant parts can be copied from the HDFS cache into
+  /// the client buffer. 'client_buffer_len', the length of the buffer, must fit the
+  /// concatenation of all the sub-ranges.
+  static BufferOpts ReadInto(bool try_cache, int64_t mtime, uint8_t* client_buffer,
+      int64_t client_buffer_len) {
+    BufferOpts opts(try_cache, mtime, client_buffer, client_buffer_len);
+    return opts;
+  }
+
private:
friend class ScanRange;
friend class HdfsFileReader;
@@ -227,6 +231,12 @@ class ScanRange : public RequestRange {
virtual ~ScanRange();
+ /// Defines an internal range within this ScanRange. If a scan range has sub-ranges,
+ /// only those parts of it are read.
+ struct SubRange {
+ /// Start of the sub-range. ValidateSubRanges() requires it to be greater than or
+ /// equal to the offset of the enclosing scan range.
+ int64_t offset;
+ /// Length of the sub-range in bytes. ValidateSubRanges() requires it to be positive
+ /// and 'offset + length' not to exceed the end of the enclosing scan range.
+ int64_t length;
+ };
+
/// Resets this scan range object with the scan range description. The scan range
/// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
/// local filesystem). The scan range must be non-empty and fall within the file bounds
@@ -238,10 +248,17 @@ class ScanRange : public RequestRange {
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
+ /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
+ /// in advance, as this method will do the merge.
+ void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
+ bool expected_local, const BufferOpts& buffer_opts,
+ std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
+
void* meta_data() const { return meta_data_; }
bool try_cache() const { return try_cache_; }
bool read_in_flight() const { return read_in_flight_; }
bool expected_local() const { return expected_local_; }
+ /// Returns the number of bytes this scan range has to read: the length of the scan
+ /// range, or the sum of the sub-range lengths if sub-ranges were set.
+ int64_t bytes_to_read() const { return bytes_to_read_; }
/// Returns the next buffer for this scan range. buffer is an output parameter.
/// This function blocks until a buffer is ready or an error occurred. If this is
@@ -269,7 +286,7 @@ class ScanRange : public RequestRange {
int64_t mtime() const { return mtime_; }
- int BytesRead() const;
+ /// Returns true if this scan range has at least one sub-range.
+ bool HasSubRanges() const { return !sub_ranges_.empty(); }
private:
DISALLOW_COPY_AND_ASSIGN(ScanRange);
@@ -279,6 +296,7 @@ class ScanRange : public RequestRange {
friend class BufferDescriptor;
friend class DiskQueue;
friend class DiskIoMgr;
+ friend class DiskIoMgrTest;
friend class RequestContext;
friend class HdfsFileReader;
friend class LocalFileReader;
@@ -344,6 +362,9 @@ class ScanRange : public RequestRange {
/// while any thread is inside a critical section.
Status cancel_status_;
+ /// Only for testing
+ void SetFileReader(std::unique_ptr<FileReader> file_reader);
+
/// END: private members that are accessed by other io:: classes
/////////////////////////////////////////
@@ -383,10 +404,28 @@ class ScanRange : public RequestRange {
return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
}
+ /// Adds sub-ranges to this ScanRange. If sub_ranges is not empty, then ScanRange won't
+ /// read everything from its range, but will only read these sub-ranges.
+ /// Sub-ranges need to be ordered by 'offset' and cannot overlap with each other.
+ /// Contiguous sub-ranges don't need to be merged in advance; this method merges them.
+ void InitSubRanges(std::vector<SubRange>&& sub_ranges);
+
+ /// Read the sub-ranges into buffer and track the current position in 'sub_range_pos_'.
+ /// If cached data is available, then memcpy() from it instead of actually reading the
+ /// files.
+ Status ReadSubRanges(BufferDescriptor* buffer, bool* eof);
+
/// Validates the internal state of this range. lock_ must be taken
/// before calling this.
bool Validate();
+ /// Validates the sub-ranges. All sub-ranges must be inside of this ScanRange.
+ /// They need to be ordered by offset and cannot overlap.
+ bool ValidateSubRanges();
+
+ /// Merges adjacent and continuous sub-ranges.
+ void MergeSubRanges();
+
/// Pointer to caller specified metadata. This is untouched by the io manager
/// and the caller can put whatever auxiliary data in here.
void* meta_data_ = nullptr;
@@ -418,18 +457,26 @@ class ScanRange : public RequestRange {
/// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
struct {
/// Client-provided buffer to read the whole scan range into.
- uint8_t* data;
+ uint8_t* data = nullptr;
/// Length of the client-provided buffer.
- int64_t len;
+ int64_t len = 0;
} client_buffer_;
+ /// Valid if reading file contents from cache was successful.
+ struct {
+ /// Pointer to the contents of the file.
+ uint8_t* data = nullptr;
+ /// Length of the contents.
+ int64_t len = 0;
+ } cache_;
+
/// The number of buffers that have been returned to a client via GetNext() that have
/// not yet been returned with ReturnBuffer().
AtomicInt32 num_buffers_in_reader_{0};
/// Lock protecting fields below.
- /// This lock should not be taken during Open()/Read()/Close().
+ /// This lock should not be taken during FileReader::Open()/Read()/Close().
/// If RequestContext::lock_ and this lock need to be held simultaneously,
/// RequestContext::lock_ must be taken first.
boost::mutex lock_;
@@ -475,8 +522,30 @@ class ScanRange : public RequestRange {
/// cancelled.
ConditionVariable buffer_ready_cv_;
+ /// Number of bytes read by this scan range.
+ int64_t bytes_read_ = 0;
+
/// Polymorphic object that is responsible for doing file operations.
std::unique_ptr<FileReader> file_reader_;
+
+ /// If not empty, the ScanRange will only read these parts from the file.
+ std::vector<SubRange> sub_ranges_;
+
+ // Read position in the sub-ranges.
+ struct SubRangePosition {
+ /// Index of SubRange in 'ScanRange::sub_ranges_' to read next
+ int64_t index = 0;
+ /// Bytes already read from 'ScanRange::sub_ranges_[index]'
+ int64_t bytes_read = 0;
+ };
+
+ /// Current read position in the sub-ranges.
+ SubRangePosition sub_range_pos_;
+
+ /// Number of bytes that need to be read by this ScanRange. If there are no sub-ranges,
+ /// it equals 'len_'. If there are sub-ranges, it equals the sum of the lengths
+ /// of the sub-ranges (which is less than or equal to 'len_').
+ int64_t bytes_to_read_ = 0;
};
/// Used to specify data to be written to a file and offset.
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index aaef6b4..660710e 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -163,13 +163,8 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
return result;
}
-int ScanRange::BytesRead() const {
- DCHECK(file_reader_ != nullptr);
- return file_reader_->bytes_read();
-}
-
ReadOutcome ScanRange::DoRead(int disk_id) {
- int64_t bytes_remaining = len_ - BytesRead();
+ int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
DCHECK_GT(bytes_remaining, 0);
unique_ptr<BufferDescriptor> buffer_desc;
@@ -183,8 +178,8 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
this, client_buffer_.data, client_buffer_.len));
} else {
DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
- << "This code path does not handle other buffer types, i.e. HDFS cache"
- << static_cast<int>(external_buffer_tag_);
+ << "This code path does not handle other buffer types, i.e. HDFS cache. "
+ << "external_buffer_tag_=" << static_cast<int>(external_buffer_tag_);
buffer_desc = GetUnusedBuffer(lock);
if (buffer_desc == nullptr) {
// No buffer available - the range will be rescheduled when a buffer is added.
@@ -199,21 +194,27 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
// No locks in this section. Only working on local vars. We don't want to hold a
// lock across the read call.
Status read_status = file_reader_->Open(is_file_handle_caching_enabled());
+ bool eof = false;
if (read_status.ok()) {
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
- read_status = file_reader_->ReadFromPos(offset_ + BytesRead(), buffer_desc->buffer_,
- min(len() - BytesRead(), buffer_desc->buffer_len_), &buffer_desc->len_,
- &buffer_desc->eosr_);
- buffer_desc->scan_range_offset_ = BytesRead() - buffer_desc->len_;
+ if (sub_ranges_.empty()) {
+ DCHECK(cache_.data == nullptr);
+ read_status = file_reader_->ReadFromPos(offset_ + bytes_read_, buffer_desc->buffer_,
+ min(len() - bytes_read_, buffer_desc->buffer_len_),
+ &buffer_desc->len_, &eof);
+ } else {
+ read_status = ReadSubRanges(buffer_desc.get(), &eof);
+ }
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
}
DCHECK(buffer_desc->buffer_ != nullptr);
- DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path.";
+ DCHECK(!buffer_desc->is_cached()) <<
+ "Pure HDFS cache reads don't go through this code path.";
if (!read_status.ok()) {
// Free buffer to release resources before we cancel the range so that all buffers
// are freed at cancellation.
@@ -228,9 +229,15 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
return ReadOutcome::CANCELLED;
}
+ bytes_read_ += buffer_desc->len();
+ DCHECK_LE(bytes_read_, bytes_to_read_);
+
+ // It is end of stream if it is end of file, or read all the bytes.
+ buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+
// After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
// Store the state we need before calling EnqueueReadyBuffer().
- bool eosr = buffer_desc->eosr_;
+ bool eosr = buffer_desc->eosr();
// Read successful - enqueue the buffer and return the appropriate outcome.
if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
if (eosr) {
@@ -241,6 +248,42 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
return ReadOutcome::SUCCESS_NO_EOSR;
}
+// Copies the next chunk of sub-range data into 'buffer_desc', starting at the position
+// stored in 'sub_range_pos_' and stopping when either the buffer is full or all
+// sub-ranges have been consumed. 'buffer_desc->len_' is set to the number of bytes
+// written and 'sub_range_pos_' is advanced accordingly. Data is memcpy()'d from
+// 'cache_' when a cached buffer is available, otherwise it is read via 'file_reader_';
+// '*eof' is only written by the file reader. Returns SCANNER_INCOMPLETE_READ if the
+// file reader delivers fewer bytes than requested.
+Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) {
+  buffer_desc->len_ = 0;
+  // 'sub_ranges_' is not modified while reading, so its size can be taken once. The
+  // cast keeps the comparison with the signed position index free of sign mismatches.
+  const int64_t num_sub_ranges = static_cast<int64_t>(sub_ranges_.size());
+  while (buffer_desc->len() < buffer_desc->buffer_len() &&
+      sub_range_pos_.index < num_sub_ranges) {
+    const SubRange& sub_range = sub_ranges_[sub_range_pos_.index];
+    const int64_t offset = sub_range.offset + sub_range_pos_.bytes_read;
+    // Read the rest of the current sub-range, or as much of it as still fits.
+    const int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read,
+        buffer_desc->buffer_len() - buffer_desc->len());
+
+    if (cache_.data != nullptr) {
+      // NOTE(review): 'offset' is a file offset (ValidateSubRanges() requires sub-range
+      // offsets to be >= offset_), while ReadFromCache() enqueues 'cache_' as the buffer
+      // of the whole scan range. Confirm that 'cache_.data' is meant to be addressed by
+      // file offset rather than relative to 'offset_'.
+      memcpy(buffer_desc->buffer_ + buffer_desc->len(),
+          cache_.data + offset, bytes_to_read);
+    } else {
+      int64_t current_bytes_read = 0;
+      Status read_status = file_reader_->ReadFromPos(offset,
+          buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
+          eof);
+      if (!read_status.ok()) return read_status;
+      if (current_bytes_read != bytes_to_read) {
+        // A short read inside a sub-range is reported as an error.
+        DCHECK(*eof);
+        DCHECK_LT(current_bytes_read, bytes_to_read);
+        return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read,
+            current_bytes_read, file(), offset);
+      }
+    }
+
+    buffer_desc->len_ += bytes_to_read;
+    sub_range_pos_.bytes_read += bytes_to_read;
+    if (sub_range_pos_.bytes_read == sub_range.length) {
+      // Current sub-range is fully consumed, continue with the next one.
+      sub_range_pos_.index += 1;
+      sub_range_pos_.bytes_read = 0;
+    }
+  }
+  return Status::OK();
+}
+
void ScanRange::SetBlockedOnBuffer() {
unique_lock<mutex> lock(lock_);
blocked_on_buffer_ = true;
@@ -339,9 +382,10 @@ string ScanRange::DebugString() const {
}
bool ScanRange::Validate() {
- if (BytesRead() > len_) {
- LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
- << " bytes_read_=" << BytesRead() << " len_=" << len_;
+ if (bytes_read_ > bytes_to_read_) {
+ LOG(ERROR) << "Bytes read tracking is wrong. Too many bytes have been read."
+ << " bytes_read_=" << bytes_read_
+ << " bytes_to_read_=" << bytes_to_read_;
return false;
}
if (!cancel_status_.ok() && !ready_buffers_.empty()) {
@@ -377,8 +421,6 @@ ScanRange::ScanRange()
external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
ScanRange::~ScanRange() {
- DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
- << "Cached buffer was not released.";
DCHECK(!read_in_flight_);
DCHECK_EQ(0, ready_buffers_.size());
DCHECK_EQ(0, num_buffers_in_reader_.Load());
@@ -386,6 +428,12 @@ ScanRange::~ScanRange() {
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
+ Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, {}, meta_data);
+}
+
+void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
+ int disk_id, bool expected_local, const BufferOpts& buffer_opts,
+ vector<SubRange>&& sub_ranges, void* meta_data) {
DCHECK(ready_buffers_.empty());
DCHECK(!read_in_flight_);
DCHECK(file != nullptr);
@@ -401,6 +449,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
}
file_ = file;
len_ = len;
+ bytes_to_read_ = len;
offset_ = offset;
disk_id_ = disk_id;
try_cache_ = buffer_opts.try_cache_;
@@ -416,6 +465,59 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
expected_local_ = expected_local;
io_mgr_ = nullptr;
reader_ = nullptr;
+ sub_ranges_.clear();
+ sub_range_pos_ = {};
+ InitSubRanges(move(sub_ranges));
+}
+
+// Takes ownership of 'sub_ranges', merges contiguous entries and resets the sub-range
+// read position. If any sub-ranges remain, 'bytes_to_read_' is set to the sum of their
+// lengths; otherwise 'bytes_to_read_' is left as it was.
+void ScanRange::InitSubRanges(vector<SubRange>&& sub_ranges) {
+  sub_ranges_ = std::move(sub_ranges);
+  DCHECK(ValidateSubRanges());
+  MergeSubRanges();
+  DCHECK(ValidateSubRanges());
+  sub_range_pos_ = {};
+
+  if (sub_ranges_.empty()) return;
+
+  // Accumulate in 64 bits: the lengths are int64_t and their sum can exceed the range
+  // of a 32-bit int.
+  int64_t length_sum = 0;
+  for (const SubRange& sub_range : sub_ranges_) {
+    length_sum += sub_range.length;
+  }
+  bytes_to_read_ = length_sum;
+}
+
+// Returns true if every sub-range has a positive length, lies inside
+// [offset_, offset_ + len_) and does not start before the end of its predecessor.
+bool ScanRange::ValidateSubRanges() {
+  const size_t count = sub_ranges_.size();
+  for (size_t idx = 0; idx < count; ++idx) {
+    const SubRange& cur = sub_ranges_[idx];
+    const bool inside_scan_range = cur.length > 0 && cur.offset >= offset_ &&
+        cur.offset + cur.length <= offset_ + len_;
+    if (!inside_scan_range) return false;
+
+    // The last sub-range has no successor to compare against.
+    if (idx + 1 == count) break;
+
+    // Sub-ranges must be ordered by offset and must not overlap.
+    const SubRange& following = sub_ranges_[idx + 1];
+    if (cur.offset + cur.length > following.offset) return false;
+  }
+  return true;
+}
+
+// Merges sub-ranges that directly follow each other, i.e. where a sub-range starts
+// exactly at the end of the previous one. Works in place with a single pass:
+// 'last_kept' is the index of the last merged sub-range written so far.
+void ScanRange::MergeSubRanges() {
+  if (sub_ranges_.empty()) return;
+  size_t last_kept = 0;
+  for (size_t i = 1; i < sub_ranges_.size(); ++i) {
+    SubRange& tail = sub_ranges_[last_kept];
+    const SubRange& candidate = sub_ranges_[i];
+    if (candidate.offset == tail.offset + tail.length) {
+      // 'candidate' continues 'tail': extend 'tail' instead of keeping both.
+      tail.length += candidate.length;
+    } else {
+      // Gap between the two: 'candidate' starts a new merged sub-range.
+      ++last_kept;
+      sub_ranges_[last_kept] = candidate;
+    }
+  }
+  // Drop the leftover entries behind the compacted prefix in one go.
+  sub_ranges_.erase(sub_ranges_.begin() + last_kept + 1, sub_ranges_.end());
}
void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
@@ -427,10 +529,16 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
cancel_status_ = Status::OK();
eosr_queued_ = false;
blocked_on_buffer_ = false;
+ bytes_read_ = 0;
+ sub_range_pos_ = {};
file_reader_->ResetState();
DCHECK(Validate()) << DebugString();
}
+// Replaces the file reader of this scan range with 'file_reader' and takes ownership
+// of it. Declared as test-only in the header.
+void ScanRange::SetFileReader(unique_ptr<FileReader> file_reader) {
+  file_reader_ = std::move(file_reader);
+}
+
int64_t ScanRange::MaxReadChunkSize() const {
// S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
// interface). So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
@@ -454,54 +562,54 @@ Status ScanRange::ReadFromCache(
const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
DCHECK(try_cache_);
- DCHECK_EQ(BytesRead(), 0);
+ DCHECK_EQ(bytes_read_, 0);
*read_succeeded = false;
Status status = file_reader_->Open(false);
if (!status.ok()) return status;
- // Cached reads not supported on local filesystem.
- if (fs_ == nullptr) return Status::OK();
-
// Check cancel status.
{
unique_lock<mutex> lock(lock_);
RETURN_IF_ERROR(cancel_status_);
}
- DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
- void* buffer = file_reader_->CachedFile();
- if (buffer != nullptr) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+ file_reader_->CachedFile(&cache_.data, &cache_.len);
// Data was not cached, caller will fall back to normal read path.
- if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+ if (cache_.data == nullptr) {
VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
<< ". Switching to disk read path.";
// Clean up the scan range state before re-issuing it.
file_reader_->Close();
return Status::OK();
}
- int bytes_read = BytesRead();
// A partial read can happen when files are truncated.
// TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
// between errors and partially cached blocks here.
- if (bytes_read < len()) {
+ if (cache_.len < len()) {
VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
- << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
+ << len() << " bytes, but read " << cache_.len << ". Switching to disk read path.";
// Close the scan range. 'read_succeeded' is still false, so the caller will fall back
// to non-cached read of this scan range.
file_reader_->Close();
return Status::OK();
}
+ *read_succeeded = true;
+ // If there are sub-ranges, then we need to memcpy() them from the cached buffer.
+ if (HasSubRanges()) return Status::OK();
+
+ DCHECK(external_buffer_tag_ != ExternalBufferTag::CLIENT_BUFFER);
+ external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+ bytes_read_ = cache_.len;
+
// Create a single buffer desc for the entire scan range and enqueue that.
// The memory is owned by the HDFS java client, not the Impala backend.
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
- this, reinterpret_cast<uint8_t*>(buffer), 0));
- desc->len_ = bytes_read;
- desc->scan_range_offset_ = 0;
+ this, cache_.data, 0));
+ desc->len_ = cache_.len;
desc->eosr_ = true;
EnqueueReadyBuffer(move(desc));
- COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
- *read_succeeded = true;
+ COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, cache_.len);
return Status::OK();
}