You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:16 UTC

[05/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
deleted file mode 100644
index 41d63bf..0000000
--- a/be/src/runtime/buffered-tuple-stream.h
+++ /dev/null
@@ -1,561 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
-#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
-
-#include <vector>
-#include <set>
-
-#include "common/status.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/row-batch.h"
-
-namespace impala {
-
-class BufferedBlockMgr;
-class RuntimeProfile;
-class RuntimeState;
-class RowDescriptor;
-class SlotDescriptor;
-class TupleRow;
-
-/// Class that provides an abstraction for a stream of tuple rows. Rows can be
-/// added to the stream and returned. Rows are returned in the order they are added.
-///
-/// The underlying memory management is done by the BufferedBlockMgr.
-///
-/// The tuple stream consists of a number of small (less than IO-sized) blocks before
-/// an arbitrary number of IO-sized blocks. The smaller blocks do not spill and are
-/// there to lower the minimum buffering requirements. For example, an operator that
-/// needs to maintain 64 streams (1 buffer per partition) would need, by default,
-/// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require
-/// 2.56GB just to run, regardless of how much of that is used. This is
-/// problematic for small queries. Instead we will start with a fixed number of small
-/// buffers (currently 2 small buffers: one 64KB and one 512KB) and only start using IO
-/// sized buffers when those fill up. The small buffers never spill.
-/// The stream will *not* automatically switch from using small buffers to IO-sized
-/// buffers when all the small buffers for this stream have been used.
-///
-/// The BufferedTupleStream is *not* thread safe from the caller's point of view. It is
-/// expected that all the APIs are called from a single thread. Internally, the
-/// object is thread safe with respect to the underlying block mgr.
-///
-/// Buffer management:
-/// The stream is either pinned or unpinned, set via PinStream() and UnpinStream().
-/// Blocks are optionally deleted as they are read, set with the delete_on_read argument
-/// to PrepareForRead().
-///
-/// Block layout:
-/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a
-/// bitstring at the start of each block with null indicators for all tuples in each row
-/// in the block. The length of the bitstring is a function of the block size. Row data
-/// is stored after the null indicators if present, or at the start of the block
-/// otherwise. Rows are stored back to back in the stream, with no interleaving of data
-/// from different rows. There is no padding or alignment between rows.
-///
-/// Null tuples:
-/// The order of bits in the null indicators bitstring corresponds to the order of
-/// tuples in the block. The NULL tuples are not stored in the row itself, only as set
-/// bits in the null indicators bitstring.
-///
-/// Tuple row layout:
-/// The fixed length parts of the row's tuples are stored first, followed by var len data
-/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can
-/// point to var len data outside the stream. When reading the stream, the length of each
-/// row's var len data in the stream must be computed to find the next row's start.
-///
-/// The tuple stream supports reading from the stream into RowBatches without copying
-/// out any data: the RowBatches' Tuple pointers will point directly into the stream's
-/// blocks. The fixed length parts follow Impala's internal tuple format, so for the
-/// tuple to be valid, we only need to update pointers to point to the var len data
-/// in the stream. These pointers need to be updated by the stream because a spilled
-/// block may be relocated to a different location in memory. The pointers are updated
-/// lazily upon reading the stream via GetNext() or GetRows().
-///
-/// Example layout for a row with two tuples ((1, "hello"), (2, "world")) with all var
-/// len data stored in the stream:
-///  <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ...
-/// +--------+-----------+-----------+-----------+-------------+
-/// | IntVal | StringVal | BigIntVal | StringVal |             | ...
-/// +--------+-----------+-----------+-----------++------------+
-/// | val: 1 | len: 5    | val: 2    | len: 5    | helloworld  | ...
-/// |        | ptr: 0x.. |           | ptr: 0x.. |             | ...
-/// +--------+-----------+-----------+-----------+-------------+
-///  <--4b--> <---12b---> <----8b---> <---12b---> <----10b---->
-///
-/// Example layout for a row with a single tuple (("hello", "world")) with the second
-/// string slot stored externally to the stream:
-///  <------ tuple 1 ------> <- var len ->  <- next row ...
-/// +-----------+-----------+-------------+
-/// | StringVal | StringVal |             | ...
-/// +-----------+-----------+-------------+
-/// | len: 5    | len: 5    |  hello      | ...
-/// | ptr: 0x.. | ptr: 0x.. |             | ...
-/// +-----------+-----------+-------------+
-///  <---12b---> <---12b---> <-----5b---->
-///
-/// The behavior of reads and writes is as follows:
-/// Read:
-///   1. Delete on read (delete_on_read_): Blocks are deleted as we go through the stream.
-///   The data returned by the tuple stream is valid until the next read call so the
-///   caller does not need to copy if it is streaming.
-///   2. Unpinned: Blocks remain in blocks_ and are unpinned after reading.
-///   3. Pinned: Blocks remain in blocks_ and are left pinned after reading. If the next
-///   block in the stream cannot be pinned, the read call will fail and the caller needs
-///   to free memory from the underlying block mgr.
-/// Write:
-///   1. Unpinned: Unpin blocks as they fill up. This means only a single (i.e. the
-///   current) block needs to be in memory regardless of the input size (if read_write is
-///   true, then two blocks need to be in memory).
-///   2. Pinned: Blocks are left pinned. If we run out of blocks, the write will fail and
-///   the caller needs to free memory from the underlying block mgr.
-///
-/// Memory lifetime of rows read from stream:
-/// If the stream is pinned, it is valid to access any tuples returned via
-/// GetNext() or GetRows() until the stream is unpinned. If the stream is unpinned, and
-/// the batch returned from GetNext() has the needs_deep_copy flag set, any tuple memory
-/// returned so far from the stream may be freed on the next call to GetNext().
-///
-/// Manual construction of rows with AllocateRow():
-/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow().
-/// The caller of AllocateRow() is responsible for writing the row with exactly the
-/// layout described above.
-///
-/// If a caller constructs a tuple in this way, the caller can set the pointers and they
-/// will not be modified until the stream is read via GetNext() or GetRows().
-///
-/// TODO: we need to be able to do read ahead in the BufferedBlockMgr. It currently
-/// only has PinAllBlocks() which is blocking. We need a non-blocking version of this or
-/// some way to indicate a block will need to be pinned soon.
-/// TODO: see if this can be merged with Sorter::Run. The key difference is that this
-/// does not need to return rows in the order they were added, which allows it to be
-/// simpler.
-/// TODO: we could compact the small buffers when we need to spill but they use very
-/// little memory so this might not be very useful.
-/// TODO: improvements:
-///   - It would be good to allocate the null indicators at the end of each block and grow
-///     this array as new rows are inserted in the block. If we do so, then there will be
-///     fewer gaps in case of many rows with NULL tuples.
-///   - We will want to multithread this. Add an AddBlock() call so the synchronization
-///     happens at the block level. This is a natural extension.
-///   - Instead of allocating all blocks from the block_mgr, allocate some blocks that
-///     are much smaller (e.g. 16K and doubling up to the block size). This way, very
-///     small streams (a common case) will use very little memory. These small blocks
-///     are always in memory since spilling them frees up negligible memory.
-///   - Return row batches in GetNext() instead of filling one in
-class BufferedTupleStream {
- public:
-  /// Ordinal index into the stream to retrieve a row in O(1) time. This index can
-  /// only be used if the stream is pinned.
-  /// To read a row from a stream we need three pieces of information that we squeeze in
-  /// 64 bits:
-  ///  - The index of the block. The block id is stored in 16 bits. We can have up to
-  ///    64K blocks per tuple stream. With 8MB blocks that is 512GB per stream.
-  ///  - The offset of the start of the row (data) within the block. Since blocks are 8MB
-  ///    we use 24 bits for the offsets. (In theory we could use 23 bits.)
-  ///  - The idx of the row in the block. We need this for retrieving the null indicators.
-  ///    We use 24 bits for this index as well.
-  struct RowIdx {
-    static const uint64_t BLOCK_MASK  = 0xFFFF;
-    static const uint64_t BLOCK_SHIFT = 0;
-    static const uint64_t OFFSET_MASK  = 0xFFFFFF0000;
-    static const uint64_t OFFSET_SHIFT = 16;
-    static const uint64_t IDX_MASK  = 0xFFFFFF0000000000;
-    static const uint64_t IDX_SHIFT = 40;
-
-    uint64_t block() const {
-      return (data & BLOCK_MASK);
-    }
-
-    uint64_t offset() const {
-      return (data & OFFSET_MASK) >> OFFSET_SHIFT;
-    }
-
-    uint64_t idx() const {
-      return (data & IDX_MASK) >> IDX_SHIFT;
-    }
-
-    uint64_t set(uint64_t block, uint64_t offset, uint64_t idx) {
-      DCHECK_LE(block, BLOCK_MASK)
-          << "Cannot have more than 2^16 = 64K blocks in a tuple stream.";
-      DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT)
-          << "Cannot have blocks larger than 2^24 = 16MB";
-      DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT)
-          << "Cannot have more than 2^24 = 16M rows in a block.";
-      data = block | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT);
-      return data;
-    }
-
-    std::string DebugString() const;
-
-    uint64_t data;
-  };
-
-  /// row_desc: description of rows stored in the stream. This is the desc for rows
-  /// that are added and the rows being returned.
-  /// block_mgr: Underlying block mgr that owns the data blocks.
-  /// use_initial_small_buffers: If true, the initial N buffers allocated for the
-  /// tuple stream use smaller than IO-sized buffers.
-  /// read_write: Stream allows interchanging read and write operations. Requires that
-  /// at least two blocks can be pinned.
-  /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
-  BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc,
-      BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
-      bool use_initial_small_buffers, bool read_write,
-      const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
-
-  ~BufferedTupleStream();
-
-  /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
-  /// once before any of the other APIs.
-  /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
-  /// If 'profile' is non-NULL, counters are created.
-  /// 'node_id' is only used for error reporting.
-  Status Init(int node_id, RuntimeProfile* profile, bool pinned);
-
-  /// Prepares the stream for writing by attempting to allocate a write block.
-  /// Called after Init() and before the first AddRow() call.
-  /// 'got_buffer': set to true if the first write block was successfully pinned, or
-  ///     false if the block could not be pinned and no error was encountered. Undefined
-  ///     if an error status is returned.
-  Status PrepareForWrite(bool* got_buffer);
-
-  /// Must be called for streams using small buffers to switch to IO-sized buffers.
-  /// If it fails to get a buffer (i.e. the switch fails) it resets use_small_buffers_
-  /// back to true, so the stream keeps using small buffers.
-  /// TODO: IMPALA-3200: remove this when small buffers are removed.
-  Status SwitchToIoBuffers(bool* got_buffer);
-
-  /// Adds a single row to the stream. Returns true if the append succeeded, returns false
-  /// and sets 'status' to OK if appending failed but can be retried or returns false and
-  /// sets 'status' to an error if an error occurred.
-  /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
-  /// returns an error, it should not be called again. If appending failed without an
-  /// error and the stream is using small buffers, it is valid to call
-  /// SwitchToIoBuffers() then AddRow() again.
-  bool AddRow(TupleRow* row, Status* status) noexcept;
-
-  /// Allocates space to store a row with fixed length 'fixed_size' and variable
-  /// length data 'varlen_size'. If successful, returns the pointer where fixed length
-  /// data should be stored and assigns 'varlen_data' to where var-len data should
-  /// be stored. Returns NULL if there is not enough memory or an error occurred.
-  /// Sets *status if an error occurred. The returned memory is guaranteed to all
-  /// be allocated in the same block. AllocateRow does not currently support nullable
-  /// tuples.
-  uint8_t* AllocateRow(int fixed_size, int varlen_size, uint8_t** varlen_data,
-      Status* status);
-
-  /// Populates 'row' with the row at 'idx'. The stream must be pinned. The row must have
-  /// been allocated with the stream's row desc.
-  void GetTupleRow(const RowIdx& idx, TupleRow* row) const;
-
-  /// Prepares the stream for reading. If read_write_, this can be called at any time to
-  /// begin reading. Otherwise this must be called after the last AddRow() and
-  /// before GetNext().
-  /// delete_on_read: Blocks are deleted after they are read.
-  /// got_buffer: set to true if the first read block was successfully pinned, or
-  ///     false if the block could not be pinned and no error was encountered.
-  Status PrepareForRead(bool delete_on_read, bool* got_buffer);
-
-  /// Pins all blocks in this stream and switches to pinned mode.
-  /// If there is not enough memory, *pinned is set to false and the stream is unmodified.
-  /// If already_reserved is true, the caller has already made a reservation on
-  /// block_mgr_client_ to pin the stream.
-  Status PinStream(bool already_reserved, bool* pinned);
-
-  /// Modes for UnpinStream().
-  enum UnpinMode {
-    /// All blocks in the stream are unpinned and the read/write positions in the stream
-    /// are reset. No more rows can be written to the stream after this. The stream can
-    /// be re-read from the beginning by calling PrepareForRead().
-    UNPIN_ALL,
-    /// All blocks are unpinned aside from the current read and write blocks (if any),
-    /// which are left in the same state. The unpinned stream can continue being read
-    /// or written from the current read or write positions.
-    UNPIN_ALL_EXCEPT_CURRENT,
-  };
-
-  /// Unpins stream with the given 'mode' as described above.
-  Status UnpinStream(UnpinMode mode);
-
-  /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream
-  /// and must be copied out by the caller.
-  Status GetNext(RowBatch* batch, bool* eos);
-
-  /// Same as above, but also populate 'indices' with the index of each returned row.
-  Status GetNext(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
-
-  /// Returns all the rows in the stream in batch. This pins the entire stream in the
-  /// process.
-  /// *got_rows is false if the stream could not be pinned.
-  Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows);
-
-  /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
-  /// attaches any pinned blocks to the batch and deletes unpinned blocks. Otherwise
-  /// deletes all blocks. Does nothing if the stream was already closed. The 'flush'
-  /// mode is forwarded to RowBatch::AddBlock() when attaching blocks.
-  void Close(RowBatch* batch, RowBatch::FlushMode flush);
-
-  /// Number of rows in the stream.
-  int64_t num_rows() const { return num_rows_; }
-
-  /// Number of rows returned via GetNext().
-  int64_t rows_returned() const { return rows_returned_; }
-
-  /// Returns the byte size necessary to store the entire stream in memory.
-  int64_t byte_size() const { return total_byte_size_; }
-
-  /// Returns the byte size of the stream that is currently pinned in memory.
-  /// If ignore_current is true, the write_block_ memory is not included.
-  int64_t bytes_in_mem(bool ignore_current) const;
-
-  bool is_closed() const { return closed_; }
-  bool is_pinned() const { return pinned_; }
-  int blocks_pinned() const { return num_pinned_; }
-  int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; }
-  bool has_read_block() const { return read_block_ != blocks_.end(); }
-  bool has_write_block() const { return write_block_ != NULL; }
-  bool using_small_buffers() const { return use_small_buffers_; }
-
-  /// Returns true if the row consumes any memory. If false, the stream only needs to
-  /// store the count of rows.
-  bool RowConsumesMemory() const {
-    return fixed_tuple_row_size_ > 0 || has_nullable_tuple_;
-  }
-
-  std::string DebugString() const;
-
- private:
-  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
-  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
-  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
-  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
-
-  /// Runtime state instance used to check for cancellation. Not owned.
-  RuntimeState* const state_;
-
-  /// Description of rows stored in the stream.
-  const RowDescriptor* desc_;
-
-  /// Sum of the fixed length portion of all the tuples in desc_.
-  int fixed_tuple_row_size_;
-
-  /// The size of the fixed length portion for each tuple in the row.
-  std::vector<int> fixed_tuple_sizes_;
-
-  /// Max size (in bytes) of null indicators bitmap in the current read and write
-  /// blocks. If 0, it means that there is no need to store null indicators for this
-  /// RowDesc. We calculate this value based on the block's size and the
-  /// fixed_tuple_row_size_. When not 0, this value is also an upper bound for the number
-  /// of (rows * tuples_per_row) in this block.
-  int read_block_null_indicators_size_;
-  int write_block_null_indicators_size_;
-
-  /// Size (in bytes) of the null indicators bitmap reserved in a block of maximum
-  /// size (i.e. IO block size). 0 if no tuple is nullable.
-  int max_null_indicators_size_;
-
-  /// Vectors of all the strings slots that have their varlen data stored in stream
-  /// grouped by tuple_idx.
-  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_;
-
-  /// Vectors of all the collection slots that have their varlen data stored in the
-  /// stream, grouped by tuple_idx.
-  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_;
-
-  /// Block manager and client used to allocate, pin and release blocks. Not owned.
-  BufferedBlockMgr* block_mgr_;
-  BufferedBlockMgr::Client* block_mgr_client_;
-
-  /// List of blocks in the stream.
-  std::list<BufferedBlockMgr::Block*> blocks_;
-
-  /// Total size of blocks_, including small blocks.
-  int64_t total_byte_size_;
-
-  /// Iterator pointing to the current block for read. Equal to blocks_.end() until
-  /// PrepareForRead() is called.
-  std::list<BufferedBlockMgr::Block*>::iterator read_block_;
-
-  /// For each block in the stream, the buffer of the start of the block. This is only
-  /// valid when the stream is pinned, giving random access to data in the stream.
-  /// This is not maintained for delete_on_read_.
-  std::vector<uint8_t*> block_start_idx_;
-
-  /// Current idx of the tuple read from the read_block_ buffer.
-  uint32_t read_tuple_idx_;
-
-  /// Current offset in read_block_ of the end of the last data read.
-  uint8_t* read_ptr_;
-
-  /// Pointer to one byte past the end of read_block_.
-  uint8_t* read_end_ptr_;
-
-  /// Current idx of the tuple written at the write_block_ buffer.
-  uint32_t write_tuple_idx_;
-
-  /// Pointer into write_block_ of the end of the last data written.
-  uint8_t*  write_ptr_;
-
-  /// Pointer to one byte past the end of write_block_.
-  uint8_t* write_end_ptr_;
-
-  /// Number of rows returned to the caller from GetNext().
-  int64_t rows_returned_;
-
-  /// The block index of the current read block in blocks_.
-  int read_block_idx_;
-
-  /// The current block for writing. NULL if there is no available block to write to.
-  /// The entire write_block_ buffer is marked as allocated, so any data written into
-  /// the buffer will be spilled without having to allocate additional space.
-  BufferedBlockMgr::Block* write_block_;
-
-  /// Number of pinned blocks in blocks_, stored to avoid iterating over the list
-  /// to compute bytes_in_mem and bytes_unpinned.
-  /// This does not include small blocks.
-  int num_pinned_;
-
-  /// The total number of small blocks in blocks_;
-  int num_small_blocks_;
-
-  /// Number of rows stored in the stream.
-  int64_t num_rows_;
-
-  /// Counters added by this object to the parent runtime profile.
-  RuntimeProfile::Counter* pin_timer_;
-  RuntimeProfile::Counter* unpin_timer_;
-  RuntimeProfile::Counter* get_new_block_timer_;
-
-  /// If true, read and write operations may be interleaved. Otherwise all calls
-  /// to AddRow() must occur before calling PrepareForRead() and subsequent calls to
-  /// GetNext().
-  const bool read_write_;
-
-  /// Whether any tuple in the rows is nullable.
-  const bool has_nullable_tuple_;
-
-  /// If true, this stream is still using small buffers.
-  bool use_small_buffers_;
-
-  /// If true, blocks are deleted after they are read.
-  bool delete_on_read_;
-
-  bool closed_; // Used for debugging.
-
-  /// If true, this stream has been explicitly pinned by the caller. This changes the
-  /// memory management of the stream. The blocks are not unpinned until the caller calls
-  /// UnpinStream(). If false, only the write_block_ and/or read_block_ are pinned
-  /// (both are if read_write_ is true).
-  bool pinned_;
-
-  /// The slow path for AddRow() that is called if there is not sufficient space in
-  /// the current block.
-  bool AddRowSlow(TupleRow* row, Status* status) noexcept;
-
-  /// Copies 'row' into write_block_. Returns false if there is not enough space in
-  /// 'write_block_'. After returning false, write_ptr_ may be left pointing to the
-  /// partially-written row, and no more data can be written to write_block_.
-  template <bool HAS_NULLABLE_TUPLE>
-  bool DeepCopyInternal(TupleRow* row) noexcept;
-
-  /// Helper function to copy strings in string_slots from tuple into write_block_.
-  /// Updates write_ptr_ to the end of the string data added. Returns false if the data
-  /// does not fit in the current write block. After returning false, write_ptr_ is left
-  /// pointing to the partially-written row, and no more data can be written to
-  /// write_block_.
-  bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots);
-
-  /// Helper function to deep copy collections in collection_slots from tuple into
-  /// write_block_. Updates write_ptr_ to the end of the collection data added. Returns
-  /// false if the data does not fit in the current write block. After returning false,
-  /// write_ptr_ is left pointing to the partially-written row, and no more data can be
-  /// written to write_block_.
-  bool CopyCollections(const Tuple* tuple,
-      const std::vector<SlotDescriptor*>& collection_slots);
-
-  /// Wrapper of the templated DeepCopyInternal() function.
-  bool DeepCopy(TupleRow* row) noexcept;
-
-  /// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_,
-  /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the
-  /// number of bytes that will be reserved in the block for the null indicators bitmap.
-  /// *got_block is set to true if a block was successfully acquired. Null indicators
-  /// (if any) will also be reserved and initialized. If there are no blocks available,
-  /// *got_block is set to false and write_block_ is unchanged.
-  Status NewWriteBlock(
-      int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept;
-
-  /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be
-  /// appended to this block. This function determines the block size required in order
-  /// to fit the row and null indicators.
-  Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept;
-
-  /// Reads the next block from the block_mgr_. This blocks if necessary.
-  /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_.
-  Status NextReadBlock();
-
-  /// Returns the total additional bytes that this row will consume in write_block_ if
-  /// appended to the block. This includes the fixed length part of the row and the
-  /// data for inlined_string_slots_ and inlined_coll_slots_.
-  int64_t ComputeRowSize(TupleRow* row) const noexcept;
-
-  /// Unpins block if it is an IO-sized block and updates tracking stats.
-  Status UnpinBlock(BufferedBlockMgr::Block* block);
-
-  /// Templated GetNext implementations.
-  template <bool FILL_INDICES>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
-  template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
-
-  /// Helper function for GetNextInternal(). For each string slot in string_slots,
-  /// update StringValue's ptr field to point to the corresponding string data stored
-  /// inline in the stream (at the current value of read_ptr_) advance read_ptr_ by the
-  /// StringValue's length field.
-  void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple);
-
-  /// Helper function for GetNextInternal(). For each collection slot in collection_slots,
-  /// recursively update any pointers in the CollectionValue to point to the corresponding
-  /// var len data stored inline in the stream, advancing read_ptr_ as data is read.
-  /// Assumes that the collection was serialized to the stream in DeepCopy()'s format.
-  void FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots,
-      Tuple* tuple);
-
-  /// Computes the number of bytes needed for null indicators for a block of 'block_size'.
-  /// Return 0 if no tuple is nullable. Return -1 if a single row of fixed-size tuples
-  /// plus its null indicator (if any) cannot fit in the block.
-  int ComputeNumNullIndicatorBytes(int block_size) const;
-
-  uint32_t read_block_bytes_remaining() const {
-    DCHECK_GE(read_end_ptr_, read_ptr_);
-    DCHECK_LE(read_end_ptr_ - read_ptr_, (*read_block_)->buffer_len());
-    return read_end_ptr_ - read_ptr_;
-  }
-
-  uint32_t write_block_bytes_remaining() const {
-    DCHECK_GE(write_end_ptr_, write_ptr_);
-    DCHECK_LE(write_end_ptr_ - write_ptr_, write_block_->buffer_len());
-    return write_end_ptr_ - write_ptr_;
-  }
-
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h
deleted file mode 100644
index ba6bb8c..0000000
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
-#define IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
-
-#include "runtime/buffered-tuple-stream.h"
-
-#include "runtime/descriptors.h"
-#include "runtime/tuple-row.h"
-
-namespace impala {
-
-inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept {
-  DCHECK(!closed_);
-  if (LIKELY(DeepCopy(row))) return true;
-  return AddRowSlow(row, status);
-}
-
-inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int varlen_size,
-    uint8_t** varlen_data, Status* status) {
-  DCHECK(!closed_);
-  DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples";
-  const int total_size = fixed_size + varlen_size;
-  if (UNLIKELY(write_block_ == NULL || write_block_bytes_remaining() < total_size)) {
-    bool got_block;
-    *status = NewWriteBlockForRow(total_size, &got_block);
-    if (!status->ok() || !got_block) return NULL;
-  }
-  DCHECK(write_block_ != NULL);
-  DCHECK(write_block_->is_pinned());
-  DCHECK_GE(write_block_bytes_remaining(), total_size);
-  ++num_rows_;
-  write_block_->AddRow();
-
-  uint8_t* fixed_data = write_ptr_;
-  write_ptr_ += fixed_size;
-  *varlen_data = write_ptr_;
-  write_ptr_ += varlen_size;
-  return fixed_data;
-}
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 9b16112..83f2e6a 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -308,6 +308,16 @@ int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
   return impl_->reservation()->GetUnusedReservation();
 }
 
