You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/09 02:42:23 UTC

[2/2] incubator-impala git commit: IMPALA-3780: avoid many small reads past end of block

IMPALA-3780: avoid many small reads past end of block

The text scanner had some pathological behaviour when reading
significantly past the end of its scan range. E.g. reading a 256mb string
that's split across blocks. ScannerContext defaulted to issuing 1kb
reads, even if the scan node requested significantly more data. E.g. if
the Parquet scanner called ReadBytes(16mb), this was chopped up into
1kb reads, which were reassembled in boundary_buffer_.

Increase the minimum read size in this case to 64kb. Reading that amount
of data should not have any significant overhead even if we only read
a few bytes past the end of the scan range.

ScannerContext implements a saner default algorithm that will work better
if scanners make many small reads: it starts with 64kb reads and doubles
the size of each successive read past the end of the scan range. We
also correctly pass the 'read_past_size' into GetNextBuffer(), so that
we always read the right amount of data.

Also save some time by pre-sizing the boundary buffer to the correct
size instead of reallocating it multiple times.

Testing:
Add test case that exercises the code paths for very large strings.

Performance:
The queries in the test case are vastly faster than before. E.g. 0.6s
vs ~60s for the count(*) query.

Change-Id: Id90c5dea44f07dba5dd465cf325fbff28be34137
Reviewed-on: http://gerrit.cloudera.org:8080/3518
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3810b7c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3810b7c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3810b7c4

Branch: refs/heads/master
Commit: 3810b7c413f575ad5970d3fb5c53509d81cb0032
Parents: ed5ec67
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jun 24 16:29:03 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Jul 8 19:42:18 2016 -0700

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc | 15 ++++-------
 be/src/exec/hdfs-text-scanner.h      |  2 +-
 be/src/exec/scanner-context.cc       | 43 +++++++++++++++++++------------
 be/src/exec/scanner-context.h        | 39 ++++++++++++++++++----------
 be/src/runtime/string-buffer.h       |  3 ++-
 tests/query_test/test_insert.py      | 29 +++++++++++++++++++++
 6 files changed, 89 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index d235b77..dc7a983 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -33,8 +33,7 @@ const int BaseSequenceScanner::SYNC_MARKER = -1;
 
 // Constants used in ReadPastSize()
 static const double BLOCK_SIZE_PADDING_PERCENT = 0.1;
