You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:16 UTC
[05/11] incubator-impala git commit: IMPALA-4674: Part 2: port
backend exec to BufferPool
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
deleted file mode 100644
index 41d63bf..0000000
--- a/be/src/runtime/buffered-tuple-stream.h
+++ /dev/null
@@ -1,561 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
-#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
-
-#include <vector>
-#include <set>
-
-#include "common/status.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/row-batch.h"
-
-namespace impala {
-
-class BufferedBlockMgr;
-class RuntimeProfile;
-class RuntimeState;
-class RowDescriptor;
-class SlotDescriptor;
-class TupleRow;
-
-/// Class that provides an abstraction for a stream of tuple rows. Rows can be
-/// added to the stream and returned. Rows are returned in the order they are added.
-///
-/// The underlying memory management is done by the BufferedBlockMgr.
-///
-/// The tuple stream consists of a number of small (less than IO-sized) blocks before
-/// an arbitrary number of IO-sized blocks. The smaller blocks do not spill and are
-/// there to lower the minimum buffering requirements. For example, an operator that
-/// needs to maintain 64 streams (1 buffer per partition) would need, by default,
-/// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require
-/// 2.56GB just to run, regardless of how much of that is used. This is
-/// problematic for small queries. Instead we will start with a fixed number of small
-/// buffers (currently 2 small buffers: one 64KB and one 512KB) and only start using IO
-/// sized buffers when those fill up. The small buffers never spill.
-/// The stream will *not* automatically switch from using small buffers to IO-sized
-/// buffers when all the small buffers for this stream have been used.
-///
-/// The BufferedTupleStream is *not* thread safe from the caller's point of view. It is
-/// expected that all the APIs are called from a single thread. Internally, the
-/// object is thread safe with respect to the underlying block mgr.
-///
-/// Buffer management:
-/// The stream is either pinned or unpinned, set via PinStream() and UnpinStream().
-/// Blocks are optionally deleted as they are read, set with the delete_on_read argument
-/// to PrepareForRead().
-///
-/// Block layout:
-/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a
-/// bitstring at the start of each block with null indicators for all tuples in each row
-/// in the block. The length of the bitstring is a function of the block size. Row data
-/// is stored after the null indicators if present, or at the start of the block
-/// otherwise. Rows are stored back to back in the stream, with no interleaving of data
-/// from different rows. There is no padding or alignment between rows.
-///
-/// Null tuples:
-/// The order of bits in the null indicators bitstring corresponds to the order of
-/// tuples in the block. The NULL tuples are not stored in the row itself, only as set
-/// bits in the null indicators bitstring.
-///
-/// Tuple row layout:
-/// The fixed length parts of the row's tuples are stored first, followed by var len data
-/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can
-/// point to var len data outside the stream. When reading the stream, the length of each
-/// row's var len data in the stream must be computed to find the next row's start.
-///
-/// The tuple stream supports reading from the stream into RowBatches without copying
-/// out any data: the RowBatches' Tuple pointers will point directly into the stream's
-/// blocks. The fixed length parts follow Impala's internal tuple format, so for the
-/// tuple to be valid, we only need to update pointers to point to the var len data
-/// in the stream. These pointers need to be updated by the stream because a spilled
-/// block may be relocated to a different location in memory. The pointers are updated
-/// lazily upon reading the stream via GetNext() or GetRows().
-///
-/// Example layout for a row with two tuples ((1, "hello"), (2, "world")) with all var
-/// len data stored in the stream:
-/// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ...
-/// +--------+-----------+-----------+-----------+-------------+
-/// | IntVal | StringVal | BigIntVal | StringVal | | ...
-/// +--------+-----------+-----------+-----------++------------+
-/// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ...
-/// | | ptr: 0x.. | | ptr: 0x.. | | ...
-/// +--------+-----------+-----------+-----------+-------------+
-/// <--4b--> <---12b---> <----8b---> <---12b---> <----10b---->
-//
-/// Example layout for a row with a single tuple (("hello", "world")) with the second
-/// string slot stored externally to the stream:
-/// <------ tuple 1 ------> <- var len -> <- next row ...
-/// +-----------+-----------+-------------+
-/// | StringVal | StringVal | | ...
-/// +-----------+-----------+-------------+
-/// | len: 5 | len: 5 | hello | ...
-/// | ptr: 0x.. | ptr: 0x.. | | ...
-/// +-----------+-----------+-------------+
-/// <---12b---> <---12b---> <-----5b---->
-///
-/// The behavior of reads and writes is as follows:
-/// Read:
-/// 1. Delete on read (delete_on_read_): Blocks are deleted as we go through the stream.
-/// The data returned by the tuple stream is valid until the next read call so the
-/// caller does not need to copy if it is streaming.
-/// 2. Unpinned: Blocks remain in blocks_ and are unpinned after reading.
-/// 3. Pinned: Blocks remain in blocks_ and are left pinned after reading. If the next
-/// block in the stream cannot be pinned, the read call will fail and the caller needs
-/// to free memory from the underlying block mgr.
-/// Write:
-/// 1. Unpinned: Unpin blocks as they fill up. This means only a single (i.e. the
-/// current) block needs to be in memory regardless of the input size (if read_write is
-/// true, then two blocks need to be in memory).
-/// 2. Pinned: Blocks are left pinned. If we run out of blocks, the write will fail and
-/// the caller needs to free memory from the underlying block mgr.
-///
-/// Memory lifetime of rows read from stream:
-/// If the stream is pinned, it is valid to access any tuples returned via
-/// GetNext() or GetRows() until the stream is unpinned. If the stream is unpinned, and
-/// the batch returned from GetNext() has the needs_deep_copy flag set, any tuple memory
-/// returned so far from the stream may be freed on the next call to GetNext().
-///
-/// Manual construction of rows with AllocateRow():
-/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow().
-/// The caller of AllocateRow() is responsible for writing the row with exactly the
-/// layout described above.
-///
-/// If a caller constructs a tuple in this way, the caller can set the pointers and they
-/// will not be modified until the stream is read via GetNext() or GetRows().
-///
-/// TODO: we need to be able to do read ahead in the BufferedBlockMgr. It currently
-/// only has PinAllBlocks() which is blocking. We need a non-blocking version of this or
-/// some way to indicate a block will need to be pinned soon.
-/// TODO: see if this can be merged with Sorter::Run. The key difference is that this
-/// does not need to return rows in the order they were added, which allows it to be
-/// simpler.
-/// TODO: we could compact the small buffers when we need to spill but they use very
-/// little memory so this might not be very useful.
-/// TODO: improvements:
-/// - It would be good to allocate the null indicators at the end of each block and grow
-/// this array as new rows are inserted in the block. If we do so, then there will be
-/// fewer gaps in case of many rows with NULL tuples.
-/// - We will want to multithread this. Add a AddBlock() call so the synchronization
-/// happens at the block level. This is a natural extension.
-/// - Instead of allocating all blocks from the block_mgr, allocate some blocks that
-/// are much smaller (e.g. 16K and doubling up to the block size). This way, very
-/// small streams (a common case) will use very little memory. These small blocks
-/// are always in memory since spilling them frees up negligible memory.
-/// - Return row batches in GetNext() instead of filling one in
-class BufferedTupleStream {
- public:
- /// Ordinal index into the stream to retrieve a row in O(1) time. This index can
- /// only be used if the stream is pinned.
- /// To read a row from a stream we need three pieces of information that we squeeze in
- /// 64 bits:
- /// - The index of the block. The block id is stored in 16 bits. We can have up to
- /// 64K blocks per tuple stream. With 8MB blocks that is 512GB per stream.
- /// - The offset of the start of the row (data) within the block. Since blocks are 8MB
- /// we use 24 bits for the offsets. (In theory we could use 23 bits.)
- /// - The idx of the row in the block. We need this for retrieving the null indicators.
- /// We use 24 bits for this index as well.
- struct RowIdx {
- static const uint64_t BLOCK_MASK = 0xFFFF;
- static const uint64_t BLOCK_SHIFT = 0;
- static const uint64_t OFFSET_MASK = 0xFFFFFF0000;
- static const uint64_t OFFSET_SHIFT = 16;
- static const uint64_t IDX_MASK = 0xFFFFFF0000000000;
- static const uint64_t IDX_SHIFT = 40;
-
- uint64_t block() const {
- return (data & BLOCK_MASK);
- }
-
- uint64_t offset() const {
- return (data & OFFSET_MASK) >> OFFSET_SHIFT;
- }
-
- uint64_t idx() const {
- return (data & IDX_MASK) >> IDX_SHIFT;
- }
-
- uint64_t set(uint64_t block, uint64_t offset, uint64_t idx) {
- DCHECK_LE(block, BLOCK_MASK)
- << "Cannot have more than 2^16 = 64K blocks in a tuple stream.";
- DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT)
- << "Cannot have blocks larger than 2^24 = 16MB";
- DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT)
- << "Cannot have more than 2^24 = 16M rows in a block.";
- data = block | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT);
- return data;
- }
-
- std::string DebugString() const;
-
- uint64_t data;
- };
-
- /// row_desc: description of rows stored in the stream. This is the desc for rows
- /// that are added and the rows being returned.
- /// block_mgr: Underlying block mgr that owns the data blocks.
- /// use_initial_small_buffers: If true, the initial N buffers allocated for the
- /// tuple stream use smaller than IO-sized buffers.
- /// read_write: Stream allows interchanging read and write operations. Requires that
- /// at least two blocks can be pinned.
- /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
- BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc,
- BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
- bool use_initial_small_buffers, bool read_write,
- const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
-
- ~BufferedTupleStream();
-
- /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
- /// once before any of the other APIs.
- /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
- /// If 'profile' is non-NULL, counters are created.
- /// 'node_id' is only used for error reporting.
- Status Init(int node_id, RuntimeProfile* profile, bool pinned);
-
- /// Prepares the stream for writing by attempting to allocate a write block.
- /// Called after Init() and before the first AddRow() call.
- /// 'got_buffer': set to true if the first write block was successfully pinned, or
- /// false if the block could not be pinned and no error was encountered. Undefined
- /// if an error status is returned.
- Status PrepareForWrite(bool* got_buffer);
-
- /// Must be called for streams using small buffers to switch to IO-sized buffers.
- /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_
- /// back to false.
- /// TODO: IMPALA-3200: remove this when small buffers are removed.
- Status SwitchToIoBuffers(bool* got_buffer);
-
- /// Adds a single row to the stream. Returns true if the append succeeded, returns false
- /// and sets 'status' to OK if appending failed but can be retried or returns false and
- /// sets 'status' to an error if an error occurred.
- /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
- /// returns an error, it should not be called again. If appending failed without an
- /// error and the stream is using small buffers, it is valid to call
- /// SwitchToIoBuffers() then AddRow() again.
- bool AddRow(TupleRow* row, Status* status) noexcept;
-
- /// Allocates space to store a row with fixed length 'fixed_size' and variable
- /// length data 'varlen_size'. If successful, returns the pointer where fixed length
- /// data should be stored and assigns 'varlen_data' to where var-len data should
- /// be stored. Returns NULL if there is not enough memory or an error occurred.
- /// Sets *status if an error occurred. The returned memory is guaranteed to all
- /// be allocated in the same block. AllocateRow does not currently support nullable
- /// tuples.
- uint8_t* AllocateRow(int fixed_size, int varlen_size, uint8_t** varlen_data,
- Status* status);
-
- /// Populates 'row' with the row at 'idx'. The stream must be pinned. The row must have
- /// been allocated with the stream's row desc.
- void GetTupleRow(const RowIdx& idx, TupleRow* row) const;
-
- /// Prepares the stream for reading. If read_write_, this can be called at any time to
- /// begin reading. Otherwise this must be called after the last AddRow() and
- /// before GetNext().
- /// delete_on_read: Blocks are deleted after they are read.
- /// got_buffer: set to true if the first read block was successfully pinned, or
- /// false if the block could not be pinned and no error was encountered.
- Status PrepareForRead(bool delete_on_read, bool* got_buffer);
-
- /// Pins all blocks in this stream and switches to pinned mode.
- /// If there is not enough memory, *pinned is set to false and the stream is unmodified.
- /// If already_reserved is true, the caller has already made a reservation on
- /// block_mgr_client_ to pin the stream.
- Status PinStream(bool already_reserved, bool* pinned);
-
- /// Modes for UnpinStream().
- enum UnpinMode {
- /// All blocks in the stream are unpinned and the read/write positions in the stream
- /// are reset. No more rows can be written to the stream after this. The stream can
- /// be re-read from the beginning by calling PrepareForRead().
- UNPIN_ALL,
- /// All blocks are unpinned aside from the current read and write blocks (if any),
- /// which are left in the same state. The unpinned stream can continue being read
- /// or written from the current read or write positions.
- UNPIN_ALL_EXCEPT_CURRENT,
- };
-
- /// Unpins stream with the given 'mode' as described above.
- Status UnpinStream(UnpinMode mode);
-
- /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream
- /// and must be copied out by the caller.
- Status GetNext(RowBatch* batch, bool* eos);
-
- /// Same as above, but also populate 'indices' with the index of each returned row.
- Status GetNext(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
-
- /// Returns all the rows in the stream in batch. This pins the entire stream in the
- /// process.
- /// *got_rows is false if the stream could not be pinned.
- Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows);
-
- /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
- /// attaches any pinned blocks to the batch and deletes unpinned blocks. Otherwise
- /// deletes all blocks. Does nothing if the stream was already closed. The 'flush'
- /// mode is forwarded to RowBatch::AddBlock() when attaching blocks.
- void Close(RowBatch* batch, RowBatch::FlushMode flush);
-
- /// Number of rows in the stream.
- int64_t num_rows() const { return num_rows_; }
-
- /// Number of rows returned via GetNext().
- int64_t rows_returned() const { return rows_returned_; }
-
- /// Returns the byte size necessary to store the entire stream in memory.
- int64_t byte_size() const { return total_byte_size_; }
-
- /// Returns the byte size of the stream that is currently pinned in memory.
- /// If ignore_current is true, the write_block_ memory is not included.
- int64_t bytes_in_mem(bool ignore_current) const;
-
- bool is_closed() const { return closed_; }
- bool is_pinned() const { return pinned_; }
- int blocks_pinned() const { return num_pinned_; }
- int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; }
- bool has_read_block() const { return read_block_ != blocks_.end(); }
- bool has_write_block() const { return write_block_ != NULL; }
- bool using_small_buffers() const { return use_small_buffers_; }
-
- /// Returns true if the row consumes any memory. If false, the stream only needs to
- /// store the count of rows.
- bool RowConsumesMemory() const {
- return fixed_tuple_row_size_ > 0 || has_nullable_tuple_;
- }
-
- std::string DebugString() const;
-
- private:
- friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
- friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
- friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
- friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
-
- /// Runtime state instance used to check for cancellation. Not owned.
- RuntimeState* const state_;
-
- /// Description of rows stored in the stream.
- const RowDescriptor* desc_;
-
- /// Sum of the fixed length portion of all the tuples in desc_.
- int fixed_tuple_row_size_;
-
- /// The size of the fixed length portion for each tuple in the row.
- std::vector<int> fixed_tuple_sizes_;
-
- /// Max size (in bytes) of null indicators bitmap in the current read and write
- /// blocks. If 0, it means that there is no need to store null indicators for this
- /// RowDesc. We calculate this value based on the block's size and the
- /// fixed_tuple_row_size_. When not 0, this value is also an upper bound for the number
- /// of (rows * tuples_per_row) in this block.
- int read_block_null_indicators_size_;
- int write_block_null_indicators_size_;
-
- /// Size (in bytes) of the null indicators bitmap reserved in a block of maximum
- /// size (i.e. IO block size). 0 if no tuple is nullable.
- int max_null_indicators_size_;
-
- /// Vectors of all the strings slots that have their varlen data stored in stream
- /// grouped by tuple_idx.
- std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_;
-
- /// Vectors of all the collection slots that have their varlen data stored in the
- /// stream, grouped by tuple_idx.
- std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_;
-
- /// Block manager and client used to allocate, pin and release blocks. Not owned.
- BufferedBlockMgr* block_mgr_;
- BufferedBlockMgr::Client* block_mgr_client_;
-
- /// List of blocks in the stream.
- std::list<BufferedBlockMgr::Block*> blocks_;
-
- /// Total size of blocks_, including small blocks.
- int64_t total_byte_size_;
-
- /// Iterator pointing to the current block for read. Equal to list.end() until
- /// PrepareForRead() is called.
- std::list<BufferedBlockMgr::Block*>::iterator read_block_;
-
- /// For each block in the stream, the buffer of the start of the block. This is only
- /// valid when the stream is pinned, giving random access to data in the stream.
- /// This is not maintained for delete_on_read_.
- std::vector<uint8_t*> block_start_idx_;
-
- /// Current idx of the tuple read from the read_block_ buffer.
- uint32_t read_tuple_idx_;
-
- /// Current offset in read_block_ of the end of the last data read.
- uint8_t* read_ptr_;
-
- /// Pointer to one byte past the end of read_block_.
- uint8_t* read_end_ptr_;
-
- /// Current idx of the tuple written at the write_block_ buffer.
- uint32_t write_tuple_idx_;
-
- /// Pointer into write_block_ of the end of the last data written.
- uint8_t* write_ptr_;
-
- /// Pointer to one byte past the end of write_block_.
- uint8_t* write_end_ptr_;
-
- /// Number of rows returned to the caller from GetNext().
- int64_t rows_returned_;
-
- /// The block index of the current read block in blocks_.
- int read_block_idx_;
-
- /// The current block for writing. NULL if there is no available block to write to.
- /// The entire write_block_ buffer is marked as allocated, so any data written into
- /// the buffer will be spilled without having to allocate additional space.
- BufferedBlockMgr::Block* write_block_;
-
- /// Number of pinned blocks in blocks_, stored to avoid iterating over the list
- /// to compute bytes_in_mem and bytes_unpinned.
- /// This does not include small blocks.
- int num_pinned_;
-
- /// The total number of small blocks in blocks_;
- int num_small_blocks_;
-
- /// Number of rows stored in the stream.
- int64_t num_rows_;
-
- /// Counters added by this object to the parent runtime profile.
- RuntimeProfile::Counter* pin_timer_;
- RuntimeProfile::Counter* unpin_timer_;
- RuntimeProfile::Counter* get_new_block_timer_;
-
- /// If true, read and write operations may be interleaved. Otherwise all calls
- /// to AddRow() must occur before calling PrepareForRead() and subsequent calls to
- /// GetNext().
- const bool read_write_;
-
- /// Whether any tuple in the rows is nullable.
- const bool has_nullable_tuple_;
-
- /// If true, this stream is still using small buffers.
- bool use_small_buffers_;
-
- /// If true, blocks are deleted after they are read.
- bool delete_on_read_;
-
- bool closed_; // Used for debugging.
-
- /// If true, this stream has been explicitly pinned by the caller. This changes the
- /// memory management of the stream. The blocks are not unpinned until the caller calls
- /// UnpinStream(). If false, only the write_block_ and/or read_block_ are pinned
- /// (both are if read_write_ is true).
- bool pinned_;
-
- /// The slow path for AddRow() that is called if there is not sufficient space in
- /// the current block.
- bool AddRowSlow(TupleRow* row, Status* status) noexcept;
-
- /// Copies 'row' into write_block_. Returns false if there is not enough space in
- /// 'write_block_'. After returning false, write_ptr_ may be left pointing to the
- /// partially-written row, and no more data can be written to write_block_.
- template <bool HAS_NULLABLE_TUPLE>
- bool DeepCopyInternal(TupleRow* row) noexcept;
-
- /// Helper function to copy strings in string_slots from tuple into write_block_.
- /// Updates write_ptr_ to the end of the string data added. Returns false if the data
- /// does not fit in the current write block. After returning false, write_ptr_ is left
- /// pointing to the partially-written row, and no more data can be written to
- /// write_block_.
- bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots);
-
- /// Helper function to deep copy collections in collection_slots from tuple into
- /// write_block_. Updates write_ptr_ to the end of the collection data added. Returns
- /// false if the data does not fit in the current write block. After returning false,
- /// write_ptr_ is left pointing to the partially-written row, and no more data can be
- /// written to write_block_.
- bool CopyCollections(const Tuple* tuple,
- const std::vector<SlotDescriptor*>& collection_slots);
-
- /// Wrapper of the templated DeepCopyInternal() function.
- bool DeepCopy(TupleRow* row) noexcept;
-
- /// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_,
- /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the
- /// number of bytes that will be reserved in the block for the null indicators bitmap.
- /// *got_block is set to true if a block was successfully acquired. Null indicators
- /// (if any) will also be reserved and initialized. If there are no blocks available,
- /// *got_block is set to false and write_block_ is unchanged.
- Status NewWriteBlock(
- int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept;
-
- /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be
- /// appended to this block. This function determines the block size required in order
- /// to fit the row and null indicators.
- Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept;
-
- /// Reads the next block from the block_mgr_. This blocks if necessary.
- /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_.
- Status NextReadBlock();
-
- /// Returns the total additional bytes that this row will consume in write_block_ if
- /// appended to the block. This includes the fixed length part of the row and the
- /// data for inlined_string_slots_ and inlined_coll_slots_.
- int64_t ComputeRowSize(TupleRow* row) const noexcept;
-
- /// Unpins block if it is an IO-sized block and updates tracking stats.
- Status UnpinBlock(BufferedBlockMgr::Block* block);
-
- /// Templated GetNext implementations.
- template <bool FILL_INDICES>
- Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
- template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
- Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
-
- /// Helper function for GetNextInternal(). For each string slot in string_slots,
- /// update StringValue's ptr field to point to the corresponding string data stored
- /// inline in the stream (at the current value of read_ptr_) and advance read_ptr_ by the
- /// StringValue's length field.
- void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple);
-
- /// Helper function for GetNextInternal(). For each collection slot in collection_slots,
- /// recursively update any pointers in the CollectionValue to point to the corresponding
- /// var len data stored inline in the stream, advancing read_ptr_ as data is read.
- /// Assumes that the collection was serialized to the stream in DeepCopy()'s format.
- void FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots,
- Tuple* tuple);
-
- /// Computes the number of bytes needed for null indicators for a block of 'block_size'.
- /// Return 0 if no tuple is nullable. Return -1 if a single row of fixed-size tuples
- /// plus its null indicator (if any) cannot fit in the block.
- int ComputeNumNullIndicatorBytes(int block_size) const;
-
- uint32_t read_block_bytes_remaining() const {
- DCHECK_GE(read_end_ptr_, read_ptr_);
- DCHECK_LE(read_end_ptr_ - read_ptr_, (*read_block_)->buffer_len());
- return read_end_ptr_ - read_ptr_;
- }
-
- uint32_t write_block_bytes_remaining() const {
- DCHECK_GE(write_end_ptr_, write_ptr_);
- DCHECK_LE(write_end_ptr_ - write_ptr_, write_block_->buffer_len());
- return write_end_ptr_ - write_ptr_;
- }
-
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h
deleted file mode 100644
index ba6bb8c..0000000
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
-#define IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
-
-#include "runtime/buffered-tuple-stream.h"
-
-#include "runtime/descriptors.h"
-#include "runtime/tuple-row.h"
-
-namespace impala {
-
-inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept {
- DCHECK(!closed_);
- if (LIKELY(DeepCopy(row))) return true;
- return AddRowSlow(row, status);
-}
-
-inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int varlen_size,
- uint8_t** varlen_data, Status* status) {
- DCHECK(!closed_);
- DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples";
- const int total_size = fixed_size + varlen_size;
- if (UNLIKELY(write_block_ == NULL || write_block_bytes_remaining() < total_size)) {
- bool got_block;
- *status = NewWriteBlockForRow(total_size, &got_block);
- if (!status->ok() || !got_block) return NULL;
- }
- DCHECK(write_block_ != NULL);
- DCHECK(write_block_->is_pinned());
- DCHECK_GE(write_block_bytes_remaining(), total_size);
- ++num_rows_;
- write_block_->AddRow();
-
- uint8_t* fixed_data = write_ptr_;
- write_ptr_ += fixed_size;
- *varlen_data = write_ptr_;
- write_ptr_ += varlen_size;
- return fixed_data;
-}
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 9b16112..83f2e6a 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -308,6 +308,16 @@ int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
return impl_->reservation()->GetUnusedReservation();
}
+bool BufferPool::ClientHandle::TransferReservationFrom(
+ ReservationTracker* src, int64_t bytes) {
+ return src->TransferReservationTo(impl_->reservation(), bytes);
+}
+
+bool BufferPool::ClientHandle::TransferReservationTo(
+ ReservationTracker* dst, int64_t bytes) {
+ return impl_->reservation()->TransferReservationTo(dst, bytes);
+}
+
void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) {
DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
@@ -355,7 +365,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
reservation_.InitChildTracker(
child_profile, parent_reservation, mem_tracker, reservation_limit);
- counters_.alloc_time = ADD_TIMER(profile, "AllocTime");
+ counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
counters_.cumulative_allocations =
ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
counters_.cumulative_bytes_alloced =
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index f2ff99b..e3df8df 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -338,6 +338,14 @@ class BufferPool::ClientHandle {
int64_t GetUsedReservation() const;
int64_t GetUnusedReservation() const;
+ /// Try to transfer 'bytes' of reservation from 'src' to this client using
+ /// ReservationTracker::TransferReservationTo(). Returns true on success.
+ bool TransferReservationFrom(ReservationTracker* src, int64_t bytes);
+
+ /// Transfer 'bytes' of reservation from this client to 'dst' using
+ /// ReservationTracker::TransferReservationTo(). Returns true on success.
+ bool TransferReservationTo(ReservationTracker* dst, int64_t bytes);
+
bool is_registered() const { return impl_ != NULL; }
std::string DebugString() const;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 4d525c0..80084bc 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -127,6 +127,10 @@ class ReservationTracker {
/// Returns true if the reservation increase was successful or not necessary.
bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
+ /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
+ /// reservation must be at least 'bytes' before calling this method.
+ void DecreaseReservation(int64_t bytes) { DecreaseReservation(bytes, false); }
+
/// Transfer reservation from this tracker to 'other'. Both trackers must be in the
/// same query subtree of the hierarchy. One tracker can be the ancestor of the other,
/// or they can share a common ancestor. The subtree root must be at the query level
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 3393ab3..55042d8 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -83,7 +83,7 @@ DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
// not introduce seeks. The literature seems to agree that with 8 MB reads, random
// io and sequential io perform similarly.
DEFINE_int32(read_size, 8 * 1024 * 1024, "Read Size (in bytes)");
-DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes)");
+DECLARE_int64(min_buffer_size);
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
DEFINE_int32(max_free_io_buffers, 128,
@@ -937,9 +937,8 @@ void DiskIoMgr::HandleWriteFinished(
int disk_id = write_range->disk_id_;
// Execute the callback before decrementing the thread count. Otherwise CancelContext()
- // that waits for the disk ref count to be 0 will return, creating a race, e.g.
- // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr().
- // See IMPALA-1890.
+ // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
+ // IMPALA-1890.
// The status of the write does not affect the status of the writer context.
write_range->callback_(write_status);
{
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 960e3c9..f2ee6f0 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -75,6 +75,8 @@ DEFINE_int32(state_store_subscriber_port, 23000,
DEFINE_int32(num_hdfs_worker_threads, 16,
"(Advanced) The number of threads in the global HDFS operation pool");
DEFINE_bool(disable_admission_control, false, "Disables admission control.");
+DEFINE_int64(min_buffer_size, 64 * 1024,
+ "(Advanced) The minimum buffer size to use in the buffer pool");
DECLARE_int32(state_store_port);
DECLARE_int32(num_threads_per_core);
@@ -204,13 +206,14 @@ Status ExecEnv::StartServices() {
// memory limit either based on the available physical memory, or if overcommitting
// is turned off, we use the memory commit limit from /proc/meminfo (see
// IMPALA-1690).
- // --mem_limit="" means no memory limit
+ // --mem_limit="" means no memory limit. TODO: IMPALA-5652: deprecate this mode
int64_t bytes_limit = 0;
bool is_percent;
+ int64_t system_mem;
if (MemInfo::vm_overcommit() == 2 &&
MemInfo::commit_limit() < MemInfo::physical_mem()) {
- bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
- MemInfo::commit_limit());
+ system_mem = MemInfo::commit_limit();
+ bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem);
// There might be the case of misconfiguration, when on a system swap is disabled
// and overcommitting is turned off the actual usable memory is less than the
// available physical memory.
@@ -225,14 +228,23 @@ Status ExecEnv::StartServices() {
<< "/proc/sys/vm/overcommit_memory and "
<< "/proc/sys/vm/overcommit_ratio.";
} else {
- bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent,
- MemInfo::physical_mem());
+ system_mem = MemInfo::physical_mem();
+ bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem);
}
-
+ // ParseMemSpec returns 0 to mean unlimited. TODO: IMPALA-5652: deprecate this mode.
+ bool no_process_mem_limit = bytes_limit == 0;
if (bytes_limit < 0) {
return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
}
+ if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
+ return Status(Substitute(
+ "--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
+ }
+ int64_t buffer_pool_capacity = BitUtil::RoundDown(
+ no_process_mem_limit ? system_mem : bytes_limit * 4 / 5, FLAGS_min_buffer_size);
+ InitBufferPool(FLAGS_min_buffer_size, buffer_pool_capacity);
+
metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
@@ -240,8 +252,8 @@ Status ExecEnv::StartServices() {
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
// Limit of -1 means no memory limit.
- mem_tracker_.reset(new MemTracker(
- AggregateMemoryMetrics::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
+ mem_tracker_.reset(new MemTracker(AggregateMemoryMetrics::TOTAL_USED,
+ no_process_mem_limit ? -1 : bytes_limit, "Process"));
if (buffer_pool_ != nullptr) {
// Add BufferPool MemTrackers for cached memory that is not tracked against queries
// but is included in process memory consumption.
@@ -270,6 +282,8 @@ Status ExecEnv::StartServices() {
}
LOG(INFO) << "Using global memory limit: "
<< PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
+ LOG(INFO) << "Buffer pool capacity: "
+ << PrettyPrinter::Print(buffer_pool_capacity, TUnit::BYTES);
RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
@@ -310,9 +324,8 @@ Status ExecEnv::StartServices() {
return Status::OK();
}
-void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
- DCHECK(buffer_pool_ == nullptr);
- buffer_pool_.reset(new BufferPool(min_page_size, capacity));
+void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity) {
+ buffer_pool_.reset(new BufferPool(min_buffer_size, capacity));
buffer_reservation_.reset(new ReservationTracker());
buffer_reservation_->InitRootTracker(nullptr, capacity);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 4674072..63d2e0b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -159,8 +159,8 @@ class ExecEnv {
boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
/// Query-wide buffer pool and the root reservation tracker for the pool. The
- /// reservation limit is equal to the maximum capacity of the pool.
- /// For now this is only used by backend tests that create them via InitBufferPool();
+ /// reservation limit is equal to the maximum capacity of the pool. Created in
+ /// InitBufferPool().
boost::scoped_ptr<ReservationTracker> buffer_reservation_;
boost::scoped_ptr<BufferPool> buffer_pool_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 2385eab..07b3f1c 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -126,8 +126,6 @@ Status FragmentInstanceState::Prepare() {
profile()->AddChild(timings_profile_);
SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
- // TODO: move this into a RuntimeState::Init()
- RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
runtime_state_->InitFilterBank();
// Reserve one main thread from the pool
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
new file mode 100644
index 0000000..4987ec3
--- /dev/null
+++ b/be/src/runtime/initial-reservations.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/initial-reservations.h"
+
+#include <limits>
+
+#include <boost/thread/mutex.hpp>
+#include <gflags/gflags.h>
+
+#include "common/logging.h"
+#include "common/object-pool.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "util/debug-util.h"
+
+#include "common/names.h"
+
+using std::numeric_limits;
+
+DECLARE_int32(be_port);
+DECLARE_string(hostname);
+
+namespace impala {
+
+InitialReservations::InitialReservations(ObjectPool* obj_pool,
+ ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
+ int64_t initial_reservation_total_claims)
+ : remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+ MemTracker* initial_reservation_tracker = obj_pool->Add(
+ new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false));
+ initial_reservations_.InitChildTracker(nullptr, query_reservation,
+ initial_reservation_tracker, numeric_limits<int64_t>::max());
+}
+
+Status InitialReservations::Init(
+ const TUniqueId& query_id, int64_t query_min_reservation) {
+ DCHECK_EQ(0, initial_reservations_.GetReservation()) << "Already inited";
+ if (!initial_reservations_.IncreaseReservation(query_min_reservation)) {
+ return Status(TErrorCode::MINIMUM_RESERVATION_UNAVAILABLE,
+ PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), FLAGS_hostname,
+ FLAGS_be_port, PrintId(query_id),
+ ExecEnv::GetInstance()->process_mem_tracker()->LogUsage());
+ }
+ VLOG_QUERY << "Successfully claimed initial reservations ("
+ << PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for"
+ << " query " << PrintId(query_id);
+ return Status::OK();
+}
+
+void InitialReservations::Claim(BufferPool::ClientHandle* dst, int64_t bytes) {
+ DCHECK_GE(bytes, 0);
+ lock_guard<SpinLock> l(lock_);
+ DCHECK_LE(bytes, remaining_initial_reservation_claims_);
+ bool success = dst->TransferReservationFrom(&initial_reservations_, bytes);
+ DCHECK(success) << "Planner computation should ensure enough initial reservations";
+ remaining_initial_reservation_claims_ -= bytes;
+}
+
+void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes) {
+ lock_guard<SpinLock> l(lock_);
+ bool success = src->TransferReservationTo(&initial_reservations_, bytes);
+ // No limits on our tracker - no way this should fail.
+ DCHECK(success);
+ // Check to see if we can release any reservation.
+ int64_t excess_reservation =
+ initial_reservations_.GetReservation() - remaining_initial_reservation_claims_;
+ if (excess_reservation > 0) {
+ initial_reservations_.DecreaseReservation(excess_reservation);
+ }
+}
+
+void InitialReservations::ReleaseResources() {
+ initial_reservations_.Close();
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.h b/be/src/runtime/initial-reservations.h
new file mode 100644
index 0000000..dfcb114
--- /dev/null
+++ b/be/src/runtime/initial-reservations.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_INITIAL_RESERVATIONS_H
+#define IMPALA_RUNTIME_INITIAL_RESERVATIONS_H
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class ObjectPool;
+
+/**
+ * Manages the pool of initial reservations for different nodes in the plan tree.
+ * Each plan node and sink claims its initial reservation from here, then returns it when
+ * it is done executing. The frontend is responsible for making sure that enough initial
+ * reservation is in this pool for all of the concurrent claims.
+ */
+class InitialReservations {
+ public:
+ /// 'query_reservation' and 'query_mem_tracker' are the top-level trackers for the
+ /// query. This creates trackers for initial reservations under those.
+ /// 'initial_reservation_total_claims' is the total of initial reservations that will be
+ /// claimed over the lifetime of the query. The total bytes claimed via Claim()
+ /// cannot exceed this. Allocated objects are stored in 'obj_pool'.
+ InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation,
+ MemTracker* query_mem_tracker, int64_t initial_reservation_total_claims);
+
+ /// Initialize the query's pool of initial reservations by acquiring the minimum
+ /// reservation required for the query on this host. Fails if the reservation could
+ /// not be acquired, e.g. because it would exceed a pool or process limit.
+ Status Init(
+ const TUniqueId& query_id, int64_t query_min_reservation) WARN_UNUSED_RESULT;
+
+ /// Claim the initial reservation of 'bytes' for 'dst'. Assumes that the transfer will
+ /// not violate any reservation limits on 'dst'.
+ void Claim(BufferPool::ClientHandle* dst, int64_t bytes);
+
+ /// Return the initial reservation of 'bytes' from 'src'. The reservation is kept in
+ /// the pool of reservations if it may be needed to satisfy a subsequent claim;
+ /// otherwise it is released.
+ void Return(BufferPool::ClientHandle* src, int64_t bytes);
+
+ /// Release any reservations held onto by this object.
+ void ReleaseResources();
+
+ private:
+ /// Protects all members below to ensure that the internal state is consistent.
+ SpinLock lock_;
+
+ /// The pool of initial reservations that Claim() hands out reservations from and
+ /// Return() returns reservations to.
+ ReservationTracker initial_reservations_;
+
+ /// The total bytes of initial reservations that we still expect to be claimed.
+ /// initial_reservations_.GetReservation() <= remaining_initial_reservation_claims_.
+ int64_t remaining_initial_reservation_claims_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 6057b52..22c2826 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -124,6 +124,8 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
}
#endif
+ // decrement initial reservation refcount taken in QueryState::Init()
+ qs->ReleaseInitialReservationRefcount();
// decrement refcount taken in StartQuery()
ReleaseQueryState(qs);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 21f35fb..64a8c5a 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -21,11 +21,12 @@
#include <boost/thread/locks.hpp>
#include "exprs/expr.h"
+#include "runtime/backend-client.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/backend-client.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
+#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/runtime-state.h"
@@ -37,6 +38,20 @@
using namespace impala;
+// The fraction of the query mem limit that is used for buffer reservations. Most
+// operators that accumulate memory use reservations, so the majority of memory should
+// be allocated to buffer reservations, as a heuristic.
+// TODO: this will go away once all operators use buffer reservations.
+static const double RESERVATION_MEM_FRACTION = 0.8;
+
+// The minimum amount of memory that should be left after buffer reservations.
+// The limit on reservations is computed as:
+// min(query_limit * RESERVATION_MEM_FRACTION,
+// query_limit - RESERVATION_MEM_MIN_REMAINING)
+// TODO: this will go away once all operators use buffer reservations and we have accurate
+// minimum requirements.
+static const int64_t RESERVATION_MEM_MIN_REMAINING = 100 * 1024 * 1024;
+
QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -49,8 +64,10 @@ QueryState::ScopedRef::~ScopedRef() {
QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
: query_ctx_(query_ctx),
+ initial_reservation_refcnt_(0),
refcnt_(0),
- is_cancelled_(0) {
+ is_cancelled_(0),
+ query_spilled_(0) {
if (query_ctx_.request_pool.empty()) {
// fix up pool name for tests
DCHECK(!request_pool.empty());
@@ -75,6 +92,7 @@ void QueryState::ReleaseResources() {
// Clean up temporary files.
if (file_group_ != nullptr) file_group_->Close();
// Release any remaining reservation.
+ if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
// Avoid dangling reference from the parent of 'query_mem_tracker_'.
if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
@@ -85,6 +103,7 @@ void QueryState::ReleaseResources() {
QueryState::~QueryState() {
DCHECK(released_resources_);
DCHECK_EQ(refcnt_.Load(), 0);
+ DCHECK_EQ(initial_reservation_refcnt_.Load(), 0);
}
Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
@@ -99,9 +118,8 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
"is over its memory limit", PrintId(query_id()));
RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0));
}
- // Do buffer-pool-related setup if running in a backend test that explicitly created
- // the pool.
- if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState());
+
+ RETURN_IF_ERROR(InitBufferPoolState());
// don't copy query_ctx, it's large and we already did that in the c'tor
rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx);
@@ -112,6 +130,15 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
rpc_params_.__isset.fragment_instance_ctxs = true;
+ // Claim the query-wide minimum reservation. Do this last so that we don't need
+ // to handle releasing it if a later step fails.
+ initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
+ buffer_reservation_, query_mem_tracker_,
+ query_ctx_.per_host_initial_reservation_total_claims));
+ RETURN_IF_ERROR(
+ initial_reservations_->Init(query_id(), query_ctx_.per_host_min_reservation));
+ DCHECK_EQ(0, initial_reservation_refcnt_.Load());
+ initial_reservation_refcnt_.Add(1); // Decremented in QueryExecMgr::StartQueryHelper().
return Status::OK();
}
@@ -129,19 +156,23 @@ void QueryState::InitMemTrackers() {
Status QueryState::InitBufferPoolState() {
ExecEnv* exec_env = ExecEnv::GetInstance();
- int64_t query_mem_limit = query_mem_tracker_->limit();
- if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max();
-
- // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from
- // query_mem_limit.
- int64_t max_reservation = numeric_limits<int64_t>::max();
- if (query_options().__isset.max_block_mgr_memory
- && query_options().max_block_mgr_memory > 0) {
- max_reservation = query_options().max_block_mgr_memory;
+ int64_t mem_limit = query_mem_tracker_->lowest_limit();
+ int64_t max_reservation;
+ if (query_options().__isset.buffer_pool_limit
+ && query_options().buffer_pool_limit > 0) {
+ max_reservation = query_options().buffer_pool_limit;
+ } else if (mem_limit == -1) {
+ // No query mem limit. The process-wide reservation limit is the only limit on
+ // reservations.
+ max_reservation = numeric_limits<int64_t>::max();
+ } else {
+ DCHECK_GE(mem_limit, 0);
+ max_reservation = min<int64_t>(
+ mem_limit * RESERVATION_MEM_FRACTION, mem_limit - RESERVATION_MEM_MIN_REMAINING);
+ max_reservation = max<int64_t>(0, max_reservation);
}
+ VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation;
- // TODO: IMPALA-3748: claim the query-wide minimum reservation.
- // For now, rely on exec nodes to grab their minimum reservation during Prepare().
buffer_reservation_ = obj_pool_.Add(new ReservationTracker);
buffer_reservation_->InitChildTracker(
NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
@@ -256,6 +287,7 @@ void QueryState::StartFInstances() {
VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
<< " #instances=" << rpc_params_.fragment_instance_ctxs.size();
DCHECK_GT(refcnt_.Load(), 0);
+ DCHECK_GT(initial_reservation_refcnt_.Load(), 0) << "Should have been taken in Init()";
// set up desc tbl
DCHECK(query_ctx().__isset.desc_tbl);
@@ -290,6 +322,7 @@ void QueryState::StartFInstances() {
// start new thread to execute instance
refcnt_.Add(1); // decremented in ExecFInstance()
+ initial_reservation_refcnt_.Add(1); // decremented in ExecFInstance()
string thread_name = Substitute(
"exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id));
Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
@@ -311,6 +344,12 @@ void QueryState::StartFInstances() {
instances_prepared_promise_.Set(prepare_status);
}
+void QueryState::ReleaseInitialReservationRefcount() {
+ int32_t new_val = initial_reservation_refcnt_.Add(-1);
+ DCHECK_GE(new_val, 0);
+ if (new_val == 0) initial_reservations_->ReleaseResources();
+}
+
void QueryState::ExecFInstance(FragmentInstanceState* fis) {
ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
@@ -327,6 +366,8 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
// initiate cancellation if nobody has done so yet
if (!status.ok()) Cancel();
// decrement refcount taken in StartFInstances()
+ ReleaseInitialReservationRefcount();
+ // decrement QueryState refcount taken in StartFInstances()
ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
}
@@ -345,3 +386,21 @@ void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
fis->PublishFilter(filter_id, thrift_bloom_filter);
}
}
+
+Status QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker) {
+ // Return an error message with the root cause of why spilling is disabled.
+ if (query_options().scratch_limit == 0) {
+ return mem_tracker->MemLimitExceeded(
+ runtime_state, "Could not free memory by spilling to disk: scratch_limit is 0");
+ } else if (query_ctx_.disable_spilling) {
+ return mem_tracker->MemLimitExceeded(runtime_state,
+ "Could not free memory by spilling to disk: spilling was disabled by planner. "
+ "Re-enable spilling by setting the query option DISABLE_UNSAFE_SPILLS=false");
+ }
+ // 'file_group_' must be non-NULL for spilling to be enabled.
+ DCHECK(file_group_ != nullptr);
+ if (query_spilled_.CompareAndSwap(0, 1)) {
+ ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1);
+ }
+ return Status::OK();
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 9ce4316..fc71772 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -34,8 +34,10 @@
namespace impala {
class FragmentInstanceState;
+class InitialReservations;
class MemTracker;
class ReservationTracker;
+class RuntimeState;
/// Central class for all backend execution state (example: the FragmentInstanceStates
/// of the individual fragment instances) created for a particular query.
@@ -110,6 +112,7 @@ class QueryState {
// the following getters are only valid after Prepare()
ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
+ InitialReservations* initial_reservations() const { return initial_reservations_; }
TmpFileMgr::FileGroup* file_group() const { return file_group_; }
const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
@@ -117,8 +120,10 @@ class QueryState {
const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
/// Sets up state required for fragment execution: memory reservations, etc. Fails
- /// if resources could not be acquired. Uses few cycles and never blocks.
- /// Not idempotent, not thread-safe.
+ /// if resources could not be acquired. On success, acquires an initial reservation
+ /// refcount for the caller, which the caller must release by calling
+ /// ReleaseInitialReservationRefcount().
+ /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
/// The remaining public functions must be called only after Init().
Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
@@ -155,6 +160,12 @@ class QueryState {
/// If there is an error during the rpc, initiates cancellation.
void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
+ /// Checks whether spilling is enabled for this query. Must be called before the first
+ /// call to BufferPool::Unpin() for the query. Returns OK if spilling is enabled. If
+ /// spilling is not enabled, logs a MEM_LIMIT_EXCEEDED error from
+ /// mem_tracker->MemLimitExceeded() to 'runtime_state' and returns that error.
+ Status StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker);
+
~QueryState();
private:
@@ -162,6 +173,7 @@ class QueryState {
/// test execution
friend class RuntimeState;
+ friend class TestEnv;
static const int DEFAULT_BATCH_SIZE = 1024;
@@ -176,16 +188,21 @@ class QueryState {
/// TODO: find a way not to have to copy this
TExecQueryFInstancesParams rpc_params_;
- /// Buffer reservation for this query (owned by obj_pool_)
- /// Only non-null in backend tests that explicitly enabled the new buffer pool
- /// Set in Prepare().
- /// TODO: this will always be non-null once IMPALA-3200 is done
+ /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
ReservationTracker* buffer_reservation_ = nullptr;
- /// Temporary files for this query (owned by obj_pool_)
- /// Only non-null in backend tests the explicitly enabled the new buffer pool
- /// Set in Prepare().
- /// TODO: this will always be non-null once IMPALA-3200 is done
+ /// Pool of buffer reservations used to distribute initial reservations to operators
+ /// in the query. Contains a ReservationTracker that is a child of
+ /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Init().
+ InitialReservations* initial_reservations_ = nullptr;
+
+ /// Number of fragment instances executing, which may need to claim
+ /// from 'initial_reservations_'.
+ /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
+ AtomicInt32 initial_reservation_refcnt_;
+
+ /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is
+ /// enabled. Set in Init().
TmpFileMgr::FileGroup* file_group_ = nullptr;
/// created in StartFInstances(), owned by obj_pool_
@@ -214,6 +231,11 @@ class QueryState {
/// True if and only if ReleaseResources() has been called.
bool released_resources_ = false;
+ /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1
+ /// when the query first starts to spill. Required to correctly maintain the
+ /// "num-queries-spilled" metric.
+ AtomicInt32 query_spilled_;
+
/// Create QueryState w/ refcnt of 0.
/// The query is associated with the resource pool query_ctx.request_pool or
/// 'request_pool', if the former is not set (needed for tests).
@@ -222,13 +244,16 @@ class QueryState {
/// Execute the fragment instance and decrement the refcnt when done.
void ExecFInstance(FragmentInstanceState* fis);
- /// Called from Prepare() to initialize MemTrackers.
+ /// Called from constructor to initialize MemTrackers.
void InitMemTrackers();
- /// Called from Prepare() to setup buffer reservations and the
- /// file group. Fails if required resources are not available.
+ /// Called from Init() to set up buffer reservations and the file group.
Status InitBufferPoolState() WARN_UNUSED_RESULT;
+ /// Decrement 'initial_reservation_refcnt_' and release the initial reservation if it
+ /// goes to zero.
+ void ReleaseInitialReservationRefcount();
+
/// Same behavior as ReportExecStatus().
/// Cancel on error only if instances_started is true.
void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 11cf363..942ac05 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -147,9 +147,6 @@ RowBatch::~RowBatch() {
for (int i = 0; i < io_buffers_.size(); ++i) {
ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
}
- for (int i = 0; i < blocks_.size(); ++i) {
- blocks_[i]->Delete();
- }
for (BufferInfo& buffer_info : buffers_) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
buffer_info.client, &buffer_info.buffer);
@@ -295,14 +292,6 @@ void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) {
io_buffers_.emplace_back(move(buffer));
}
-void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
- DCHECK(block != NULL);
- DCHECK(block->is_pinned());
- blocks_.push_back(block);
- auxiliary_mem_usage_ += block->buffer_len();
- if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
-}
-
void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
BufferPool::BufferHandle&& buffer, FlushMode flush) {
auxiliary_mem_usage_ += buffer.len();
@@ -322,10 +311,6 @@ void RowBatch::Reset() {
ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
}
io_buffers_.clear();
- for (int i = 0; i < blocks_.size(); ++i) {
- blocks_[i]->Delete();
- }
- blocks_.clear();
for (BufferInfo& buffer_info : buffers_) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
buffer_info.client, &buffer_info.buffer);
@@ -342,10 +327,6 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
dest->AddIoBuffer(move(io_buffers_[i]));
}
io_buffers_.clear();
- for (int i = 0; i < blocks_.size(); ++i) {
- dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
- }
- blocks_.clear();
for (BufferInfo& buffer_info : buffers_) {
dest->AddBuffer(
buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 1b75ebb..35a8f14 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -25,7 +25,6 @@
#include "codegen/impala-ir.h"
#include "common/compiler-util.h"
#include "common/logging.h"
-#include "runtime/buffered-block-mgr.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/descriptors.h"
#include "runtime/disk-io-mgr.h"
@@ -207,7 +206,6 @@ class RowBatch {
int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
MemPool* tuple_data_pool() { return &tuple_data_pool_; }
int num_io_buffers() const { return io_buffers_.size(); }
- int num_blocks() const { return blocks_.size(); }
int num_buffers() const { return buffers_.size(); }
/// Resets the row batch, returning all resources it has accumulated.
@@ -216,13 +214,6 @@ class RowBatch {
/// Add io buffer to this row batch.
void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer);
- /// Adds a block to this row batch. The block must be pinned. The blocks must be
- /// deleted when freeing resources. The block's memory remains accounted against
- /// the original owner, even when the ownership of batches is transferred. If the
- /// original owner wants the memory to be released, it should call this with 'mode'
- /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
- void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
-
/// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
/// The buffer's memory remains accounted against the original owner, even when the
/// ownership of batches is transferred. If the original owner wants the memory to be
@@ -426,10 +417,6 @@ class RowBatch {
/// (i.e. they are not ref counted) so most row batches don't own any.
std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_;
- /// Blocks attached to this row batch. The underlying memory and block manager client
- /// are owned by the BufferedBlockMgr.
- std::vector<BufferedBlockMgr::Block*> blocks_;
-
struct BufferInfo {
BufferPool::ClientHandle* client;
BufferPool::BufferHandle buffer;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index ab70d4a..7b6066a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -23,6 +23,7 @@
#include "runtime/runtime-filter-bank.h"
#include "util/bloom-filter.h"
#include "util/spinlock.h"
+#include "util/time.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 89eec29..ba8e75d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -17,21 +17,21 @@
#include "runtime/runtime-state.h"
-#include <iostream>
#include <jni.h>
+#include <iostream>
#include <sstream>
#include <string>
-#include "common/logging.h"
#include <boost/algorithm/string/join.hpp>
#include <gutil/strings/substitute.h>
+#include "common/logging.h"
#include "codegen/llvm-codegen.h"
#include "common/object-pool.h"
#include "common/status.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-fn-call.h"
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/data-stream-recvr.h"
@@ -54,22 +54,10 @@
#include "common/names.h"
using namespace llvm;
+using strings::Substitute;
DECLARE_int32(max_errors);
-// The fraction of the query mem limit that is used for the block mgr. Operators
-// that accumulate memory all use the block mgr so the majority of the memory should
-// be allocated to the block mgr. The remaining memory is used by the non-spilling
-// operators and should be independent of data size.
-static const float BLOCK_MGR_MEM_FRACTION = 0.8f;
-
-// The minimum amount of memory that must be left after the block mgr reserves the
-// BLOCK_MGR_MEM_FRACTION. The block limit is:
-// min(query_limit * BLOCK_MGR_MEM_FRACTION, query_limit - BLOCK_MGR_MEM_MIN_REMAINING)
-// TODO: this value was picked arbitrarily and the tests are written to rely on this
-// for the minimum memory required to run the query. Revisit.
-static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
-
namespace impala {
RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
@@ -82,7 +70,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
query_state->query_ctx().utc_timestamp_string))),
exec_env_(exec_env),
profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
- instance_buffer_reservation_(nullptr),
+ instance_buffer_reservation_(new ReservationTracker),
is_cancelled_(false),
root_node_id_(-1) {
Init();
@@ -127,8 +115,7 @@ void RuntimeState::Init() {
instance_mem_tracker_.reset(new MemTracker(
runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
- if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
- instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker);
+ if (instance_buffer_reservation_ != nullptr) {
instance_buffer_reservation_->InitChildTracker(&profile_,
query_state_->buffer_reservation(), instance_mem_tracker_.get(),
numeric_limits<int64_t>::max());
@@ -139,28 +126,6 @@ void RuntimeState::InitFilterBank() {
filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
}
-Status RuntimeState::CreateBlockMgr() {
- DCHECK(block_mgr_.get() == NULL);
-
- // Compute the max memory the block mgr will use.
- int64_t block_mgr_limit = query_mem_tracker()->lowest_limit();
- if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
- block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
- block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
- if (block_mgr_limit < 0) block_mgr_limit = 0;
- if (query_options().__isset.max_block_mgr_memory &&
- query_options().max_block_mgr_memory > 0) {
- block_mgr_limit = query_options().max_block_mgr_memory;
- LOG(WARNING) << "Block mgr mem limit: "
- << PrettyPrinter::Print(block_mgr_limit, TUnit::BYTES);
- }
-
- RETURN_IF_ERROR(BufferedBlockMgr::Create(this, query_mem_tracker(),
- runtime_profile(), exec_env()->tmp_file_mgr(), block_mgr_limit,
- io_mgr()->max_read_buffer_size(), &block_mgr_));
- return Status::OK();
-}
-
Status RuntimeState::CreateCodegen() {
if (codegen_.get() != NULL) return Status::OK();
// TODO: add the fragment ID to the codegen ID as well
@@ -179,6 +144,10 @@ Status RuntimeState::CodegenScalarFns() {
return Status::OK();
}
+Status RuntimeState::StartSpilling(MemTracker* mem_tracker) {
+ return query_state_->StartSpilling(this, mem_tracker);
+}
+
string RuntimeState::ErrorLog() {
lock_guard<SpinLock> l(error_log_lock_);
return PrintErrorMapToString(error_log_);
@@ -270,7 +239,6 @@ void RuntimeState::ReleaseResources() {
if (resource_pool_ != nullptr) {
exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
}
- block_mgr_.reset(); // Release any block mgr memory, if this is the last reference.
codegen_.reset(); // Release any memory associated with codegen.
// Release the reservation, which should be unused at this point.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 9a1d0b2..12e7d8c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -32,7 +32,7 @@
namespace impala {
-class BufferedBlockMgr;
+class BufferPool;
class DataStreamRecvr;
class DescriptorTbl;
class DiskIoMgr;
@@ -92,9 +92,6 @@ class RuntimeState {
/// Initializes the runtime filter bank.
void InitFilterBank();
- /// Gets/Creates the query wide block mgr.
- Status CreateBlockMgr();
-
QueryState* query_state() const { return query_state_; }
/// Return the query's ObjectPool
ObjectPool* obj_pool() const;
@@ -132,7 +129,7 @@ class RuntimeState {
MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
MemTracker* query_mem_tracker(); // reference to the query_state_'s memtracker
ReservationTracker* instance_buffer_reservation() {
- return instance_buffer_reservation_;
+ return instance_buffer_reservation_.get();
}
ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
@@ -206,11 +203,6 @@ class RuntimeState {
/// Unregisters all reader contexts acquired through AcquireReaderContext().
void UnregisterReaderContexts();
- BufferedBlockMgr* block_mgr() {
- DCHECK(block_mgr_.get() != NULL);
- return block_mgr_.get();
- }
-
inline Status GetQueryStatus() {
// Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
if (UNLIKELY(!query_status_.ok())) {
@@ -307,21 +299,19 @@ class RuntimeState {
/// TODO: Fix IMPALA-4233
Status CodegenScalarFns();
+ /// Helper to call QueryState::StartSpilling().
+ Status StartSpilling(MemTracker* mem_tracker);
+
/// Release resources and prepare this object for destruction.
void ReleaseResources();
private:
- /// Allow TestEnv to set block_mgr manually for testing.
+ /// Allow TestEnv to use private methods for testing.
friend class TestEnv;
/// Set per-fragment state.
void Init();
- /// Use a custom block manager for the query for testing purposes.
- void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) {
- block_mgr_ = block_mgr;
- }
-
/// Lock protecting error_log_
SpinLock error_log_lock_;
@@ -382,9 +372,8 @@ class RuntimeState {
boost::scoped_ptr<MemTracker> instance_mem_tracker_;
/// Buffer reservation for this fragment instance - a child of the query buffer
- /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
- /// was created by a backend test. Owned by obj_pool().
- ReservationTracker* instance_buffer_reservation_;
+ /// reservation. Non-NULL if 'query_state_' is not NULL.
+ boost::scoped_ptr<ReservationTracker> instance_buffer_reservation_;
/// if true, execution should stop with a CANCELLED status
bool is_cancelled_;
@@ -401,11 +390,6 @@ class RuntimeState {
SpinLock reader_contexts_lock_;
std::vector<DiskIoRequestContext*> reader_contexts_;
- /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory
- /// with a fixed memory budget.
- /// The block mgr is shared by all fragments for this query.
- std::shared_ptr<BufferedBlockMgr> block_mgr_;
-
/// This is the node id of the root node for this plan fragment. This is used as the
/// hash seed and has two useful properties:
/// 1) It is the same for all exec nodes in a fragment, so the resulting hash values