+bool BufferPool::ClientHandle::TransferReservationFrom(
+    ReservationTracker* src, int64_t bytes) {
+  return src->TransferReservationTo(impl_->reservation(), bytes);
+}
+
+bool BufferPool::ClientHandle::TransferReservationTo(
+    ReservationTracker* dst, int64_t bytes) {
+  return impl_->reservation()->TransferReservationTo(dst, bytes);
+}
+
 void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) {
   DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
   bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
@@ -355,7 +365,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
   RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
   reservation_.InitChildTracker(
       child_profile, parent_reservation, mem_tracker, reservation_limit);
-  counters_.alloc_time = ADD_TIMER(profile, "AllocTime");
+  counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
   counters_.cumulative_allocations =
       ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
   counters_.cumulative_bytes_alloced =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index f2ff99b..e3df8df 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -338,6 +338,14 @@ class BufferPool::ClientHandle {
   int64_t GetUsedReservation() const;
   int64_t GetUnusedReservation() const;
 
+  /// Try to transfer 'bytes' of reservation from 'src' to this client using
+  /// ReservationTracker::TransferReservationTo().
+  bool TransferReservationFrom(ReservationTracker* src, int64_t bytes);
+
+  /// Transfer 'bytes' of reservation from this client to 'dst' using
+  /// ReservationTracker::TransferReservationTo().
+  bool TransferReservationTo(ReservationTracker* dst, int64_t bytes);
+
   bool is_registered() const { return impl_ != NULL; }
 
   std::string DebugString() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 4d525c0..80084bc 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -127,6 +127,10 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not necessary.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
+  /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
+  /// reservation must be at least 'bytes' before calling this method.
+  void DecreaseReservation(int64_t bytes) { DecreaseReservation(bytes, false); }
+
   /// Transfer reservation from this tracker to 'other'. Both trackers must be in the
   /// same query subtree of the hierarchy. One tracker can be the ancestor of the other,
   /// or they can share a common ancestor. The subtree root must be at the query level

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 3393ab3..55042d8 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -83,7 +83,7 @@ DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
 // io and sequential io perform similarly.
 DEFINE_int32(read_size, 8 * 1024 * 1024, "Read Size (in bytes)");
-DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes)");
+DECLARE_int64(min_buffer_size);
 
 // With 1024B through 8MB buffers, this is up to ~2GB of buffers.
 DEFINE_int32(max_free_io_buffers, 128,
@@ -937,9 +937,8 @@ void DiskIoMgr::HandleWriteFinished(
   int disk_id = write_range->disk_id_;
 
   // Execute the callback before decrementing the thread count. Otherwise CancelContext()
-  // that waits for the disk ref count to be 0 will return, creating a race, e.g.
-  // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr().
-  // See IMPALA-1890.
+  // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
+  // IMPALA-1890.
   // The status of the write does not affect the status of the writer context.
   write_range->callback_(write_status);
   {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 960e3c9..f2ee6f0 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -75,6 +75,8 @@ DEFINE_int32(state_store_subscriber_port, 23000,
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 DEFINE_bool(disable_admission_control, false, "Disables admission control.");
+DEFINE_int64(min_buffer_size, 64 * 1024,
+    "(Advanced) The minimum buffer size to use in the buffer pool");
 
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
@@ -204,13 +206,14 @@ Status ExecEnv::StartServices() {
   // memory limit either based on the available physical memory, or if overcommitting
   // is turned off, we use the memory commit limit from /proc/meminfo (see
   // IMPALA-1690).
-  // --mem_limit="" means no memory limit
+  // --mem_limit="" means no memory limit. TODO: IMPALA-5652: deprecate this mode
   int64_t bytes_limit = 0;
   bool is_percent;
+  int64_t system_mem;
   if (MemInfo::vm_overcommit() == 2 &&
       MemInfo::commit_limit() < MemInfo::physical_mem()) {
-    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
-        MemInfo::commit_limit());
+    system_mem = MemInfo::commit_limit();
+    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem);
     // There might be the case of misconfiguration, when on a system swap is disabled
     // and overcommitting is turned off the actual usable memory is less than the
     // available physical memory.
@@ -225,14 +228,23 @@ Status ExecEnv::StartServices() {
                  << "/proc/sys/vm/overcommit_memory and "
                  << "/proc/sys/vm/overcommit_ratio.";
   } else {
-    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
-        MemInfo::physical_mem());
+    system_mem = MemInfo::physical_mem();
+    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem);
   }
-
+  // ParseMemSpec returns 0 to mean unlimited. TODO: IMPALA-5652: deprecate this mode.
+  bool no_process_mem_limit = bytes_limit == 0;
   if (bytes_limit < 0) {
     return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
   }
 
+  if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
+    return Status(Substitute(
+        "--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
+  }
+  int64_t buffer_pool_capacity = BitUtil::RoundDown(
+      no_process_mem_limit ? system_mem : bytes_limit * 4 / 5, FLAGS_min_buffer_size);
+  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_capacity);
+
   metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
@@ -240,8 +252,8 @@ Status ExecEnv::StartServices() {
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
 
   // Limit of -1 means no memory limit.
-  mem_tracker_.reset(new MemTracker(
-      AggregateMemoryMetrics::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
+  mem_tracker_.reset(new MemTracker(AggregateMemoryMetrics::TOTAL_USED,
+      no_process_mem_limit ? -1 : bytes_limit, "Process"));
   if (buffer_pool_ != nullptr) {
     // Add BufferPool MemTrackers for cached memory that is not tracked against queries
     // but is included in process memory consumption.
@@ -270,6 +282,8 @@ Status ExecEnv::StartServices() {
   }
   LOG(INFO) << "Using global memory limit: "
             << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
+  LOG(INFO) << "Buffer pool capacity: "
+            << PrettyPrinter::Print(buffer_pool_capacity, TUnit::BYTES);
 
   RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
 
@@ -310,9 +324,8 @@ Status ExecEnv::StartServices() {
   return Status::OK();
 }
 
-void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
-  DCHECK(buffer_pool_ == nullptr);
-  buffer_pool_.reset(new BufferPool(min_page_size, capacity));
+void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity) {
+  buffer_pool_.reset(new BufferPool(min_buffer_size, capacity));
   buffer_reservation_.reset(new ReservationTracker());
   buffer_reservation_->InitRootTracker(nullptr, capacity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 4674072..63d2e0b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -159,8 +159,8 @@ class ExecEnv {
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
 
   /// Query-wide buffer pool and the root reservation tracker for the pool. The
-  /// reservation limit is equal to the maximum capacity of the pool.
-  /// For now this is only used by backend tests that create them via InitBufferPool();
+  /// reservation limit is equal to the maximum capacity of the pool. Created in
+  /// InitBufferPool().
   boost::scoped_ptr<ReservationTracker> buffer_reservation_;
   boost::scoped_ptr<BufferPool> buffer_pool_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 2385eab..07b3f1c 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -126,8 +126,6 @@ Status FragmentInstanceState::Prepare() {
   profile()->AddChild(timings_profile_);
   SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
 
-  // TODO: move this into a RuntimeState::Init()
-  RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
   runtime_state_->InitFilterBank();
 
   // Reserve one main thread from the pool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
new file mode 100644
index 0000000..4987ec3
--- /dev/null
+++ b/be/src/runtime/initial-reservations.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/initial-reservations.h"
+
+#include <limits>
+
+#include <boost/thread/mutex.hpp>
+#include <gflags/gflags.h>
+
+#include "common/logging.h"
+#include "common/object-pool.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "util/debug-util.h"
+
+#include "common/names.h"
+
+using std::numeric_limits;
+
+DECLARE_int32(be_port);
+DECLARE_string(hostname);
+
+namespace impala {
+
+InitialReservations::InitialReservations(ObjectPool* obj_pool,
+    ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
+    int64_t initial_reservation_total_claims)
+  : remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+  MemTracker* initial_reservation_tracker = obj_pool->Add(
+      new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false));
+  initial_reservations_.InitChildTracker(nullptr, query_reservation,
+      initial_reservation_tracker, numeric_limits<int64_t>::max());
+}
+
+Status InitialReservations::Init(
+    const TUniqueId& query_id, int64_t query_min_reservation) {
+  DCHECK_EQ(0, initial_reservations_.GetReservation()) << "Already inited";
+  if (!initial_reservations_.IncreaseReservation(query_min_reservation)) {
+    return Status(TErrorCode::MINIMUM_RESERVATION_UNAVAILABLE,
+        PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), FLAGS_hostname,
+        FLAGS_be_port, PrintId(query_id),
+        ExecEnv::GetInstance()->process_mem_tracker()->LogUsage());
+  }
+  VLOG_QUERY << "Successfully claimed initial reservations ("
+            << PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for"
+            << " query " << PrintId(query_id);
+  return Status::OK();
+}
+
+void InitialReservations::Claim(BufferPool::ClientHandle* dst, int64_t bytes) {
+  DCHECK_GE(bytes, 0);
+  lock_guard<SpinLock> l(lock_);
+  DCHECK_LE(bytes, remaining_initial_reservation_claims_);
+  bool success = dst->TransferReservationFrom(&initial_reservations_, bytes);
+  DCHECK(success) << "Planner computation should ensure enough initial reservations";
+  remaining_initial_reservation_claims_ -= bytes;
+}
+
+void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes) {
+  lock_guard<SpinLock> l(lock_);
+  bool success = src->TransferReservationTo(&initial_reservations_, bytes);
+  // No limits on our tracker - no way this should fail.
+  DCHECK(success);
+  // Check to see if we can release any reservation.
+  int64_t excess_reservation =
+    initial_reservations_.GetReservation() - remaining_initial_reservation_claims_;
+  if (excess_reservation > 0) {
+    initial_reservations_.DecreaseReservation(excess_reservation);
+  }
+}
+
+void InitialReservations::ReleaseResources() {
+  initial_reservations_.Close();
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.h b/be/src/runtime/initial-reservations.h
new file mode 100644
index 0000000..dfcb114
--- /dev/null
+++ b/be/src/runtime/initial-reservations.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_INITIAL_RESERVATIONS_H
+#define IMPALA_RUNTIME_INITIAL_RESERVATIONS_H
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class ObjectPool;
+
+/**
+ * Manages the pool of initial reservations for different nodes in the plan tree.
+ * Each plan node and sink claims its initial reservation from here, then returns it when
+ * it is done executing. The frontend is responsible for making sure that enough initial
+ * reservation is in this pool for all of the concurrent claims.
+ */
+class InitialReservations {
+ public:
+  /// 'query_reservation' and 'query_mem_tracker' are the top-level trackers for the
+  /// query. This creates trackers for initial reservations under those.
+  /// 'initial_reservation_total_claims' is the total of initial reservations that will be
+  /// claimed over the lifetime of the query. The total bytes claimed via Claim()
+  /// cannot exceed this. Allocated objects are stored in 'obj_pool'.
+  InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation,
+      MemTracker* query_mem_tracker, int64_t initial_reservation_total_claims);
+
+  /// Initialize the query's pool of initial reservations by acquiring the minimum
+  /// reservation required for the query on this host. Fails if the reservation could
+  /// not be acquired, e.g. because it would exceed a pool or process limit.
+  Status Init(
+      const TUniqueId& query_id, int64_t query_min_reservation) WARN_UNUSED_RESULT;
+
+  /// Claim the initial reservation of 'bytes' for 'dst'. Assumes that the transfer will
+  /// not violate any reservation limits on 'dst'.
+  void Claim(BufferPool::ClientHandle* dst, int64_t bytes);
+
+  /// Return the initial reservation of 'bytes' from 'src'. The reservation is returned
+  /// to the pool of reservations if it may be needed to satisfy a subsequent claim or
+  /// otherwise is released.
+  void Return(BufferPool::ClientHandle* src, int64_t bytes);
+
+  /// Release any reservations held onto by this object.
+  void ReleaseResources();
+
+ private:
+  // Protects all below members to ensure that the internal state is consistent.
+  SpinLock lock_;
+
+  // The pool of initial reservations that Claim() takes reservations from and
+  // Return() gives reservations back to.
+  ReservationTracker initial_reservations_;
+
+  /// The total bytes of additional reservations that we expect to be claimed.
+  /// initial_reservations_.GetReservation() <= remaining_initial_reservation_claims_.
+  int64_t remaining_initial_reservation_claims_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 6057b52..22c2826 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -124,6 +124,8 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
   }
 #endif
 
+  // decrement initial reservation refcount taken in QueryState::Init()
+  qs->ReleaseInitialReservationRefcount();
   // decrement refcount taken in StartQuery()
   ReleaseQueryState(qs);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 21f35fb..64a8c5a 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -21,11 +21,12 @@
 #include <boost/thread/locks.hpp>
 
 #include "exprs/expr.h"
+#include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/backend-client.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/runtime-state.h"
@@ -37,6 +38,20 @@
 
 using namespace impala;
 
+// The fraction of the query mem limit that is used for buffer reservations. Most
+// operators that accumulate memory use reservations, so the majority of memory should
+// be allocated to buffer reservations, as a heuristic.
+// TODO: this will go away once all operators use buffer reservations.
+static const double RESERVATION_MEM_FRACTION = 0.8;
+
+// The minimum amount of memory that should be left after buffer reservations.
+// The limit on reservations is computed as:
+// max(0, min(query_limit * RESERVATION_MEM_FRACTION,
+//            query_limit - RESERVATION_MEM_MIN_REMAINING))
+// TODO: this will go away once all operators use buffer reservations and we have accurate
+// minimum requirements.
+static const int64_t RESERVATION_MEM_MIN_REMAINING = 100 * 1024 * 1024;
+
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -49,8 +64,10 @@ QueryState::ScopedRef::~ScopedRef() {
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   : query_ctx_(query_ctx),
+    initial_reservation_refcnt_(0),
     refcnt_(0),
-    is_cancelled_(0) {
+    is_cancelled_(0),
+    query_spilled_(0) {
   if (query_ctx_.request_pool.empty()) {
     // fix up pool name for tests
     DCHECK(!request_pool.empty());
@@ -75,6 +92,7 @@ void QueryState::ReleaseResources() {
   // Clean up temporary files.
   if (file_group_ != nullptr) file_group_->Close();
   // Release any remaining reservation.
+  if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   // Avoid dangling reference from the parent of 'query_mem_tracker_'.
   if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
@@ -85,6 +103,7 @@ void QueryState::ReleaseResources() {
 QueryState::~QueryState() {
   DCHECK(released_resources_);
   DCHECK_EQ(refcnt_.Load(), 0);
+  DCHECK_EQ(initial_reservation_refcnt_.Load(), 0);
 }
 
 Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
@@ -99,9 +118,8 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
         "is over its memory limit", PrintId(query_id()));
     RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0));
   }
-  // Do buffer-pool-related setup if running in a backend test that explicitly created
-  // the pool.
-  if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState());
+
+  RETURN_IF_ERROR(InitBufferPoolState());
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx);
@@ -112,6 +130,15 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
   rpc_params_.__isset.fragment_instance_ctxs = true;
 
+  // Claim the query-wide minimum reservation. Do this last so that we don't need
+  // to handle releasing it if a later step fails.
+  initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
+      buffer_reservation_, query_mem_tracker_,
+      query_ctx_.per_host_initial_reservation_total_claims));
+  RETURN_IF_ERROR(
+      initial_reservations_->Init(query_id(), query_ctx_.per_host_min_reservation));
+  DCHECK_EQ(0, initial_reservation_refcnt_.Load());
+  initial_reservation_refcnt_.Add(1); // Decremented in QueryExecMgr::StartQueryHelper().
   return Status::OK();
 }
 
@@ -129,19 +156,23 @@ void QueryState::InitMemTrackers() {
 
 Status QueryState::InitBufferPoolState() {
   ExecEnv* exec_env = ExecEnv::GetInstance();
-  int64_t query_mem_limit = query_mem_tracker_->limit();
-  if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max();
-
-  // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from
-  // query_mem_limit.
-  int64_t max_reservation = numeric_limits<int64_t>::max();
-  if (query_options().__isset.max_block_mgr_memory
-      && query_options().max_block_mgr_memory > 0) {
-    max_reservation = query_options().max_block_mgr_memory;
+  int64_t mem_limit = query_mem_tracker_->lowest_limit();
+  int64_t max_reservation;
+  if (query_options().__isset.buffer_pool_limit
+      && query_options().buffer_pool_limit > 0) {
+    max_reservation = query_options().buffer_pool_limit;
+  } else if (mem_limit == -1) {
+    // No query mem limit. The process-wide reservation limit is the only limit on
+    // reservations.
+    max_reservation = numeric_limits<int64_t>::max();
+  } else {
+    DCHECK_GE(mem_limit, 0);
+    max_reservation = min<int64_t>(
+        mem_limit * RESERVATION_MEM_FRACTION, mem_limit - RESERVATION_MEM_MIN_REMAINING);
+    max_reservation = max<int64_t>(0, max_reservation);
   }
+  VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation;
 
-  // TODO: IMPALA-3748: claim the query-wide minimum reservation.
-  // For now, rely on exec nodes to grab their minimum reservation during Prepare().
   buffer_reservation_ = obj_pool_.Add(new ReservationTracker);
   buffer_reservation_->InitChildTracker(
       NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
@@ -256,6 +287,7 @@ void QueryState::StartFInstances() {
   VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
       << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
+  DCHECK_GT(initial_reservation_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl);
@@ -290,6 +322,7 @@ void QueryState::StartFInstances() {
 
     // start new thread to execute instance
     refcnt_.Add(1);  // decremented in ExecFInstance()
+    initial_reservation_refcnt_.Add(1);  // decremented in ExecFInstance()
     string thread_name = Substitute(
         "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id));
     Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
@@ -311,6 +344,12 @@ void QueryState::StartFInstances() {
   instances_prepared_promise_.Set(prepare_status);
 }
 
+void QueryState::ReleaseInitialReservationRefcount() {
+  int32_t new_val = initial_reservation_refcnt_.Add(-1);
+  DCHECK_GE(new_val, 0);
+  if (new_val == 0) initial_reservations_->ReleaseResources();
+}
+
 void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
@@ -327,6 +366,8 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   // initiate cancellation if nobody has done so yet
   if (!status.ok()) Cancel();
   // decrement refcount taken in StartFInstances()
+  ReleaseInitialReservationRefcount();
+  // decrement query state refcount ('refcnt_') taken in StartFInstances()
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
 }
 
@@ -345,3 +386,21 @@ void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
     fis->PublishFilter(filter_id, thrift_bloom_filter);
   }
 }
+
+Status QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker) {
+  // Return an error message with the root cause of why spilling is disabled.
+  if (query_options().scratch_limit == 0) {
+    return mem_tracker->MemLimitExceeded(
+        runtime_state, "Could not free memory by spilling to disk: scratch_limit is 0");
+  } else if (query_ctx_.disable_spilling) {
+    return mem_tracker->MemLimitExceeded(runtime_state,
+        "Could not free memory by spilling to disk: spilling was disabled by planner. "
+        "Re-enable spilling by setting the query option DISABLE_UNSAFE_SPILLS=false");
+  }
+  // 'file_group_' must be non-NULL for spilling to be enabled.
+  DCHECK(file_group_ != nullptr);
+  if (query_spilled_.CompareAndSwap(0, 1)) {
+    ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1);
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 9ce4316..fc71772 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -34,8 +34,10 @@
 namespace impala {
 
 class FragmentInstanceState;
+class InitialReservations;
 class MemTracker;
 class ReservationTracker;
+class RuntimeState;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -110,6 +112,7 @@ class QueryState {
 
   // the following getters are only valid after Prepare()
   ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
+  InitialReservations* initial_reservations() const { return initial_reservations_; }
   TmpFileMgr::FileGroup* file_group() const { return file_group_; }
   const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
 
@@ -117,8 +120,10 @@ class QueryState {
   const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
-  /// if resources could not be acquired. Uses few cycles and never blocks.
-  /// Not idempotent, not thread-safe.
+  /// if resources could not be acquired. On success, acquires an initial reservation
+  /// refcount for the caller, which the caller must release by calling
+  /// ReleaseInitialReservationRefcount().
+  /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
   Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
 
@@ -155,6 +160,12 @@ class QueryState {
   /// If there is an error during the rpc, initiates cancellation.
   void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
 
+  /// Checks whether spilling is enabled for this query. Must be called before the first
+  /// call to BufferPool::Unpin() for the query. Returns OK if spilling is enabled. If
+  /// spilling is not enabled, logs a MEM_LIMIT_EXCEEDED error from
+  /// mem_tracker->MemLimitExceeded() to 'runtime_state' and returns it.
+  Status StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker);
+
   ~QueryState();
 
  private:
@@ -162,6 +173,7 @@ class QueryState {
 
   /// test execution
   friend class RuntimeState;
+  friend class TestEnv;
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
@@ -176,16 +188,21 @@ class QueryState {
   /// TODO: find a way not to have to copy this
   TExecQueryFInstancesParams rpc_params_;
 
-  /// Buffer reservation for this query (owned by obj_pool_)
-  /// Only non-null in backend tests that explicitly enabled the new buffer pool
-  /// Set in Prepare().
-  /// TODO: this will always be non-null once IMPALA-3200 is done
+  /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
 
-  /// Temporary files for this query (owned by obj_pool_)
-  /// Only non-null in backend tests the explicitly enabled the new buffer pool
-  /// Set in Prepare().
-  /// TODO: this will always be non-null once IMPALA-3200 is done
+  /// Pool of buffer reservations used to distribute initial reservations to operators
+  /// in the query. Contains a ReservationTracker that is a child of
+  /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Init().
+  InitialReservations* initial_reservations_ = nullptr;
+
+  /// Number of executing fragment instances that may need to claim from
+  /// 'initial_reservations_', plus the reference taken in Init().
+  /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
+  AtomicInt32 initial_reservation_refcnt_;
+
+  /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is
+  /// enabled. Set in Init().
   TmpFileMgr::FileGroup* file_group_ = nullptr;
 
   /// created in StartFInstances(), owned by obj_pool_
@@ -214,6 +231,11 @@ class QueryState {
   /// True if and only if ReleaseResources() has been called.
   bool released_resources_ = false;
 
+  /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1
+  /// when the query first starts to spill. Required to correctly maintain the
+  /// "num-queries-spilled" metric.
+  AtomicInt32 query_spilled_;
+
   /// Create QueryState w/ refcnt of 0.
   /// The query is associated with the resource pool query_ctx.request_pool or
   /// 'request_pool', if the former is not set (needed for tests).
@@ -222,13 +244,16 @@ class QueryState {
   /// Execute the fragment instance and decrement the refcnt when done.
   void ExecFInstance(FragmentInstanceState* fis);
 
-  /// Called from Prepare() to initialize MemTrackers.
+  /// Called from constructor to initialize MemTrackers.
   void InitMemTrackers();
 
-  /// Called from Prepare() to setup buffer reservations and the
-  /// file group. Fails if required resources are not available.
+  /// Called from Init() to set up buffer reservations and the file group.
   Status InitBufferPoolState() WARN_UNUSED_RESULT;
 
+  /// Decrement 'initial_reservation_refcnt_' and release the initial reservation if it
+  /// goes to zero.
+  void ReleaseInitialReservationRefcount();
+
   /// Same behavior as ReportExecStatus().
   /// Cancel on error only if instances_started is true.
   void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 11cf363..942ac05 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -147,9 +147,6 @@ RowBatch::~RowBatch() {
   for (int i = 0; i < io_buffers_.size(); ++i) {
     ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
   }
-  for (int i = 0; i < blocks_.size(); ++i) {
-    blocks_[i]->Delete();
-  }
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
@@ -295,14 +292,6 @@ void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) {
   io_buffers_.emplace_back(move(buffer));
 }
 
-void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
-  DCHECK(block != NULL);
-  DCHECK(block->is_pinned());
-  blocks_.push_back(block);
-  auxiliary_mem_usage_ += block->buffer_len();
-  if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
-}
-
 void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
     BufferPool::BufferHandle&& buffer, FlushMode flush) {
   auxiliary_mem_usage_ += buffer.len();
@@ -322,10 +311,6 @@ void RowBatch::Reset() {
     ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
   }
   io_buffers_.clear();
-  for (int i = 0; i < blocks_.size(); ++i) {
-    blocks_[i]->Delete();
-  }
-  blocks_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
@@ -342,10 +327,6 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
     dest->AddIoBuffer(move(io_buffers_[i]));
   }
   io_buffers_.clear();
-  for (int i = 0; i < blocks_.size(); ++i) {
-    dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
-  }
-  blocks_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     dest->AddBuffer(
         buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 1b75ebb..35a8f14 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -25,7 +25,6 @@
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
@@ -207,7 +206,6 @@ class RowBatch {
   int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
   int num_io_buffers() const { return io_buffers_.size(); }
-  int num_blocks() const { return blocks_.size(); }
   int num_buffers() const { return buffers_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.
@@ -216,13 +214,6 @@ class RowBatch {
   /// Add io buffer to this row batch.
   void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer);
 
-  /// Adds a block to this row batch. The block must be pinned. The blocks must be
-  /// deleted when freeing resources. The block's memory remains accounted against
-  /// the original owner, even when the ownership of batches is transferred. If the
-  /// original owner wants the memory to be released, it should call this with 'mode'
-  /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
-  void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
-
   /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
   /// The buffer's memory remains accounted against the original owner, even when the
   /// ownership of batches is transferred. If the original owner wants the memory to be
@@ -426,10 +417,6 @@ class RowBatch {
   /// (i.e. they are not ref counted) so most row batches don't own any.
   std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_;
 
-  /// Blocks attached to this row batch. The underlying memory and block manager client
-  /// are owned by the BufferedBlockMgr.
-  std::vector<BufferedBlockMgr::Block*> blocks_;
-
   struct BufferInfo {
     BufferPool::ClientHandle* client;
     BufferPool::BufferHandle buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index ab70d4a..7b6066a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -23,6 +23,7 @@
 #include "runtime/runtime-filter-bank.h"
 #include "util/bloom-filter.h"
 #include "util/spinlock.h"
+#include "util/time.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 89eec29..ba8e75d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -17,21 +17,21 @@
 
 #include "runtime/runtime-state.h"
 
-#include <iostream>
 #include <jni.h>
+#include <iostream>
 #include <sstream>
 #include <string>
 
-#include "common/logging.h"
 #include <boost/algorithm/string/join.hpp>
 #include <gutil/strings/substitute.h>
+#include "common/logging.h"
 
 #include "codegen/llvm-codegen.h"
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-fn-call.h"
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
@@ -54,22 +54,10 @@
 #include "common/names.h"
 
 using namespace llvm;
+using strings::Substitute;
 
 DECLARE_int32(max_errors);
 
-// The fraction of the query mem limit that is used for the block mgr. Operators
-// that accumulate memory all use the block mgr so the majority of the memory should
-// be allocated to the block mgr. The remaining memory is used by the non-spilling
-// operators and should be independent of data size.
-static const float BLOCK_MGR_MEM_FRACTION = 0.8f;
-
-// The minimum amount of memory that must be left after the block mgr reserves the
-// BLOCK_MGR_MEM_FRACTION. The block limit is:
-// min(query_limit * BLOCK_MGR_MEM_FRACTION, query_limit - BLOCK_MGR_MEM_MIN_REMAINING)
-// TODO: this value was picked arbitrarily and the tests are written to rely on this
-// for the minimum memory required to run the query. Revisit.
-static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
-
 namespace impala {
 
 RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
@@ -82,7 +70,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
         query_state->query_ctx().utc_timestamp_string))),
     exec_env_(exec_env),
     profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
-    instance_buffer_reservation_(nullptr),
+    instance_buffer_reservation_(new ReservationTracker),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
@@ -127,8 +115,7 @@ void RuntimeState::Init() {
   instance_mem_tracker_.reset(new MemTracker(
       runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
 
-  if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
-    instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker);
+  if (instance_buffer_reservation_ != nullptr) {
     instance_buffer_reservation_->InitChildTracker(&profile_,
         query_state_->buffer_reservation(), instance_mem_tracker_.get(),
         numeric_limits<int64_t>::max());
@@ -139,28 +126,6 @@ void RuntimeState::InitFilterBank() {
   filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
 }
 
-Status RuntimeState::CreateBlockMgr() {
-  DCHECK(block_mgr_.get() == NULL);
-
-  // Compute the max memory the block mgr will use.
-  int64_t block_mgr_limit = query_mem_tracker()->lowest_limit();
-  if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
-  block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
-      block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
-  if (block_mgr_limit < 0) block_mgr_limit = 0;
-  if (query_options().__isset.max_block_mgr_memory &&
-      query_options().max_block_mgr_memory > 0) {
-    block_mgr_limit = query_options().max_block_mgr_memory;
-    LOG(WARNING) << "Block mgr mem limit: "
-                 << PrettyPrinter::Print(block_mgr_limit, TUnit::BYTES);
-  }
-
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(this, query_mem_tracker(),
-      runtime_profile(), exec_env()->tmp_file_mgr(), block_mgr_limit,
-      io_mgr()->max_read_buffer_size(), &block_mgr_));
-  return Status::OK();
-}
-
 Status RuntimeState::CreateCodegen() {
   if (codegen_.get() != NULL) return Status::OK();
   // TODO: add the fragment ID to the codegen ID as well
@@ -179,6 +144,10 @@ Status RuntimeState::CodegenScalarFns() {
   return Status::OK();
 }
 
+Status RuntimeState::StartSpilling(MemTracker* mem_tracker) {
+  return query_state_->StartSpilling(this, mem_tracker);  // Forwards to QueryState.
+}
+
 string RuntimeState::ErrorLog() {
   lock_guard<SpinLock> l(error_log_lock_);
   return PrintErrorMapToString(error_log_);
@@ -270,7 +239,6 @@ void RuntimeState::ReleaseResources() {
   if (resource_pool_ != nullptr) {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
   }
-  block_mgr_.reset(); // Release any block mgr memory, if this is the last reference.
   codegen_.reset(); // Release any memory associated with codegen.
 
   // Release the reservation, which should be unused at the point.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 9a1d0b2..12e7d8c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -32,7 +32,7 @@
 
 namespace impala {
 
-class BufferedBlockMgr;
+class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
 class DiskIoMgr;
@@ -92,9 +92,6 @@ class RuntimeState {
   /// Initializes the runtime filter bank.
   void InitFilterBank();
 
-  /// Gets/Creates the query wide block mgr.
-  Status CreateBlockMgr();
-
   QueryState* query_state() const { return query_state_; }
   /// Return the query's ObjectPool
   ObjectPool* obj_pool() const;
@@ -132,7 +129,7 @@ class RuntimeState {
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker();  // reference to the query_state_'s memtracker
   ReservationTracker* instance_buffer_reservation() {
-    return instance_buffer_reservation_;
+    return instance_buffer_reservation_.get();
   }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
@@ -206,11 +203,6 @@ class RuntimeState {
   /// Unregisters all reader contexts acquired through AcquireReaderContext().
   void UnregisterReaderContexts();
 
-  BufferedBlockMgr* block_mgr() {
-    DCHECK(block_mgr_.get() != NULL);
-    return block_mgr_.get();
-  }
-
   inline Status GetQueryStatus() {
     // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
     if (UNLIKELY(!query_status_.ok())) {
@@ -307,21 +299,19 @@ class RuntimeState {
   /// TODO: Fix IMPALA-4233
   Status CodegenScalarFns();
 
+  /// Helper to call QueryState::StartSpilling().
+  Status StartSpilling(MemTracker* mem_tracker);
+
   /// Release resources and prepare this object for destruction.
   void ReleaseResources();
 
  private:
-  /// Allow TestEnv to set block_mgr manually for testing.
+  /// Allow TestEnv to use private methods for testing.
   friend class TestEnv;
 
   /// Set per-fragment state.
   void Init();
 
-  /// Use a custom block manager for the query for testing purposes.
-  void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) {
-    block_mgr_ = block_mgr;
-  }
-
   /// Lock protecting error_log_
   SpinLock error_log_lock_;
 
@@ -382,9 +372,8 @@ class RuntimeState {
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
   /// Buffer reservation for this fragment instance - a child of the query buffer
-  /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
-  /// was created by a backend test. Owned by obj_pool().
-  ReservationTracker* instance_buffer_reservation_;
+  /// reservation. Non-NULL if 'query_state_' is not NULL.
+  boost::scoped_ptr<ReservationTracker> instance_buffer_reservation_;
 
   /// if true, execution should stop with a CANCELLED status
   bool is_cancelled_;
@@ -401,11 +390,6 @@ class RuntimeState {
   SpinLock reader_contexts_lock_;
   std::vector<DiskIoRequestContext*> reader_contexts_;
 
-  /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory
-  /// with a fixed memory budget.
-  /// The block mgr is shared by all fragments for this query.
-  std::shared_ptr<BufferedBlockMgr> block_mgr_;
-
   /// This is the node id of the root node for this plan fragment. This is used as the
   /// hash seed and has two useful properties:
   /// 1) It is the same for all exec nodes in a fragment, so the resulting hash values