You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:35 UTC
[5/7] incubator-impala git commit: IMPALA-3905: Add single-threaded
scan node.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 36e95b5..2efb24e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -19,83 +19,29 @@
#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_
#define IMPALA_EXEC_HDFS_SCAN_NODE_H_
-#include <vector>
-#include <memory>
+#include <map>
#include <stdint.h>
+#include <vector>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include "exec/filter-context.h"
-#include "exec/scan-node.h"
-#include "exec/scanner-context.h"
-#include "runtime/descriptors.h"
+#include "exec/hdfs-scan-node-base.h"
#include "runtime/disk-io-mgr.h"
-#include "util/avro-util.h"
#include "util/counting-barrier.h"
-#include "util/progress-updater.h"
-#include "util/spinlock.h"
#include "util/thread.h"
-#include "gen-cpp/PlanNodes_types.h"
-
namespace impala {
class DescriptorTbl;
-class HdfsScanner;
+class ObjectPool;
+class RuntimeState;
class RowBatch;
-class RuntimeFilter;
-class Status;
-class Tuple;
class TPlanNode;
-class TScanRange;
-
-/// Maintains per file information for files assigned to this scan node. This includes
-/// all the splits for the file. Note that it is not thread-safe.
-struct HdfsFileDesc {
- /// Connection to the filesystem containing the file.
- hdfsFS fs;
-
- /// File name including the path.
- std::string filename;
-
- /// Length of the file. This is not related to which parts of the file have been
- /// assigned to this node.
- int64_t file_length;
-
- /// Last modified time
- int64_t mtime;
-
- THdfsCompression::type file_compression;
-
- /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
- std::vector<DiskIoMgr::ScanRange*> splits;
- HdfsFileDesc(const std::string& filename)
- : filename(filename), file_length(0), mtime(0),
- file_compression(THdfsCompression::NONE) {
- }
-};
-
-/// Struct for additional metadata for scan ranges. This contains the partition id
-/// that this scan range is for.
-struct ScanRangeMetadata {
- /// The partition id that this range is part of.
- int64_t partition_id;
-
- /// For parquet scan ranges we initially create a request for the file footer for each
- /// split; we store a pointer to the actual split so that we can recover its information
- /// for the scanner to process.
- const DiskIoMgr::ScanRange* original_split;
-
- ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
- : partition_id(partition_id), original_split(original_split) { }
-};
-/// A ScanNode implementation that is used for all tables read directly from
-/// HDFS-serialised data.
+/// Legacy ScanNode implementation used in the non-multi-threaded execution mode
+/// that is used for all tables read directly from HDFS-serialised data.
/// A HdfsScanNode spawns multiple scanner threads to process the bytes in
/// parallel. There is a handshake between the scan node and the scanners
/// to get all the splits queued and bytes processed.
@@ -111,147 +57,36 @@ struct ScanRangeMetadata {
/// 5. The scanner finishes the scan range and informs the scan node so it can track
/// end of stream.
///
-/// An HdfsScanNode may expect to receive runtime filters produced elsewhere in the plan
-/// (even from remote fragments). These filters arrive asynchronously during execution,
-/// and are applied as soon as they arrive. Filters may be applied by the scan node in the
-/// following scopes:
-///
-/// 1. Per-file (all file formats, partition column filters only) - filtering at this
-/// scope saves IO as the filters are applied before scan ranges are issued.
-/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
-/// this scope saves CPU as filtered scan ranges are never scanned.
-///
-/// Scanners may also use the same filters to eliminate rows at finer granularities
-/// (e.g. per row).
-///
-/// TODO: this class allocates a bunch of small utility objects that should be
+/// TODO: This class allocates a bunch of small utility objects that should be
/// recycled.
-class HdfsScanNode : public ScanNode {
+/// TODO: Remove this class once the fragment-based multi-threaded execution is
+/// fully functional.
+class HdfsScanNode : public HdfsScanNodeBase {
public:
HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
~HdfsScanNode();
- /// ExecNode methods
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
virtual Status Prepare(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
- virtual Status Reset(RuntimeState* state);
virtual void Close(RuntimeState* state);
- int limit() const { return limit_; }
-
- const std::vector<SlotDescriptor*>& materialized_slots()
- const { return materialized_slots_; }
-
- /// Returns the tuple idx into the row for this scan node to output to.
- /// Currently this is always 0.
- int tuple_idx() const { return 0; }
-
- /// Returns number of partition keys in the table.
- int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
-
- /// Returns number of partition key slots.
- int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
-
- const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
-
- const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
-
- const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
-
- RuntimeState* runtime_state() { return runtime_state_; }
+ virtual bool HasRowBatchQueue() const { return true; }
- int skip_header_line_count() const { return skip_header_line_count_; }
+ bool done() const { return done_; }
- DiskIoRequestContext* reader_context() { return reader_context_; }
-
- typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
- const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
-
- RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
- return max_compressed_text_file_length_;
- }
-
- const static int SKIP_COLUMN = -1;
-
- /// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
- /// that path is not materialized.
- int GetMaterializedSlotIdx(const std::vector<int>& path) const {
- PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
- if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
- return result->second;
- }
-
- /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
- /// column i should be materialized.
- const bool* is_materialized_col() {
- return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
- }
-
- /// Returns the per format codegen'd function. Scanners call this to get the
- /// codegen'd function to use. Returns NULL if codegen should not be used.
- void* GetCodegenFn(THdfsFileFormat::type);
-
- inline void IncNumScannersCodegenEnabled() {
- num_scanners_codegen_enabled_.Add(1);
- }
-
- inline void IncNumScannersCodegenDisabled() {
- num_scanners_codegen_disabled_.Add(1);
- }
+ /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
+ virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+ int num_files_queued);
/// Adds a materialized row batch for the scan node. This is called from scanner
/// threads.
/// This function will block if materialized_row_batches_ is full.
void AddMaterializedRowBatch(RowBatch* row_batch);
- /// Allocate a new scan range object, stored in the runtime state's object pool. For
- /// scan ranges that correspond to the original hdfs splits, the partition id must be
- /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
- /// past buffers), the partition_id is unused. expected_local should be true if this
- /// scan range is not expected to require a remote read. The range must fall within
- /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
- /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
- /// metadata of the scan range that is to be allocated.
+ /// Sets the scanner specific metadata for 'filename'.
/// This is thread safe.
- DiskIoMgr::ScanRange* AllocateScanRange(
- hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
- int disk_id, bool try_cache, bool expected_local, int64_t mtime,
- const DiskIoMgr::ScanRange* original_split = NULL);
-
- /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
- /// 'num_files_queued' indicates how many file's scan ranges have been added
- /// completely. A file's scan ranges are added completely if no new scanner threads
- /// will be needed to process that file besides the additional threads needed to
- /// process those in 'ranges'.
- Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
- int num_files_queued);
-
- /// Adds all splits for file_desc to the io mgr queue and indicates one file has
- /// been added completely.
- inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) {
- return AddDiskIoRanges(file_desc->splits, 1);
- }
-
- /// Allocates and initialises template_tuple_ with any values from the partition columns
- /// for the current scan range
- /// Returns NULL if there are no partition keys slots.
- /// TODO: cache the tuple template in the partition object.
- Tuple* InitTemplateTuple(RuntimeState* state,
- const std::vector<ExprContext*>& value_ctxs);
-
- /// Allocates and return an empty template tuple (i.e. with no values filled in).
- /// Scanners can use this method to initialize a template tuple even if there are no
- /// partition keys slots (e.g. to hold Avro default values).
- Tuple* InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc);
-
- /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
- void TransferToScanNodePool(MemPool* pool);
-
- /// Returns the file desc for 'filename'. Returns NULL if filename is invalid.
- HdfsFileDesc* GetFileDesc(const std::string& filename);
+ void SetFileMetadata(const std::string& filename, void* metadata);
/// Gets scanner specific metadata for 'filename'. Scanners can use this to store
/// file header information.
@@ -259,117 +94,18 @@ class HdfsScanNode : public ScanNode {
/// This is thread safe.
void* GetFileMetadata(const std::string& filename);
- /// Sets the scanner specific metadata for 'filename'.
- /// This is thread safe.
- void SetFileMetadata(const std::string& filename, void* metadata);
-
- /// Called by the scanner when a range is complete. Used to trigger done_ and
- /// to log progress. This *must* only be called after the scanner has completely
- /// finished the scan range (i.e. context->Flush()), and has added the final
- /// row batch to the row batch queue. Otherwise the last batch may be
- /// lost due to racing with shutting down the row batch queue.
- void RangeComplete(const THdfsFileFormat::type& file_type,
- const THdfsCompression::type& compression_type);
- /// Same as above except for when multiple compression codecs were used
- /// in the file. The metrics are incremented for each compression_type.
- void RangeComplete(const THdfsFileFormat::type& file_type,
+ /// Called by scanners when a range is complete. Used to record progress and set done_.
+ /// This *must* only be called after a scanner has completely finished its
+ /// scan range (i.e. context->Flush()), and has added the final row batch to the row
+ /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down
+ /// the RowBatch queue.
+ virtual void RangeComplete(const THdfsFileFormat::type& file_type,
const std::vector<THdfsCompression::type>& compression_type);
- /// Utility function to compute the order in which to materialize slots to allow for
- /// computing conjuncts as slots get materialized (on partial tuples).
- /// 'order' will contain for each slot, the first conjunct it is associated with.
- /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
- /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have
- /// order set to conjuncts.size()
- void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
-
- /// Returns true if there are no materialized slots, such as a count(*) over the table.
- inline bool IsZeroSlotTableScan() const {
- return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
- }
-
- /// map from volume id to <number of split, per volume split lengths>
- typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats;
-
- /// Update the per volume stats with the given scan range params list
- static void UpdateHdfsSplitStats(
- const std::vector<TScanRangeParams>& scan_range_params_list,
- PerVolumnStats* per_volume_stats);
-
- /// Output the per_volume_stats to stringstream. The output format is a list of:
- /// <volume id>:<# splits>/<per volume split lengths>
- static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
- std::stringstream* ss);
-
- /// Description string for the per volume stats output.
- static const std::string HDFS_SPLIT_STATS_DESC;
-
- /// Returns true if partition 'partition_id' passes all the filter predicates in
- /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the
- /// counter groups in FilterStats, and is used to update the correct statistics.
- ///
- /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the
- /// function returns true, or a set of filter contexts to evaluate.
- bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
- const std::vector<FilterContext>& filter_ctxs);
-
- const std::vector<FilterContext> filter_ctxs() const { return filter_ctxs_; }
-
- protected:
- friend class ScannerContext;
- friend class HdfsScanner;
-
- RuntimeState* runtime_state_;
-
- // Number of header lines to skip at the beginning of each file of this table. Only set
- // to values > 0 for hdfs text files.
- const int skip_header_line_count_;
-
- /// Tuple id resolved in Prepare() to set tuple_desc_
- const int tuple_id_;
-
- /// RequestContext object to use with the disk-io-mgr for reads.
- DiskIoRequestContext* reader_context_;
-
- /// Descriptor for tuples this scan node constructs
- const TupleDescriptor* tuple_desc_;
-
- /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
- /// the partition columns for that partition materialized. Used to filter files and scan
- /// ranges on partition-column filters. Populated in Prepare().
- boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
-
- /// Descriptor for the hdfs table, including partition and format metadata.
- /// Set in Prepare, owned by RuntimeState
- const HdfsTableDescriptor* hdfs_table_;
-
- /// The root of the table's Avro schema, if we're scanning an Avro table.
- ScopedAvroSchemaElement avro_schema_;
-
- /// If true, the warning that some disk ids are unknown was logged. Only log this once
- /// per scan node since it can be noisy.
- bool unknown_disk_id_warned_;
-
- /// Partitions scanned by this scan node.
- boost::unordered_set<int64_t> partition_ids_;
-
- /// File path => file descriptor (which includes the file's splits)
- typedef std::map<std::string, HdfsFileDesc*> FileDescMap;
- FileDescMap file_descs_;
-
- /// File format => file descriptors.
- typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap;
- FileFormatsMap per_type_files_;
-
- /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
- /// item tuples). Includes a copy of ExecNode.conjuncts_.
- ConjunctsMap conjuncts_map_;
-
- /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
- /// the first call to GetNext(). The token manager, in a different thread, will read
- /// this variable.
- bool initial_ranges_issued_;
+ /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
+ void TransferToScanNodePool(MemPool* pool);
+ private:
/// Released when initial ranges are issued in the first call to GetNext().
CountingBarrier ranges_issued_barrier_;
@@ -378,44 +114,6 @@ class HdfsScanNode : public ScanNode {
/// scanner threads.
int64_t scanner_thread_bytes_required_;
- /// Number of files that have not been issued from the scanners.
- AtomicInt32 num_unqueued_files_;
-
- /// Map of HdfsScanner objects to file types. Only one scanner object will be
- /// created for each file type. Objects stored in runtime_state's pool.
- typedef std::map<THdfsFileFormat::type, HdfsScanner*> ScannerMap;
- ScannerMap scanner_map_;
-
- /// Per scanner type codegen'd fn.
- typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap;
- CodegendFnMap codegend_fn_map_;
-
- /// Maps from a slot's path to its index into materialized_slots_.
- typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
- PathToSlotIdxMap path_to_materialized_slot_idx_;
-
- /// List of contexts for expected runtime filters for this scan node. These contexts are
- /// cloned by individual scanners to be used in multi-threaded contexts, passed through
- /// the per-scanner ScannerContext..
- std::vector<FilterContext> filter_ctxs_;
-
- /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
- /// for 0 <= i < total # columns in table
- //
- /// This should be a vector<bool>, but bool vectors are special-cased and not stored
- /// internally as arrays, so instead we store as chars and cast to bools as needed
- std::vector<char> is_materialized_col_;
-
- /// Vector containing slot descriptors for all non-partition key slots. These
- /// descriptors are sorted in order of increasing col_pos.
- std::vector<SlotDescriptor*> materialized_slots_;
-
- /// Vector containing slot descriptors for all partition key slots.
- std::vector<SlotDescriptor*> partition_key_slots_;
-
- /// Keeps track of total splits and the number finished.
- ProgressUpdater progress_;
-
/// Scanner specific per file metadata (e.g. header information) and associated lock.
/// This lock cannot be taken together with any other locks except lock_.
boost::mutex metadata_lock_;
@@ -431,43 +129,15 @@ class HdfsScanNode : public ScanNode {
/// Maximum size of materialized_row_batches_.
int max_materialized_row_batches_;
- /// This is the number of io buffers that are owned by the scan node and the scanners.
- /// This is used just to help debug leaked io buffers to determine if the leak is
- /// happening in the scanners vs other parts of the execution.
- AtomicInt32 num_owned_io_buffers_;
-
- /// Counters which track the number of scanners that have codegen enabled for the
- /// materialize and conjuncts evaluation code paths.
- AtomicInt32 num_scanners_codegen_enabled_;
- AtomicInt32 num_scanners_codegen_disabled_;
-
- /// The size of the largest compressed text file to be scanned. This is used to
- /// estimate scanner thread memory usage.
- RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
-
- /// Disk accessed bitmap
- RuntimeProfile::Counter disks_accessed_bitmap_;
-
- /// Total number of bytes read locally
- RuntimeProfile::Counter* bytes_read_local_;
-
- /// Total number of bytes read via short circuit read
- RuntimeProfile::Counter* bytes_read_short_circuit_;
-
- /// Total number of bytes read from data node cache
- RuntimeProfile::Counter* bytes_read_dn_cache_;
-
- /// Total number of remote scan ranges
- RuntimeProfile::Counter* num_remote_ranges_;
-
- /// Total number of bytes read remotely that were expected to be local
- RuntimeProfile::Counter* unexpected_remote_bytes_;
-
/// Lock protects access between scanner thread and main query thread (the one calling
/// GetNext()) for all fields below. If this lock and any other locks needs to be taken
/// together, this lock must be taken first.
boost::mutex lock_;
+ /// Protects file_type_counts_. Cannot be taken together with any other lock
+ /// except lock_, and if so, lock_ must be taken first.
+ SpinLock file_type_counts_;
+
/// Flag signaling that all scanner threads are done. This could be because they
/// are finished, an error/cancellation occurred, or the limit was reached.
/// Setting this to true triggers the scanner threads to clean up.
@@ -478,27 +148,6 @@ class HdfsScanNode : public ScanNode {
/// being processed by scanner threads, but no new ScannerThreads should be started.
bool all_ranges_started_;
- /// Pool for allocating some amounts of memory that is shared between scanners.
- /// e.g. partition key tuple and their string buffers
- boost::scoped_ptr<MemPool> scan_node_pool_;
-
- /// Status of failed operations. This is set in the ScannerThreads
- /// Returned in GetNext() if an error occurred. An non-ok status triggers cleanup
- /// scanner threads.
- Status status_;
-
- /// Mapping of file formats (file type, compression type) to the number of
- /// splits of that type and the lock protecting it.
- /// This lock cannot be taken together with any other lock except lock_.
- SpinLock file_type_counts_lock_;
- typedef std::map<
- std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
- FileTypeCountsMap file_type_counts_;
-
- /// If true, counters are actively running and need to be reported in the runtime
- /// profile.
- bool counters_running_;
-
/// The id of the callback added to the thread resource manager when thread token
/// is available. Used to remove the callback before this scan node is destroyed.
/// -1 if no callback is registered.
@@ -519,11 +168,6 @@ class HdfsScanNode : public ScanNode {
/// (e.g., when adding new ranges) or when threads are available for this scan node.
void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
- /// Create and open new scanner for this partition type.
- /// If the scanner is successfully created, it is returned in 'scanner'.
- Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
- ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
-
/// Main function for scanner thread. This thread pulls the next range to be
/// processed from the IoMgr and then processes the entire range end to end.
/// This thread terminates when all scan ranges are complete or an error occurred.
@@ -546,42 +190,9 @@ class HdfsScanNode : public ScanNode {
/// Checks for eos conditions and returns batches from materialized_row_batches_.
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
- /// sets done_ to true and triggers threads to cleanup. Cannot be calld with
+ /// sets done_ to true and triggers threads to cleanup. Cannot be called with
/// any locks taken. Calling it repeatedly ignores subsequent calls.
void SetDone();
-
- /// Stops periodic counters and aggregates counter values for the entire scan node.
- /// This should be called as soon as the scan node is complete to get the most accurate
- /// counter values.
- /// This can be called multiple times, subsequent calls will be ignored.
- /// This must be called on Close() to unregister counters.
- void StopAndFinalizeCounters();
-
- /// Recursively initializes all NULL collection slots to an empty CollectionValue in
- /// addition to maintaining the null bit. Hack to allow UnnestNode to project out
- /// collection slots. Assumes that the null bit has already been un/set.
- /// TODO: remove this function once the TODOs in UnnestNode regarding projection
- /// have been addressed.
- void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
-
- /// Helper to call InitNullCollectionValues() on all tuples produced by this scan
- /// in 'row_batch'.
- void InitNullCollectionValues(RowBatch* row_batch) const;
-
- /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
- /// and therefore not processed. 'file_type' is the the format of 'file', and is used
- /// for bookkeeping. Returns true if all filters pass or are not present.
- bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
- const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
-
- /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns
- /// true if all filters arrived within the time limit (as measured from the time of
- /// RuntimeFilterBank::RegisterFilter()), false otherwise.
- bool WaitForRuntimeFilters(int32_t time_ms);
-
- /// Calls ExecDebugAction(). Returns the status based on the debug action specified
- /// for the query.
- Status TriggerDebugAction();
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 2b053ca..e6da220 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -37,7 +37,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
FieldLocation* fields, int num_tuples, int max_added_tuples,
int slots_per_tuple, int row_idx_start) {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
DCHECK(tuple_ != NULL);
uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index ad3d834..841d1e4 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -26,6 +26,7 @@
#include "common/object-pool.h"
#include "exec/text-converter.h"
#include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-mt.h"
#include "exec/read-write-util.h"
#include "exec/text-converter.inline.h"
#include "exprs/expr-context.h"
@@ -56,20 +57,20 @@ using namespace strings;
const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
-HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state,
- bool add_batches_to_queue)
+HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: scan_node_(scan_node),
state_(state),
- add_batches_to_queue_(add_batches_to_queue),
context_(NULL),
stream_(NULL),
+ eos_(false),
scanner_conjunct_ctxs_(NULL),
+ template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
template_tuple_(NULL),
tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
+ num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
tuple_(NULL),
batch_(NULL),
tuple_mem_(NULL),
- num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
parse_status_(Status::OK()),
decompression_type_(THdfsCompression::NONE),
data_buffer_pool_(new MemPool(scan_node->mem_tracker())),
@@ -80,16 +81,17 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state,
HdfsScanner::HdfsScanner()
: scan_node_(NULL),
state_(NULL),
- add_batches_to_queue_(true),
context_(NULL),
stream_(NULL),
+ eos_(false),
scanner_conjunct_ctxs_(NULL),
+ template_tuple_pool_(NULL),
template_tuple_(NULL),
tuple_byte_size_(-1),
+ num_null_bytes_(-1),
tuple_(NULL),
batch_(NULL),
tuple_mem_(NULL),
- num_null_bytes_(-1),
parse_status_(Status::OK()),
decompression_type_(THdfsCompression::NONE),
data_buffer_pool_(NULL),
@@ -107,19 +109,23 @@ Status HdfsScanner::Open(ScannerContext* context) {
// Clone the scan node's conjuncts map. The cloned contexts must be closed by the
// caller.
- HdfsScanNode::ConjunctsMap::const_iterator iter = scan_node_->conjuncts_map().begin();
- for (; iter != scan_node_->conjuncts_map().end(); ++iter) {
- RETURN_IF_ERROR(Expr::CloneIfNotExists(iter->second,
- scan_node_->runtime_state(), &scanner_conjuncts_map_[iter->first]));
+ for (const auto& entry: scan_node_->conjuncts_map()) {
+ RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second,
+ scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first]));
}
DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) !=
scanner_conjuncts_map_.end());
scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
// Initialize the template_tuple_.
- template_tuple_ = scan_node_->InitTemplateTuple(
- state_, context_->partition_descriptor()->partition_key_value_ctxs());
+ vector<ExprContext*> partition_key_value_ctxs;
+ RETURN_IF_ERROR(Expr::CloneIfNotExists(
+ context_->partition_descriptor()->partition_key_value_ctxs(), state_,
+ &partition_key_value_ctxs));
+ template_tuple_ = scan_node_->InitTemplateTuple(partition_key_value_ctxs,
+ template_tuple_pool_.get(), state_);
template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
+ Expr::Close(partition_key_value_ctxs, state_);
decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
return Status::OK();
@@ -127,10 +133,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
void HdfsScanner::Close(RowBatch* row_batch) {
if (decompressor_.get() != NULL) decompressor_->Close();
- HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
- for (; iter != scanner_conjuncts_map_.end(); ++iter) {
- Expr::Close(iter->second, state_);
- }
+ for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
obj_pool_.Clear();
stream_ = NULL;
context_->ClearStreams();
@@ -158,7 +161,7 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
}
Status HdfsScanner::StartNewRowBatch() {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
scan_node_->mem_tracker());
int64_t tuple_buffer_size;
@@ -168,7 +171,7 @@ Status HdfsScanner::StartNewRowBatch() {
}
int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
DCHECK(batch_ != NULL);
DCHECK_GT(batch_->capacity(), batch_->num_rows());
*pool = batch_->tuple_data_pool();
@@ -190,7 +193,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
// TODO(skye): have this check scan_node_->ReachedLimit() and get rid of manual check?
Status HdfsScanner::CommitRows(int num_rows) {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
DCHECK(batch_ != NULL);
DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
batch_->CommitRows(num_rows);
@@ -201,16 +204,15 @@ Status HdfsScanner::CommitRows(int num_rows) {
// if no rows passed predicates.
if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) {
context_->ReleaseCompletedResources(batch_, /* done */ false);
- scan_node_->AddMaterializedRowBatch(batch_);
+ static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_);
RETURN_IF_ERROR(StartNewRowBatch());
}
if (context_->cancelled()) return Status::CANCELLED;
// Check for UDF errors.
RETURN_IF_ERROR(state_->GetQueryStatus());
// Free local expr allocations for this thread
- HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
- for (; iter != scanner_conjuncts_map_.end(); ++iter) {
- ExprContext::FreeLocalAllocations(iter->second);
+ for (const auto& entry: scanner_conjuncts_map_) {
+ ExprContext::FreeLocalAllocations(entry.second);
}
return Status::OK();
}
@@ -357,8 +359,9 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
// eval_fail: ; preds = %parse
// ret i1 false
// }
-Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* codegen,
- const vector<ExprContext*>& conjunct_ctxs, Function** write_complete_tuple_fn) {
+Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
+ LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs,
+ Function** write_complete_tuple_fn) {
*write_complete_tuple_fn = NULL;
SCOPED_TIMER(codegen->codegen_timer());
RuntimeState* state = node->runtime_state();
@@ -565,8 +568,9 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* c
return Status::OK();
}
-Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, LlvmCodeGen* codegen,
- Function* write_complete_tuple_fn, Function** write_aligned_tuples_fn) {
+Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNodeBase* node,
+ LlvmCodeGen* codegen, Function* write_complete_tuple_fn,
+ Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = NULL;
SCOPED_TIMER(codegen->codegen_timer());
DCHECK(write_complete_tuple_fn != NULL);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6245e5f..d9fa424 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -27,7 +27,7 @@
#include "codegen/impala-ir.h"
#include "common/object-pool.h"
-#include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-base.h"
#include "exec/scan-node.h"
#include "exec/scanner-context.h"
#include "runtime/disk-io-mgr.h"
@@ -75,8 +75,10 @@ struct FieldLocation {
/// 1. Open()
/// 2. ProcessSplit() or GetNext()*
/// 3. Close()
-/// The scanner can be used in either of two modes, indicated via the add_batches_to_queue
-/// c'tor parameter.
+/// The scanner can be used in either of two modes. Which mode is expected to be used
+/// depends on the type of parent scan node. Parent scan nodes with a row batch queue
+/// are expected to call ProcessSplit() and not GetNext(). Row batches will be added to
+/// the scan node's row batch queue, including the final one in Close().
/// ProcessSplit() scans the split and adds materialized row batches to the scan node's
/// row batch queue until the split is complete or an error occurred.
/// GetNext() provides an iterator-like interface where the caller can request
@@ -103,16 +105,15 @@ struct FieldLocation {
/// resources (IO buffers and mem pools) to the current row batch, and passing row batches
/// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory
/// management.
+/// TODO: Have a pass over all members and move them out of the base class if sensible
+/// to clarify which state each concrete scanner type actually has.
class HdfsScanner {
public:
/// Assumed size of an OS file block. Used mostly when reading file format headers, etc.
/// This probably ought to be a derived number from the environment.
const static int FILE_BLOCK_SIZE = 4096;
- /// If 'add_batches_to_queue' is true the caller must call ProcessSplit() and not
- /// GetNext(). Row batches will be added to the scan node's row batch queue, including
- /// the final one in Close().
- HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue);
+ HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsScanner();
@@ -125,16 +126,16 @@ class HdfsScanner {
/// returned, 'parse_status_' is guaranteed to be OK as well.
/// The memory referenced by the tuples is valid until this or any subsequently
/// returned batch is reset or destroyed.
- /// Only valid to call if 'add_batches_to_queue_' is false.
- Status GetNext(RowBatch* row_batch, bool* eos) {
- DCHECK(!add_batches_to_queue_);
- return GetNextInternal(row_batch, eos);
+ /// Only valid to call if the parent scan node is single-threaded.
+ Status GetNext(RowBatch* row_batch) {
+ DCHECK(!scan_node_->HasRowBatchQueue());
+ return GetNextInternal(row_batch);
}
/// Process an entire split, reading bytes from the context's streams. Context is
/// initialized with the split data (e.g. template tuple, partition descriptor, etc).
/// This function should only return on error or end of scan range.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
virtual Status ProcessSplit() = 0;
/// Transfers the ownership of memory backing returned tuples such as IO buffers
@@ -143,9 +144,15 @@ class HdfsScanner {
/// that are not backing returned rows (e.g. temporary decompression buffers).
virtual void Close(RowBatch* row_batch);
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is single-threaded.
+ bool eos() const {
+ DCHECK(!scan_node_->HasRowBatchQueue());
+ return eos_;
+ }
+
+ /// Only valid to call if the parent scan node is multi-threaded.
RowBatch* batch() const {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
return batch_;
}
@@ -177,16 +184,11 @@ class HdfsScanner {
protected:
/// The scan node that started this scanner
- HdfsScanNode* scan_node_;
+ HdfsScanNodeBase* scan_node_;
/// RuntimeState for error reporting
RuntimeState* state_;
- /// True if the creator of this scanner intends to use ProcessSplit() and not GetNext).
- /// Row batches will be added to the scan node's row batch queue, including the final
- /// one in Close().
- const bool add_batches_to_queue_;
-
/// Context for this scanner
ScannerContext* context_;
@@ -196,14 +198,24 @@ class HdfsScanner {
/// The first stream for context_
ScannerContext::Stream* stream_;
+ /// Set if this scanner has processed all ranges and will not produce more rows.
+ /// Only relevant when calling the GetNext() interface.
+ bool eos_;
+
/// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
/// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
- HdfsScanNode::ConjunctsMap scanner_conjuncts_map_;
+ HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;
// Convenience reference to scanner_conjuncts_map_[scan_node_->tuple_idx()] for scanners
// that do not support nested types.
const std::vector<ExprContext*>* scanner_conjunct_ctxs_;
+ /// Holds memory for template tuples. The memory in this pool must remain valid as long
+ /// as the row batches produced by this scanner. This typically means that the
+ /// ownership is transferred to the last row batch in Close(). Some scanners transfer
+ /// the ownership to the parent scan node instead due to being closed multiple times.
+ boost::scoped_ptr<MemPool> template_tuple_pool_;
+
/// A template tuple is a partially-materialized tuple with only partition key slots set
/// (or other default values, such as NULL for columns missing in a file). The other
/// slots are set to NULL. The template tuple must be copied into output tuples before
@@ -211,9 +223,8 @@ class HdfsScanner {
///
/// Each tuple descriptor (i.e. scan_node_->tuple_desc() and any collection item tuple
/// descs) has a template tuple, or NULL if there are no partition key or default slots.
- /// Template tuples are computed once for each file and valid for the duration of that
- /// file. They are owned by the HDFS scan node, although each scanner has its own
- /// template tuples.
+ /// Template tuples are computed once for each file and are allocated from
+ /// template_tuple_pool_.
std::map<const TupleDescriptor*, Tuple*> template_tuple_map_;
/// Convenience variable set to the top-level template tuple
@@ -221,7 +232,10 @@ class HdfsScanner {
Tuple* template_tuple_;
/// Fixed size of each top-level tuple, in bytes
- int tuple_byte_size_;
+ const int32_t tuple_byte_size_;
+
+ /// Number of null bytes in the top-level tuple.
+ const int32_t num_null_bytes_;
/// Current tuple pointer into tuple_mem_.
Tuple* tuple_;
@@ -239,9 +253,6 @@ class HdfsScanner {
/// Helper class for converting text to other types;
boost::scoped_ptr<TextConverter> text_converter_;
- /// Number of null bytes in the top-level tuple.
- int32_t num_null_bytes_;
-
/// Contains current parse status to minimize the number of Status objects returned.
/// This significantly minimizes the cross compile dependencies for llvm since status
/// objects inline a bunch of string functions. Also, status objects aren't extremely
@@ -269,8 +280,8 @@ class HdfsScanner {
WriteTuplesFn write_tuples_fn_;
/// Implements GetNext(). Should be overridden by subclasses.
- /// May be called even if 'add_batches_to_queue_' is true.
- virtual Status GetNextInternal(RowBatch* row_batch, bool* eos) {
+ /// Called by GetNext() for single-threaded scan nodes; may also be called if the parent scan node is multi-threaded.
+ virtual Status GetNextInternal(RowBatch* row_batch) {
DCHECK(false) << "GetNextInternal() not implemented for this scanner type.";
return Status::OK();
}
@@ -283,7 +294,7 @@ class HdfsScanner {
THdfsFileFormat::type type, const std::string& scanner_name);
/// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
Status StartNewRowBatch();
/// Reset internal state for a new scan range.
@@ -297,7 +308,7 @@ class HdfsScanner {
/// current row batch is complete and a new one is allocated).
/// Memory returned from this call is invalidated after calling CommitRows().
/// Callers must call GetMemory() again after calling this function.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
/// Gets memory for outputting tuples into the CollectionValue being constructed via
@@ -316,15 +327,15 @@ class HdfsScanner {
/// Returns Status::OK if the query is not cancelled and hasn't exceeded any mem limits.
/// Scanner can call this with 0 rows to flush any pending resources (attached pools
/// and io buffers) to minimize memory consumption.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
Status CommitRows(int num_rows);
/// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch
/// will be committed. commit_batch should be true if the attached pool is expected
/// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
void AttachPool(MemPool* pool, bool commit_batch) {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
DCHECK(batch_ != NULL);
DCHECK(pool != NULL);
batch_->tuple_data_pool()->AcquireData(pool, false);
@@ -359,7 +370,7 @@ class HdfsScanner {
/// Returns the number of tuples added to the row batch. This can be less than
/// num_tuples/tuples_till_limit because of failed conjuncts.
/// Returns -1 if parsing should be aborted due to parse errors.
- /// Only valid to call if 'add_batches_to_queue_' is true.
+ /// Only valid to call if the parent scan node is multi-threaded.
int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
FieldLocation* fields, int num_tuples,
int max_added_tuples, int slots_per_tuple, int row_start_indx);
@@ -410,7 +421,7 @@ class HdfsScanner {
/// Codegen function to replace WriteCompleteTuple. Should behave identically
/// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
/// if codegen was successful or NULL otherwise.
- static Status CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*,
+ static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
const std::vector<ExprContext*>& conjunct_ctxs,
llvm::Function** write_complete_tuple_fn);
@@ -418,7 +429,7 @@ class HdfsScanner {
/// compiled to IR. This function loads the precompiled IR function, modifies it,
/// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
/// successful or NULL otherwise.
- static Status CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*,
+ static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*,
llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
/// Report parse error for column @ desc. If abort_on_error is true, sets
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index ec52901..03a07ce 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -41,9 +41,9 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
-HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state,
- bool add_batches_to_queue)
- : BaseSequenceScanner(scan_node, state, add_batches_to_queue),
+HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node,
+ RuntimeState* state)
+ : BaseSequenceScanner(scan_node, state),
unparsed_data_buffer_(NULL),
num_buffered_records_in_compressed_block_(0) {
}
@@ -52,7 +52,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
}
// Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanNode* node,
+Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = NULL;
if (!node->runtime_state()->codegen_enabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index e2dcee9..5d6074f 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -162,8 +162,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
/// SeqFile file: {'S', 'E', 'Q', 6}
static const uint8_t SEQFILE_VERSION_HEADER[4];
- HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state,
- bool add_batches_to_queue);
+ HdfsSequenceScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsSequenceScanner();
@@ -172,7 +171,8 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
- static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+ static Status Codegen(HdfsScanNodeBase* node,
+ const std::vector<ExprContext*>& conjunct_ctxs,
llvm::Function** write_aligned_tuples_fn);
protected:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f1c80b0..f3f0a7c 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -50,9 +50,8 @@ const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";
// progress.
const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024;
-HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
- bool add_batches_to_queue)
- : HdfsScanner(scan_node, state, add_batches_to_queue),
+HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+ : HdfsScanner(scan_node, state),
byte_buffer_ptr_(NULL),
byte_buffer_end_(NULL),
byte_buffer_read_size_(0),
@@ -67,7 +66,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
HdfsTextScanner::~HdfsTextScanner() {
}
-Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
+Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
int compressed_text_files = 0;
@@ -108,7 +107,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
// Populate the list of compressed text scan ranges.
DCHECK_GT(files[i]->file_length, 0);
ScanRangeMetadata* metadata =
- reinterpret_cast<ScanRangeMetadata*>(split->meta_data());
+ static_cast<ScanRangeMetadata*>(split->meta_data());
DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(
files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0,
metadata->partition_id, split->disk_id(), split->try_cache(),
@@ -149,7 +148,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
}
Status HdfsTextScanner::ProcessSplit() {
- DCHECK(add_batches_to_queue_);
+ DCHECK(scan_node_->HasRowBatchQueue());
// Reset state for new scan range
RETURN_IF_ERROR(InitNewRange());
@@ -187,12 +186,19 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
decompressor_.reset(NULL);
}
if (row_batch != NULL) {
+ row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
context_->ReleaseCompletedResources(row_batch, true);
- if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
+ if (scan_node_->HasRowBatchQueue()) {
+ static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+ }
+ } else {
+ if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll();
}
+
// Verify all resources (if any) have been transferred.
+ DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
DCHECK_EQ(context_->num_completed_io_buffers(), 0);
@@ -687,7 +693,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is
// codegen'd using the IRBuilder for the specific tuple description. This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanNode* node,
+Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = NULL;
if (!node->runtime_state()->codegen_enabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 937626f..e68d45f 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -47,8 +47,7 @@ struct HdfsFileDesc;
/// scanner for the tuple directly after it.
class HdfsTextScanner : public HdfsScanner {
public:
- HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
- bool add_batches_to_queue);
+ HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsTextScanner();
/// Implementation of HdfsScanner interface.
@@ -57,12 +56,13 @@ class HdfsTextScanner : public HdfsScanner {
virtual void Close(RowBatch* row_batch);
/// Issue io manager byte ranges for 'files'.
- static Status IssueInitialRanges(HdfsScanNode* scan_node,
+ static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files);
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
- static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+ static Status Codegen(HdfsScanNodeBase* node,
+ const std::vector<ExprContext*>& conjunct_ctxs,
llvm::Function** write_aligned_tuples_fn);
/// Suffix for lzo index files.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 69d682e..033ab82 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -19,6 +19,7 @@
#include <gutil/strings/substitute.h>
+#include "exec/hdfs-scan-node-base.h"
#include "exec/hdfs-scan-node.h"
#include "runtime/row-batch.h"
#include "runtime/mem-pool.h"
@@ -40,7 +41,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
-ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNode* scan_node,
+ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
const vector<FilterContext>& filter_ctxs)
: state_(state),
@@ -311,7 +312,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
}
bool ScannerContext::cancelled() const {
- return scan_node_->done_;
+ if (!scan_node_->HasRowBatchQueue()) return false;
+ return static_cast<HdfsScanNode*>(scan_node_)->done();
}
Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 9178324..1f1bc0d 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -31,7 +31,7 @@ namespace impala {
struct HdfsFileDesc;
class HdfsPartitionDescriptor;
-class HdfsScanNode;
+class HdfsScanNodeBase;
class MemPool;
class RowBatch;
class RuntimeState;
@@ -55,12 +55,14 @@ class TupleRow;
/// from processing the bytes. This is the consumer.
/// 3. The scan node/main thread which calls into the context to trigger cancellation
/// or other end of stream conditions.
+/// TODO: Some of the synchronization mechanisms such as cancelled() can be removed
+/// once the legacy hdfs scan node has been removed.
class ScannerContext {
public:
/// Create a scanner context with the parent scan_node (where materialized row batches
/// get pushed to) and the scan range to process.
/// This context starts with 1 stream.
- ScannerContext(RuntimeState*, HdfsScanNode*, HdfsPartitionDescriptor*,
+ ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs);
/// Destructor verifies that all stream objects have been released.
@@ -295,7 +297,8 @@ class ScannerContext {
/// The stream is created in the runtime state's object pool
Stream* AddStream(DiskIoMgr::ScanRange* range);
- /// If true, the ScanNode has been cancelled and the scanner thread should finish up
+ /// Returns true if scan_node_ is multi-threaded and has been cancelled.
+ /// Always returns false if the scan_node_ is not multi-threaded.
bool cancelled() const;
int num_completed_io_buffers() const { return num_completed_io_buffers_; }
@@ -306,7 +309,7 @@ class ScannerContext {
friend class Stream;
RuntimeState* state_;
- HdfsScanNode* scan_node_;
+ HdfsScanNodeBase* scan_node_;
HdfsPartitionDescriptor* partition_desc_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 3c5d5af..09de6a5 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -110,8 +110,9 @@ class ExprContext {
return fn_contexts_[i];
}
- Expr* root() { return root_; }
- bool closed() { return closed_; }
+ Expr* root() const { return root_; }
+ bool closed() const { return closed_; }
+ bool is_clone() const { return is_clone_; }
/// Calls Get*Val on root_
BooleanVal GetBooleanVal(TupleRow* row);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 10811ea..3a3e399 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -19,7 +19,6 @@
#ifndef IMPALA_RUNTIME_TUPLE_H
#define IMPALA_RUNTIME_TUPLE_H
-#include <cstring>
#include "codegen/impala-ir.h"
#include "common/logging.h"
#include "gutil/macros.h"
@@ -70,9 +69,7 @@ class Tuple {
return result;
}
- void Init(int size) {
- bzero(this, size);
- }
+ void Init(int size) { memset(this, 0, size); }
/// The total size of all data represented in this tuple (tuple data and referenced
/// string and collection data).
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index f4b070f..bf03d98 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -183,10 +183,9 @@ struct TQueryOptions {
43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
// Multi-threaded execution: number of cores per query per node.
- // > 1: multi-threaded execution mode, with given number of cores
- // 1: single-threaded execution mode
- // 0: multi-threaded execution mode, number of cores is the pool default
- 44: optional i32 mt_num_cores = 1
+ // > 0: multi-threaded execution mode, with given number of cores
+ // 0: single-threaded execution mode
+ 44: optional i32 mt_num_cores = 0
// If true, INSERT writes to S3 go directly to their final location rather than being
// copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
index 83af818..2d38396 100644
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
@@ -956,7 +956,7 @@ public class Frontend {
LOG.debug("create plan");
Planner planner = new Planner(analysisResult, queryCtx);
if (RuntimeEnv.INSTANCE.isTestEnv()
- && queryCtx.request.query_options.mt_num_cores != 1) {
+ && queryCtx.request.query_options.mt_num_cores > 0) {
// TODO: this is just to be able to run tests; implement this
List<PlanFragment> planRoots = planner.createParallelPlans();
for (PlanFragment planRoot: planRoots) {