-static const int REMAINING_BLOCK_SIZE_GUESS = 100 * 1024; // bytes
-static const int MIN_SYNC_READ_SIZE = 10 * 1024; // bytes
+static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
 
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
@@ -301,11 +300,9 @@ void BaseSequenceScanner::CloseFileRanges(const char* filename) {
 
 int BaseSequenceScanner::ReadPastSize(int64_t file_offset) {
   DCHECK_GE(total_block_size_, 0);
-  if (total_block_size_ == 0) {
-    // This scan range didn't include a complete block, so we have no idea how many bytes
-    // remain in the block. Guess.
-    return REMAINING_BLOCK_SIZE_GUESS;
-  }
+  // This scan range didn't include a complete block, so we have no idea how many bytes
+  // remain in the block. Let ScannerContext use its default strategy.
+  if (total_block_size_ == 0) return 0;
   DCHECK_GE(num_syncs_, 2);
   int average_block_size = total_block_size_ / (num_syncs_ - 1);
 
@@ -315,7 +312,5 @@ int BaseSequenceScanner::ReadPastSize(int64_t file_offset) {
   int bytes_left = max(average_block_size - block_bytes_read, 0);
   // Include some padding
   bytes_left += average_block_size * BLOCK_SIZE_PADDING_PERCENT;
-
-  int max_read_size = state_->io_mgr()->max_read_buffer_size();
-  return min(max(bytes_left, MIN_SYNC_READ_SIZE), max_read_size);
+  return max(bytes_left, MIN_SYNC_READ_SIZE);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index dae104d..e8dbe08 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -83,7 +83,7 @@ class HdfsTextScanner : public HdfsScanner {
   bool only_parsing_header_;
 
  private:
-  const static int NEXT_BLOCK_READ_SIZE = 1024; //bytes
+  const static int NEXT_BLOCK_READ_SIZE = 64 * 1024; //bytes
 
   /// Initializes this scanner for this context.  The context maps to a single
   /// scan range.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 6a5081d..f7f65be 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -29,7 +29,7 @@
 using namespace impala;
 using namespace strings;
 
-static const int64_t DEFAULT_READ_PAST_SIZE = 1024; // in bytes
+static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 
 // We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL check
 // in GetBytes(). We use this variable, which is set to 0, to initialize
@@ -57,6 +57,7 @@ void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
 
 ScannerContext::Stream::Stream(ScannerContext* parent)
   : parent_(parent),
+    next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
     boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
@@ -146,10 +147,19 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   } else {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
-    int64_t read_past_buffer_size = read_past_size_cb_.empty() ?
-        DEFAULT_READ_PAST_SIZE : read_past_size_cb_(offset);
+    int64_t read_past_buffer_size = 0;
+    int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
+    if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
+    if (read_past_buffer_size <= 0) {
+      // Either no callback was set or the callback did not return an estimate. Use
+      // the default doubling strategy.
+      read_past_buffer_size = next_read_past_size_bytes_;
+      next_read_past_size_bytes_ =
+          min<int64_t>(next_read_past_size_bytes_ * 2, max_buffer_size);
+    }
     read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
     read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
+    read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
     // We're reading past the scan range. Be careful not to read past the end of file.
     DCHECK_GE(read_past_buffer_size, 0);
     if (read_past_buffer_size == 0) {
@@ -235,25 +245,24 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     }
   }
 
-  // Resize the buffer to the right size.
-  RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
-
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
-    // We need to fetch more bytes. Copy the end of the current buffer and fetch the next
-    // one.
-    RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
-    boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
+    // We must copy the remainder of 'io_buffer_' to 'boundary_buffer_' before advancing
+    // to handle the case when the read straddles a block boundary. Preallocate
+    // 'boundary_buffer_' to avoid unnecessary resizes for large reads.
+    if (io_buffer_bytes_left_ > 0) {
+      RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
+      RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
+      boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
+    }
 
-    RETURN_IF_ERROR(GetNextBuffer());
+    int64_t remaining_requested_len = requested_len - boundary_buffer_->len();
+    RETURN_IF_ERROR(GetNextBuffer(remaining_requested_len));
     if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
-
-    if (io_buffer_bytes_left_ == 0) {
-      // No more bytes (i.e. EOF)
-      break;
-    }
+    // No more bytes (i.e. EOF).
+    if (io_buffer_bytes_left_ == 0) break;
   }
 
-  // We have enough bytes in io_buffer_ or couldn't read more bytes
+  // We have read the full 'requested_len' bytes or couldn't read more bytes.
   int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
   DCHECK_GE(requested_bytes_left, 0);
   int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 5d6ea06..2ab35e6 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -97,11 +97,16 @@ class ScannerContext {
     void set_contains_tuple_data(bool v) { contains_tuple_data_ = v; }
 
     /// Callback that returns the buffer size to use when reading past the end of the scan
-    /// range.  By default a constant value is used, which scanners can override with this
-    /// callback.  The callback takes the file offset of the asynchronous read (this may be
-    /// more than file_offset() due to data being assembled in the boundary buffer).
-    /// Reading past the end of the scan range is likely a remote read, so we want to
-    /// minimize the number of io requests as well as the data volume.
+    /// range. Reading past the end of the scan range is likely a remote read, so we want
+    /// to find a good trade-off between io requests and data volume. Scanners that have
+    /// some information about the optimal read size can provide this callback to
+    /// override the default read-size doubling strategy (see GetNextBuffer()). If the
+    /// callback returns a positive length, this overrides the default strategy. If the
+    /// callback returns a length greater than the max read size, the max read size will
+    /// be used.
+    ///
+    /// The callback takes the file offset of the asynchronous read (this may be more
+    /// than file_offset() due to data being assembled in the boundary buffer).
     typedef boost::function<int (int64_t)> ReadPastSizeCallback;
     void set_read_past_size_cb(ReadPastSizeCallback cb) { read_past_size_cb_ = cb; }
 
@@ -176,8 +181,14 @@ class ScannerContext {
     /// earlier, i.e. the file was truncated.
     int64_t file_len_;
 
+    /// Callback if a scanner wants to implement custom logic for guessing how far to
+    /// read past the end of the scan range.
     ReadPastSizeCallback read_past_size_cb_;
 
+    /// The next amount we should read past the end of the scan range, if using the
+    /// default doubling algorithm (i.e. 'read_past_size_cb_' is unset or returns <= 0).
+    int64_t next_read_past_size_bytes_;
+
     /// The current io buffer. This starts as NULL before we've read any bytes.
     DiskIoMgr::BufferDescriptor* io_buffer_;
 
@@ -223,14 +234,16 @@ class ScannerContext {
     /// Gets (and blocks) for the next io buffer. After fetching all buffers in the scan
     /// range, performs synchronous reads past the scan range until EOF.
     //
-    /// When performing a synchronous read, the read size is the max of read_past_size and
-    /// the result returned by read_past_size_cb_() (or DEFAULT_READ_PAST_SIZE if no
-    /// callback is set). read_past_size is not used otherwise.
-    //
-    /// Updates io_buffer_, io_buffer_bytes_left_, and io_buffer_pos_.  If GetNextBuffer()
-    /// is called after all bytes in the file have been returned, io_buffer_bytes_left_
-    /// will be set to 0. In the non-error case, io_buffer_ is never set to NULL, even if
-    /// it contains 0 bytes.
+    /// When performing a synchronous read, the read size is the max of 'read_past_size'
+    /// and either the result of read_past_size_cb_(), or the result of iteratively
+    /// doubling INIT_READ_PAST_SIZE_BYTES up to the max read size. 'read_past_size' is not
+    /// used otherwise. This is done to find a balance between reading too much data
+    /// and issuing too many small reads.
+    ///
+    /// Updates 'io_buffer_', 'io_buffer_bytes_left_', and 'io_buffer_pos_'.  If
+    /// GetNextBuffer() is called after all bytes in the file have been returned,
+    /// 'io_buffer_bytes_left_' will be set to 0. In the non-error case, 'io_buffer_' is
+    /// never set to NULL, even if it contains 0 bytes.
     Status GetNextBuffer(int64_t read_past_size = 0);
 
     /// If 'batch' is not NULL, attaches all completed io buffers and the boundary mem

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 3725181..adddb92 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -46,9 +46,10 @@ class StringBuffer {
   }
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
+  /// Return error status if memory limit is exceeded.
   Status Append(const char* str, int64_t str_len) {
     int64_t new_len = len_ + str_len;
-    if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
+    if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
     memcpy(buffer_ + len_, str, str_len);
     len_ += str_len;
     return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3810b7c4/tests/query_test/test_insert.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 6929fca..b359553 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -49,6 +49,35 @@ class TestInsertQueries(ImpalaTestSuite):
       cls.TestMatrix.add_constraint(lambda v:\
           v.get_value('table_format').compression_codec == 'none')
 
+  def test_insert_large_string(self, vector, unique_database):
+    """Test handling of large strings in inserter and scanner."""
+    table_format = vector.get_value('table_format')
+    table_name = unique_database + ".insert_largestring"
+
+    file_format = vector.get_value('table_format').file_format
+    if file_format == "parquet":
+      stored_as = file_format
+    else:
+      assert file_format == "text"
+      stored_as = "textfile"
+    self.client.execute("""
+        create table {0}
+        stored as {1} as
+        select repeat('AZ', 128 * 1024 * 1024) as s""".format(table_name, stored_as))
+
+    # Make sure it produces correct result when materializing no tuples.
+    result = self.client.execute("select count(*) from {0}".format(table_name))
+    assert result.data == ["1"]
+
+    # Make sure it got the length right.
+    result = self.client.execute("select length(s) from {0}".format(table_name))
+    assert result.data == [str(2 * 128 * 1024 * 1024)]
+
+    # Spot-check the data.
+    result = self.client.execute(
+        "select substr(s, 200 * 1024 * 1024, 5) from {0}".format(table_name))
+    assert result.data == ["ZAZAZ"]
+
   @classmethod
   def setup_class(cls):
     super(TestInsertQueries, cls).setup_class()