You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:36 UTC
[6/7] incubator-impala git commit: IMPALA-3905: Add single-threaded
scan node.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
new file mode 100644
index 0000000..7ea4b9d
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -0,0 +1,459 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
+#define IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
+
+#include <stdint.h>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/unordered_map.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/filter-context.h"
+#include "exec/scan-node.h"
+#include "runtime/descriptors.h"
+#include "runtime/disk-io-mgr.h"
+#include "util/avro-util.h"
+#include "util/progress-updater.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class ScannerContext;
+class DescriptorTbl;
+class HdfsScanner;
+class RowBatch;
+class Status;
+class Tuple;
+class TPlanNode;
+class TScanRange;
+
+/// Maintains per file information for files assigned to this scan node. This includes
+/// all the splits for the file. Note that it is not thread-safe.
+struct HdfsFileDesc {
+ HdfsFileDesc(const std::string& filename)
+ : fs(NULL), filename(filename), file_length(0), mtime(0),
+ file_compression(THdfsCompression::NONE) {
+ }
+
+ /// Connection to the filesystem containing the file.
+ hdfsFS fs;
+
+ /// File name including the path.
+ std::string filename;
+
+ /// Length of the file. This is not related to which parts of the file have been
+ /// assigned to this node.
+ int64_t file_length;
+
+ /// Last modified time
+ int64_t mtime;
+
+ THdfsCompression::type file_compression;
+
+ /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
+ std::vector<DiskIoMgr::ScanRange*> splits;
+};
+
+/// Struct for additional metadata for scan ranges. This contains the partition id
+/// that this scan range is for.
+struct ScanRangeMetadata {
+ /// The partition id that this range is part of.
+ int64_t partition_id;
+
+ /// For parquet scan ranges we initially create a request for the file footer for each
+ /// split; we store a pointer to the actual split so that we can recover its information
+ /// for the scanner to process.
+ const DiskIoMgr::ScanRange* original_split;
+
+ ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
+ : partition_id(partition_id), original_split(original_split) { }
+};
+
+
+/// Base class for all Hdfs scan nodes. Contains common members and functions
+/// that are independent of whether batches are materialized by the main thread
+/// (via HdfsScanner::GetNext()) or by spinning up separate threads that feed
+/// into a RowBatch queue (via HdfsScanner::ProcessSplit()). Those specifics
+/// are expected to be implemented in subclasses.
+///
+/// Subclasses may expect to receive runtime filters produced elsewhere in the plan
+/// (even from remote fragments). These filters arrive asynchronously during execution,
+/// and are applied as soon as they arrive. Filters may be applied by the scan node in
+/// the following scopes:
+///
+/// 1. Per-file (all file formats, partition column filters only) - filtering at this
+/// scope saves IO as the filters are applied before scan ranges are issued.
+/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
+/// this scope saves CPU as filtered scan ranges are never scanned.
+///
+/// Scanners may also use the same filters to eliminate rows at finer granularities
+/// (e.g. per row).
+///
+/// TODO: Revisit and minimize metrics. Move those specific to legacy multi-threaded
+/// scans into HdfsScanNode.
+/// TODO: Once the legacy scan node has been removed, several functions can be made
+/// non-virtual. Also merge this class with HdfsScanNodeMt.
+class HdfsScanNodeBase : public ScanNode {
+ public:
+ HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+ ~HdfsScanNodeBase();
+
+ virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+ virtual Status Prepare(RuntimeState* state);
+ virtual Status Open(RuntimeState* state);
+ virtual Status Reset(RuntimeState* state);
+ virtual void Close(RuntimeState* state);
+
+ /// Returns true if this node uses separate threads for scanners that append RowBatches
+ /// to a queue, false otherwise.
+ virtual bool HasRowBatchQueue() const = 0;
+
+ int limit() const { return limit_; }
+
+ const std::vector<SlotDescriptor*>& materialized_slots()
+ const { return materialized_slots_; }
+
+ /// Returns the tuple idx into the row for this scan node to output to.
+ /// Currently this is always 0.
+ int tuple_idx() const { return 0; }
+
+ /// Returns number of partition keys in the table.
+ int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
+
+ /// Returns number of partition key slots.
+ int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
+
+ const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
+ const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
+ const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
+ RuntimeState* runtime_state() { return runtime_state_; }
+ int skip_header_line_count() const { return skip_header_line_count_; }
+ DiskIoRequestContext* reader_context() { return reader_context_; }
+
+ typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
+ const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
+
+ RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
+ return max_compressed_text_file_length_;
+ }
+
+ const static int SKIP_COLUMN = -1;
+
+ /// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
+ /// that path is not materialized.
+ int GetMaterializedSlotIdx(const std::vector<int>& path) const {
+ PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
+ if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
+ return result->second;
+ }
+
+ /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
+ /// column i should be materialized.
+ const bool* is_materialized_col() {
+ return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
+ }
+
+ /// Returns the per format codegen'd function. Scanners call this to get the
+ /// codegen'd function to use. Returns NULL if codegen should not be used.
+ void* GetCodegenFn(THdfsFileFormat::type);
+
+ inline void IncNumScannersCodegenEnabled() { num_scanners_codegen_enabled_.Add(1); }
+ inline void IncNumScannersCodegenDisabled() { num_scanners_codegen_disabled_.Add(1); }
+
+ /// Allocate a new scan range object, stored in the runtime state's object pool. For
+ /// scan ranges that correspond to the original hdfs splits, the partition id must be
+ /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
+ /// past buffers), the partition_id is unused. expected_local should be true if this
+ /// scan range is not expected to require a remote read. The range must fall within
+ /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
+ /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
+ /// metadata of the scan range that is to be allocated.
+ /// This is thread safe.
+ DiskIoMgr::ScanRange* AllocateScanRange(
+ hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
+ int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+ const DiskIoMgr::ScanRange* original_split = NULL);
+
+ /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many files' scan
+ /// ranges have been added completely. A file's scan ranges are added completely if no
+ /// new scanner threads will be needed to process that file besides the additional
+ /// threads needed to process those in 'ranges'.
+ /// Can be overridden to add scan-node specific actions like starting scanner threads.
+ virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+ int num_files_queued);
+
+ /// Adds all splits for file_desc to the io mgr queue and indicates one file has
+ /// been added completely.
+ inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) {
+ return AddDiskIoRanges(file_desc->splits, 1);
+ }
+
+ /// Allocates and initializes a new template tuple allocated from pool with values
+ /// from the partition columns for the current scan range, if any,
+ /// Returns NULL if there are no partition keys slots.
+ Tuple* InitTemplateTuple(const std::vector<ExprContext*>& value_ctxs,
+ MemPool* pool, RuntimeState* state) const;
+
+ /// Returns the file desc for 'filename'. Returns NULL if filename is invalid.
+ HdfsFileDesc* GetFileDesc(const std::string& filename);
+
+ /// Called by scanners when a range is complete. Used to record progress.
+ /// This *must* only be called after a scanner has completely finished its
+ /// scan range (i.e. context->Flush()), and has returned the final row batch.
+ /// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
+ /// to racing with shutting down the queue.
+ void RangeComplete(const THdfsFileFormat::type& file_type,
+ const THdfsCompression::type& compression_type);
+ /// Same as above except for when multiple compression codecs were used
+ /// in the file. The metrics are incremented for each compression_type.
+ virtual void RangeComplete(const THdfsFileFormat::type& file_type,
+ const std::vector<THdfsCompression::type>& compression_type);
+
+ /// Utility function to compute the order in which to materialize slots to allow for
+ /// computing conjuncts as slots get materialized (on partial tuples).
+ /// 'order' will contain for each slot, the first conjunct it is associated with.
+ /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
+ /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have
+ /// order set to conjuncts.size()
+ void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
+
+ /// Returns true if there are no materialized slots, such as a count(*) over the table.
+ inline bool IsZeroSlotTableScan() const {
+ return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
+ }
+
+ /// Map from volume id to <number of splits, per volume split lengths>
+ typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats;
+
+ /// Update the per volume stats with the given scan range params list
+ static void UpdateHdfsSplitStats(
+ const std::vector<TScanRangeParams>& scan_range_params_list,
+ PerVolumnStats* per_volume_stats);
+
+ /// Output the per_volume_stats to stringstream. The output format is a list of:
+ /// <volume id>:<# splits>/<per volume split lengths>
+ static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
+ std::stringstream* ss);
+
+ /// Description string for the per volume stats output.
+ static const std::string HDFS_SPLIT_STATS_DESC;
+
+ /// Returns true if partition 'partition_id' passes all the filter predicates in
+ /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the
+ /// counter groups in FilterStats, and is used to update the correct statistics.
+ ///
+ /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the
+ /// function returns true, or a set of filter contexts to evaluate.
+ bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
+ const std::vector<FilterContext>& filter_ctxs);
+
+ const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
+
+ protected:
+ friend class ScannerContext;
+ friend class HdfsScanner;
+
+ RuntimeState* runtime_state_;
+
+ /// Number of header lines to skip at the beginning of each file of this table. Only set
+ /// to values > 0 for hdfs text files.
+ const int skip_header_line_count_;
+
+ /// Tuple id resolved in Prepare() to set tuple_desc_
+ const int tuple_id_;
+
+ /// RequestContext object to use with the disk-io-mgr for reads.
+ DiskIoRequestContext* reader_context_;
+
+ /// Descriptor for tuples this scan node constructs
+ const TupleDescriptor* tuple_desc_;
+
+ /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
+ /// the partition columns for that partition materialized. Used to filter files and scan
+ /// ranges on partition-column filters. Populated in Prepare().
+ boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
+
+ /// Descriptor for the hdfs table, including partition and format metadata.
+ /// Set in Prepare, owned by RuntimeState
+ const HdfsTableDescriptor* hdfs_table_;
+
+ /// The root of the table's Avro schema, if we're scanning an Avro table.
+ ScopedAvroSchemaElement avro_schema_;
+
+ /// If true, the warning that some disk ids are unknown was logged. Only log this once
+ /// per scan node since it can be noisy.
+ bool unknown_disk_id_warned_;
+
+ /// Partitions scanned by this scan node.
+ std::unordered_set<int64_t> partition_ids_;
+
+ /// File path => file descriptor (which includes the file's splits)
+ typedef std::map<std::string, HdfsFileDesc*> FileDescMap;
+ FileDescMap file_descs_;
+
+ /// File format => file descriptors.
+ typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap;
+ FileFormatsMap per_type_files_;
+
+ /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
+ /// item tuples). Includes a copy of ExecNode.conjuncts_.
+ ConjunctsMap conjuncts_map_;
+
+ /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
+ /// the first call to GetNext(). The token manager, in a different thread, will read
+ /// this variable.
+ bool initial_ranges_issued_;
+
+ /// Number of files that have not been issued from the scanners.
+ AtomicInt32 num_unqueued_files_;
+
+ /// Per scanner type codegen'd fn.
+ typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap;
+ CodegendFnMap codegend_fn_map_;
+
+ /// Maps from a slot's path to its index into materialized_slots_.
+ typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
+ PathToSlotIdxMap path_to_materialized_slot_idx_;
+
+ /// List of contexts for expected runtime filters for this scan node. These contexts are
+ /// cloned by individual scanners to be used in multi-threaded contexts, passed through
+ /// the per-scanner ScannerContext.
+ std::vector<FilterContext> filter_ctxs_;
+
+ /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
+ /// for 0 <= i < total # columns in table
+ ///
+ /// This should be a vector<bool>, but bool vectors are special-cased and not stored
+ /// internally as arrays, so instead we store as chars and cast to bools as needed
+ std::vector<char> is_materialized_col_;
+
+ /// Vector containing slot descriptors for all non-partition key slots. These
+ /// descriptors are sorted in order of increasing col_pos.
+ std::vector<SlotDescriptor*> materialized_slots_;
+
+ /// Vector containing slot descriptors for all partition key slots.
+ std::vector<SlotDescriptor*> partition_key_slots_;
+
+ /// Keeps track of total splits and the number finished.
+ ProgressUpdater progress_;
+
+ /// Counters which track the number of scanners that have codegen enabled for the
+ /// materialize and conjuncts evaluation code paths.
+ AtomicInt32 num_scanners_codegen_enabled_;
+ AtomicInt32 num_scanners_codegen_disabled_;
+
+ /// This is the number of io buffers that are owned by the scan node and the scanners.
+ /// This is used just to help debug leaked io buffers to determine if the leak is
+ /// happening in the scanners vs other parts of the execution.
+ /// TODO: Remove this counter when deprecating the multi-threaded scan node.
+ AtomicInt32 num_owned_io_buffers_;
+
+ /// If true, counters are actively running and need to be reported in the runtime
+ /// profile.
+ bool counters_running_;
+
+ /// The size of the largest compressed text file to be scanned. This is used to
+ /// estimate scanner thread memory usage.
+ RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
+
+ /// Disk accessed bitmap
+ RuntimeProfile::Counter disks_accessed_bitmap_;
+
+ /// Total number of bytes read locally
+ RuntimeProfile::Counter* bytes_read_local_;
+
+ /// Total number of bytes read via short circuit read
+ RuntimeProfile::Counter* bytes_read_short_circuit_;
+
+ /// Total number of bytes read from data node cache
+ RuntimeProfile::Counter* bytes_read_dn_cache_;
+
+ /// Total number of remote scan ranges
+ RuntimeProfile::Counter* num_remote_ranges_;
+
+ /// Total number of bytes read remotely that were expected to be local
+ RuntimeProfile::Counter* unexpected_remote_bytes_;
+
+ /// Pool for allocating some amounts of memory that is shared between scanners.
+ /// e.g. partition key tuple and their string buffers
+ boost::scoped_ptr<MemPool> scan_node_pool_;
+
+ /// Status of failed operations. This is set in the ScannerThreads.
+ /// Returned in GetNext() if an error occurred. A non-ok status triggers cleanup of
+ /// scanner threads.
+ Status status_;
+
+ /// Mapping of file formats (file type, compression type) to the number of
+ /// splits of that type and the lock protecting it.
+ typedef std::map<
+ std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
+ FileTypeCountsMap file_type_counts_;
+
+ /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
+ /// issues initial ranges for all file types. Waits for runtime filters if necessary.
+ /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
+ Status IssueInitialScanRanges(RuntimeState* state);
+
+ /// Create and open new scanner for this partition type.
+ /// If the scanner is successfully created, it is returned in 'scanner'.
+ /// Passes 'add_batches_to_queue' to the scanner constructor.
+ Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+ ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
+
+ /// Recursively initializes all NULL collection slots to an empty CollectionValue in
+ /// addition to maintaining the null bit. Hack to allow UnnestNode to project out
+ /// collection slots. Assumes that the null bit has already been un/set.
+ /// TODO: Remove this function once the TODOs in UnnestNode regarding projection
+ /// have been addressed.
+ void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
+
+ /// Helper to call InitNullCollectionValues() on all tuples produced by this scan
+ /// in 'row_batch'.
+ void InitNullCollectionValues(RowBatch* row_batch) const;
+
+ /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
+ /// and therefore not processed. 'file_type' is the format of 'file', and is used
+ /// for bookkeeping. Returns true if all filters pass or are not present.
+ bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
+ const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
+
+ /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns
+ /// true if all filters arrived within the time limit (as measured from the time of
+ /// RuntimeFilterBank::RegisterFilter()), false otherwise.
+ bool WaitForRuntimeFilters(int32_t time_ms);
+
+ /// Stops periodic counters and aggregates counter values for the entire scan node.
+ /// This should be called as soon as the scan node is complete to get the most accurate
+ /// counter values.
+ /// This can be called multiple times, subsequent calls will be ignored.
+ /// This must be called on Close() to unregister counters.
+ /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
+ void StopAndFinalizeCounters();
+
+ /// Calls ExecDebugAction(). Returns the status based on the debug action specified
+ /// for the query.
+ Status TriggerDebugAction();
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
new file mode 100644
index 0000000..8c61326
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-scan-node-mt.h"
+
+#include <sstream>
+
+#include "runtime/runtime-state.h"
+#include "runtime/row-batch.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+using std::stringstream;
+
+namespace impala {
+
+HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+ const DescriptorTbl& descs)
+ : HdfsScanNodeBase(pool, tnode, descs),
+ scan_range_(NULL),
+ scanner_(NULL) {
+}
+
+HdfsScanNodeMt::~HdfsScanNodeMt() {
+}
+
+Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+ // Return an error if this scan node has been assigned a range that is not supported
+ // because the scanner of the corresponding file format does not implement GetNext().
+ for (const auto& files: per_type_files_) {
+ if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET) {
+ stringstream msg;
+ msg << "Unsupported file format with HdfsScanNodeMt: " << files.first;
+ return Status(msg.str());
+ }
+ }
+ return Status::OK();
+}
+
+Status HdfsScanNodeMt::Open(RuntimeState* state) {
+ SCOPED_TIMER(runtime_profile_->total_time_counter());
+ RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
+ DCHECK(!initial_ranges_issued_);
+ RETURN_IF_ERROR(IssueInitialScanRanges(state));
+ return Status::OK();
+}
+
+Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+ SCOPED_TIMER(runtime_profile_->total_time_counter());
+ RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR(QueryMaintenance(state));
+
+ DCHECK(scan_range_ == NULL || scanner_ != NULL);
+ if (scan_range_ == NULL || scanner_->eos()) {
+ if (scanner_ != NULL && scanner_->eos()) {
+ scanner_->Close(row_batch);
+ scanner_.reset();
+ }
+ RETURN_IF_ERROR(
+ runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range_));
+ if (scan_range_ == NULL) {
+ *eos = true;
+ StopAndFinalizeCounters();
+ return Status::OK();
+ }
+ ScanRangeMetadata* metadata =
+ static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
+ int64_t partition_id = metadata->partition_id;
+ HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
+ scanner_ctx_.reset(new ScannerContext(
+ runtime_state_, this, partition, scan_range_, filter_ctxs()));
+ RETURN_IF_ERROR(CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_));
+ }
+
+ Status status = scanner_->GetNext(row_batch);
+ if (!status.ok()) {
+ scanner_->Close(row_batch);
+ scanner_.reset();
+ num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
+ return status;
+ }
+
+ num_rows_returned_ += row_batch->num_rows();
+ if (ReachedLimit()) {
+ int num_rows_over = num_rows_returned_ - limit_;
+ row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
+ num_rows_returned_ -= num_rows_over;
+ scan_range_ = NULL;
+ scanner_->Close(row_batch);
+ scanner_.reset();
+ *eos = true;
+ }
+ COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+ num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
+
+ if (*eos) StopAndFinalizeCounters();
+ return Status::OK();
+}
+
+void HdfsScanNodeMt::Close(RuntimeState* state) {
+ if (is_closed()) return;
+ scanner_.reset();
+ scanner_ctx_.reset();
+ HdfsScanNodeBase::Close(state);
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
new file mode 100644
index 0000000..7829d47
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_
+#define IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node-base.h"
+#include "exec/scanner-context.h"
+
+namespace impala {
+
+class DescriptorTbl;
+class ObjectPool;
+class RuntimeState;
+class RowBatch;
+class TPlanNode;
+
+/// Scan node that materializes tuples, evaluates conjuncts and runtime filters
+/// in the thread calling GetNext(). Uses the HdfsScanner::GetNext() interface.
+class HdfsScanNodeMt : public HdfsScanNodeBase {
+ public:
+ HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+ ~HdfsScanNodeMt();
+
+ virtual Status Prepare(RuntimeState* state);
+ virtual Status Open(RuntimeState* state);
+ virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+ virtual void Close(RuntimeState* state);
+
+ virtual bool HasRowBatchQueue() const { return false; }
+
+ private:
+ /// Current scan range and corresponding scanner.
+ DiskIoMgr::ScanRange* scan_range_;
+ boost::scoped_ptr<ScannerContext> scanner_ctx_;
+ boost::scoped_ptr<HdfsScanner> scanner_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index e0cee4d..83f6452 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -16,70 +16,31 @@
// under the License.
#include "exec/hdfs-scan-node.h"
-#include "exec/base-sequence-scanner.h"
-#include "exec/hdfs-text-scanner.h"
-#include "exec/hdfs-lzo-text-scanner.h"
-#include "exec/hdfs-sequence-scanner.h"
-#include "exec/hdfs-rcfile-scanner.h"
-#include "exec/hdfs-avro-scanner.h"
-#include "exec/hdfs-parquet-scanner.h"
#include <sstream>
-#include <avro/errors.h>
-#include <avro/schema.h>
-#include <boost/algorithm/string.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
-#include <hdfs.h>
-
-#include "codegen/llvm-codegen.h"
#include "common/logging.h"
-#include "common/object-pool.h"
-#include "exprs/expr-context.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/scanner-context.h"
#include "runtime/descriptors.h"
-#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
-#include "runtime/string-buffer.h"
#include "scheduling/query-resource-mgr.h"
-#include "util/bit-util.h"
-#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
-#include "util/error-util.h"
-#include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
-#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
-#include "gen-cpp/PlanNodes_types.h"
-
#include "common/names.h"
DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_");
-DEFINE_bool(suppress_unknown_disk_id_warnings, false,
- "Suppress unknown disk id warnings generated when the HDFS implementation does not"
- " provide volume/disk information.");
-DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
- "that a scan node will wait for expected runtime filters to arrive.");
#ifndef NDEBUG
DECLARE_bool(skip_file_runtime_filtering);
#endif
-namespace filesystem = boost::filesystem;
using namespace impala;
-using namespace llvm;
-using namespace strings;
-using boost::algorithm::join;
-
-const string HdfsScanNode::HDFS_SPLIT_STATS_DESC =
- "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
// The memory used does not vary considerably between file formats (just a couple of MBs).
@@ -91,37 +52,17 @@ const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
// estimate scanner thread memory usage.
const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
-// Determines how many unexpected remote bytes trigger an error in the runtime state
-const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
-
// Amount of time to block waiting for GetNext() to release scanner threads between
// checking if a scanner thread should yield itself back to the global thread pool.
const int SCANNER_THREAD_WAIT_TIME_MS = 20;
HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
- : ScanNode(pool, tnode, descs),
- runtime_state_(NULL),
- skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
- tnode.hdfs_scan_node.skip_header_line_count : 0),
- tuple_id_(tnode.hdfs_scan_node.tuple_id),
- reader_context_(NULL),
- tuple_desc_(NULL),
- hdfs_table_(NULL),
- unknown_disk_id_warned_(false),
- initial_ranges_issued_(false),
+ : HdfsScanNodeBase(pool, tnode, descs),
ranges_issued_barrier_(1),
scanner_thread_bytes_required_(0),
- max_compressed_text_file_length_(NULL),
- disks_accessed_bitmap_(TUnit::UNIT, 0),
- bytes_read_local_(NULL),
- bytes_read_short_circuit_(NULL),
- bytes_read_dn_cache_(NULL),
- num_remote_ranges_(NULL),
- unexpected_remote_bytes_(NULL),
done_(false),
all_ranges_started_(false),
- counters_running_(false),
thread_avail_cb_id_(-1),
rm_callback_id_(-1),
max_num_scanner_threads_(CpuInfo::num_cores()) {
@@ -139,99 +80,6 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
HdfsScanNode::~HdfsScanNode() {
}
-Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-
- // Add collection item conjuncts
- const map<TTupleId, vector<TExpr>>& collection_conjuncts =
- tnode.hdfs_scan_node.collection_conjuncts;
- map<TTupleId, vector<TExpr>>::const_iterator iter = collection_conjuncts.begin();
- for (; iter != collection_conjuncts.end(); ++iter) {
- DCHECK(conjuncts_map_[iter->first].empty());
- RETURN_IF_ERROR(
- Expr::CreateExprTrees(pool_, iter->second, &conjuncts_map_[iter->first]));
- }
-
- const TQueryOptions& query_options = state->query_options();
- for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
- auto it = filter.planid_to_target_ndx.find(tnode.node_id);
- DCHECK(it != filter.planid_to_target_ndx.end());
- const TRuntimeFilterTargetDesc& target = filter.targets[it->second];
- if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
- !target.is_local_target) {
- continue;
- }
- if (query_options.disable_row_runtime_filtering &&
- !target.is_bound_by_partition_columns) {
- continue;
- }
-
- FilterContext filter_ctx;
- RETURN_IF_ERROR(Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr));
- filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
-
- string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id,
- PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
- RuntimeProfile* profile = state->obj_pool()->Add(
- new RuntimeProfile(state->obj_pool(), filter_profile_title));
- runtime_profile_->AddChild(profile);
- filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
- target.is_bound_by_partition_columns));
-
- filter_ctxs_.push_back(filter_ctx);
- }
-
- // Add row batch conjuncts
- DCHECK(conjuncts_map_[tuple_id_].empty());
- conjuncts_map_[tuple_id_] = conjunct_ctxs_;
-
- return Status::OK();
-}
-
-bool HdfsScanNode::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs,
- const THdfsFileFormat::type& format, HdfsFileDesc* file) {
-#ifndef NDEBUG
- if (FLAGS_skip_file_runtime_filtering) return true;
-#endif
- if (filter_ctxs_.size() == 0) return true;
- ScanRangeMetadata* metadata =
- reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
- if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
- filter_ctxs)) {
- for (int j = 0; j < file->splits.size(); ++j) {
- // Mark range as complete to ensure progress.
- RangeComplete(format, file->file_compression);
- }
- return false;
- }
- return true;
-}
-
-bool HdfsScanNode::WaitForRuntimeFilters(int32_t time_ms) {
- vector<string> arrived_filter_ids;
- int32_t start = MonotonicMillis();
- for (auto& ctx: filter_ctxs_) {
- if (ctx.filter->WaitForArrival(time_ms)) {
- arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id()));
- }
- }
- int32_t end = MonotonicMillis();
- const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS);
-
- if (arrived_filter_ids.size() == filter_ctxs_.size()) {
- runtime_profile()->AddInfoString("Runtime filters",
- Substitute("All filters arrived. Waited $0", wait_time));
- VLOG_QUERY << "Filters arrived. Waited " << wait_time;
- return true;
- }
-
- const string& filter_str = Substitute("Only following filters arrived: $0, waited $1",
- join(arrived_filter_ids, ", "), wait_time);
- runtime_profile()->AddInfoString("Runtime filters", filter_str);
- VLOG_QUERY << filter_str;
- return false;
-}
-
Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -241,35 +89,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
// so we need to tell them there is work to do.
// TODO: This is probably not worth the organisational cost of splitting
// initialisation across two places. Move to before the scanner threads start.
- initial_ranges_issued_ = true;
-
- int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
- if (state->query_options().runtime_filter_wait_time_ms > 0) {
- wait_time_ms = state->query_options().runtime_filter_wait_time_ms;
- }
- if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms);
- // Apply dynamic partition-pruning per-file.
- FileFormatsMap matching_per_type_files;
- for (const FileFormatsMap::value_type& v: per_type_files_) {
- vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first];
- for (HdfsFileDesc* file: v.second) {
- if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
- matching_files->push_back(file);
- }
- }
- }
-
- // Issue initial ranges for all file types.
- RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
- matching_per_type_files[THdfsFileFormat::PARQUET]));
- RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
- matching_per_type_files[THdfsFileFormat::TEXT]));
- RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
- matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
- RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
- matching_per_type_files[THdfsFileFormat::RC_FILE]));
- RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
- matching_per_type_files[THdfsFileFormat::AVRO]));
+ RETURN_IF_ERROR(IssueInitialScanRanges(state));
// Release the scanner threads
ranges_issued_barrier_.Notify();
@@ -278,7 +98,11 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
}
Status status = GetNextInternal(state, row_batch, eos);
- if (!status.ok() || *eos) StopAndFinalizeCounters();
+ if (!status.ok() || *eos) {
+ unique_lock<mutex> l(lock_);
+ lock_guard<SpinLock> l2(file_type_counts_);
+ StopAndFinalizeCounters();
+ }
return status;
}
@@ -326,283 +150,16 @@ Status HdfsScanNode::GetNextInternal(
return status_;
}
-DiskIoMgr::ScanRange* HdfsScanNode::AllocateScanRange(
- hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
- int disk_id, bool try_cache, bool expected_local, int64_t mtime,
- const DiskIoMgr::ScanRange* original_split) {
- DCHECK_GE(disk_id, -1);
- // Require that the scan range is within [0, file_length). While this cannot be used
- // to guarantee safety (file_length metadata may be stale), it avoids different
- // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
- // beyond the end of the file).
- DCHECK_GE(offset, 0);
- DCHECK_GE(len, 0);
- DCHECK_LE(offset + len, GetFileDesc(file)->file_length)
- << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
- disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
-
- ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
- new ScanRangeMetadata(partition_id, original_split));
- DiskIoMgr::ScanRange* range =
- runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
- range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
- mtime, metadata);
- return range;
-}
-
-HdfsFileDesc* HdfsScanNode::GetFileDesc(const string& filename) {
- DCHECK(file_descs_.find(filename) != file_descs_.end());
- return file_descs_[filename];
-}
-
-void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
- unique_lock<mutex> l(metadata_lock_);
- DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
- per_file_metadata_[filename] = metadata;
-}
-
-void* HdfsScanNode::GetFileMetadata(const string& filename) {
- unique_lock<mutex> l(metadata_lock_);
- map<string, void*>::iterator it = per_file_metadata_.find(filename);
- if (it == per_file_metadata_.end()) return NULL;
- return it->second;
-}
-
-void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type type) {
- CodegendFnMap::iterator it = codegend_fn_map_.find(type);
- if (it == codegend_fn_map_.end()) return NULL;
- return it->second;
-}
-
-Status HdfsScanNode::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
- ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
- DCHECK(context != NULL);
- THdfsCompression::type compression =
- context->GetStream()->file_desc()->file_compression;
-
- // Create a new scanner for this file format and compression.
- switch (partition->file_format()) {
- case THdfsFileFormat::TEXT:
- // Lzo-compressed text files are scanned by a scanner that is implemented as a
- // dynamic library, so that Impala does not include GPL code.
- if (compression == THdfsCompression::LZO) {
- scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_));
- } else {
- scanner->reset(new HdfsTextScanner(this, runtime_state_, true));
- }
- break;
- case THdfsFileFormat::SEQUENCE_FILE:
- scanner->reset(new HdfsSequenceScanner(this, runtime_state_, true));
- break;
- case THdfsFileFormat::RC_FILE:
- scanner->reset(new HdfsRCFileScanner(this, runtime_state_, true));
- break;
- case THdfsFileFormat::AVRO:
- scanner->reset(new HdfsAvroScanner(this, runtime_state_, true));
- break;
- case THdfsFileFormat::PARQUET:
- scanner->reset(new HdfsParquetScanner(this, runtime_state_, true));
- break;
- default:
- return Status(Substitute("Unknown Hdfs file format type: $0",
- partition->file_format()));
- }
- DCHECK(scanner->get() != NULL);
- Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
- if (status.ok()) {
- status = scanner->get()->Open(context);
- if (!status.ok()) scanner->get()->Close(scanner->get()->batch());
- } else {
- context->ClearStreams();
- }
- return status;
-}
-
-Tuple* HdfsScanNode::InitTemplateTuple(RuntimeState* state,
- const vector<ExprContext*>& value_ctxs) {
- if (partition_key_slots_.empty()) return NULL;
-
- // Lock to protect access to partition_key_pool_ and value_ctxs
- // TODO: we can push the lock to the mempool and exprs_values should not
- // use internal memory.
- Tuple* template_tuple = InitEmptyTemplateTuple(*tuple_desc_);
-
- unique_lock<mutex> l(lock_);
- for (int i = 0; i < partition_key_slots_.size(); ++i) {
- const SlotDescriptor* slot_desc = partition_key_slots_[i];
- // Exprs guaranteed to be literals, so can safely be evaluated without a row context
- void* value = value_ctxs[slot_desc->col_pos()]->GetValue(NULL);
- RawValue::Write(value, template_tuple, slot_desc, NULL);
- }
- return template_tuple;
-}
-
-Tuple* HdfsScanNode::InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc) {
- Tuple* template_tuple = NULL;
- {
- unique_lock<mutex> l(lock_);
- template_tuple = Tuple::Create(tuple_desc.byte_size(), scan_node_pool_.get());
- }
- memset(template_tuple, 0, tuple_desc.byte_size());
- return template_tuple;
-}
-
-void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
- unique_lock<mutex> l(lock_);
- scan_node_pool_->AcquireData(pool, false);
-}
-
-Status HdfsScanNode::TriggerDebugAction() {
- return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
-}
-
Status HdfsScanNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
- runtime_state_ = state;
- RETURN_IF_ERROR(ScanNode::Prepare(state));
-
- tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
- DCHECK(tuple_desc_ != NULL);
-
- // Prepare collection conjuncts
- ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
- for (; iter != conjuncts_map_.end(); ++iter) {
- TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(iter->first);
-
- // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again
- if (tuple_desc == tuple_desc_) continue;
-
- RowDescriptor* collection_row_desc =
- state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
- RETURN_IF_ERROR(
- Expr::Prepare(iter->second, state, *collection_row_desc, expr_mem_tracker()));
- }
+ RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+ // Assign scanner thread group to cgroup, if any.
if (!state->cgroup().empty()) {
scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr());
scanner_threads_.SetCgroup(state->cgroup());
}
- // One-time initialisation of state that is constant across scan ranges
- DCHECK(tuple_desc_->table_desc() != NULL);
- hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
- scan_node_pool_.reset(new MemPool(mem_tracker()));
-
- for (int i = 0; i < filter_ctxs_.size(); ++i) {
- RETURN_IF_ERROR(
- filter_ctxs_[i].expr->Prepare(state, row_desc(), expr_mem_tracker()));
- AddExprCtxToFree(filter_ctxs_[i].expr);
- }
-
- // Parse Avro table schema if applicable
- const string& avro_schema_str = hdfs_table_->avro_schema();
- if (!avro_schema_str.empty()) {
- avro_schema_t avro_schema;
- int error = avro_schema_from_json_length(
- avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
- if (error != 0) {
- return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
- }
- RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
- }
-
- // Gather materialized partition-key slots and non-partition slots.
- const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
- for (size_t i = 0; i < slots.size(); ++i) {
- if (hdfs_table_->IsClusteringCol(slots[i])) {
- partition_key_slots_.push_back(slots[i]);
- } else {
- materialized_slots_.push_back(slots[i]);
- }
- }
-
- // Order the materialized slots such that for schemaless file formats (e.g. text) the
- // order corresponds to the physical order in files. For formats where the file schema
- // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
- sort(materialized_slots_.begin(), materialized_slots_.end(),
- SlotDescriptor::ColPathLessThan);
-
- // Populate mapping from slot path to index into materialized_slots_.
- for (int i = 0; i < materialized_slots_.size(); ++i) {
- path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
- }
-
- // Initialize is_materialized_col_
- is_materialized_col_.resize(hdfs_table_->num_cols());
- for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
- is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
- }
-
- HdfsFsCache::HdfsFsMap fs_cache;
- // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
- // partition_ids_, file_descs_, and per_type_files_.
- DCHECK(scan_range_params_ != NULL)
- << "Must call SetScanRanges() before calling Prepare()";
- int num_ranges_missing_volume_id = 0;
- for (int i = 0; i < scan_range_params_->size(); ++i) {
- DCHECK((*scan_range_params_)[i].scan_range.__isset.hdfs_file_split);
- const THdfsFileSplit& split = (*scan_range_params_)[i].scan_range.hdfs_file_split;
- partition_ids_.insert(split.partition_id);
- HdfsPartitionDescriptor* partition_desc =
- hdfs_table_->GetPartition(split.partition_id);
- if (partition_desc == NULL) {
- // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
- LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
- << " partition_id=" << split.partition_id
- << "\n" << PrintThrift(state->fragment_params());
- return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
- " Try rerunning the query.");
- }
-
- if (partition_template_tuple_map_.find(split.partition_id) ==
- partition_template_tuple_map_.end()) {
- partition_template_tuple_map_[split.partition_id] =
- InitTemplateTuple(state, partition_desc->partition_key_value_ctxs());;
- }
-
- filesystem::path file_path(partition_desc->location());
- file_path.append(split.file_name, filesystem::path::codecvt());
- const string& native_file_path = file_path.native();
-
- HdfsFileDesc* file_desc = NULL;
- FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path);
- if (file_desc_it == file_descs_.end()) {
- // Add new file_desc to file_descs_ and per_type_files_
- file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
- file_descs_[native_file_path] = file_desc;
- file_desc->file_length = split.file_length;
- file_desc->mtime = split.mtime;
- file_desc->file_compression = split.file_compression;
- RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
- native_file_path, &file_desc->fs, &fs_cache));
- num_unqueued_files_.Add(1);
- per_type_files_[partition_desc->file_format()].push_back(file_desc);
- } else {
- // File already processed
- file_desc = file_desc_it->second;
- }
-
- bool expected_local =
- (*scan_range_params_)[i].__isset.is_remote && !(*scan_range_params_)[i].is_remote;
- if (expected_local && (*scan_range_params_)[i].volume_id == -1) {
- if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) {
- runtime_profile()->AppendExecOption("Missing Volume Id");
- runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
- unknown_disk_id_warned_ = true;
- }
- ++num_ranges_missing_volume_id;
- }
-
- bool try_cache = (*scan_range_params_)[i].is_cached;
- if (runtime_state_->query_options().disable_cached_reads) {
- DCHECK(!try_cache) << "Params should not have had this set.";
- }
- file_desc->splits.push_back(
- AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
- split.offset, split.partition_id, (*scan_range_params_)[i].volume_id,
- try_cache, expected_local, file_desc->mtime));
- }
-
// Compute the minimum bytes required to start a new thread. This is based on the
// file format.
// The higher the estimate, the less likely it is the query will fail but more likely
@@ -632,100 +189,17 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
}
}
scanner_thread_bytes_required_ += scanner_thread_mem_usage;
-
- // Prepare all the partitions scanned by the scan node
- for (int64_t partition_id: partition_ids_) {
- HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
- // This is IMPALA-1702, but will have been caught earlier in this method.
- DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
- << " partition_id=" << partition_id
- << "\n" << PrintThrift(state->fragment_params());
- RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
- }
-
- // Update server wide metrics for number of scan ranges and ranges that have
- // incomplete metadata.
- ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
- ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
-
- // Add per volume stats to the runtime profile
- PerVolumnStats per_volume_stats;
- stringstream str;
- UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
- PrintHdfsSplitStats(per_volume_stats, &str);
- runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
-
- // Create codegen'd functions
- for (int format = THdfsFileFormat::TEXT;
- format <= THdfsFileFormat::PARQUET; ++format) {
- vector<HdfsFileDesc*>& file_descs =
- per_type_files_[static_cast<THdfsFileFormat::type>(format)];
-
- if (file_descs.empty()) continue;
-
- // Randomize the order this node processes the files. We want to do this to avoid
- // issuing remote reads to the same DN from different impalads. In file formats such
- // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
- // If every node goes through the files in the same order, all the remote reads are
- // for the same file meaning a few DN serves a lot of remote reads at the same time.
- random_shuffle(file_descs.begin(), file_descs.end());
-
- // Create reusable codegen'd functions for each file type needed
- // TODO: do this for conjuncts_map_
- Function* fn;
- Status status;
- switch (format) {
- case THdfsFileFormat::TEXT:
- status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn);
- break;
- case THdfsFileFormat::SEQUENCE_FILE:
- status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn);
- break;
- case THdfsFileFormat::AVRO:
- status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn);
- break;
- case THdfsFileFormat::PARQUET:
- status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn);
- break;
- default:
- // No codegen for this format
- fn = NULL;
- status = Status("Not implemented for this format.");
- }
- DCHECK(fn != NULL || !status.ok());
-
- const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
- if (!status.ok()) {
- runtime_profile()->AddCodegenMsg(false, status, format_name);
- } else {
- runtime_profile()->AddCodegenMsg(true, status, format_name);
- LlvmCodeGen* codegen;
- RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
- codegen->AddFunctionToJit(
- fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
- }
- }
-
return Status::OK();
}
-// This function initiates the connection to hdfs and starts up the initial scanner
-// threads. The scanner subclasses are passed the initial splits. Scanners are expected to
-// queue up a non-zero number of those splits to the io mgr (via the ScanNode). Scan
-// ranges are not issued until the first GetNext() call; scanner threads will block on
-// ranges_issued_barrier_ until ranges are issued.
+// This function registers the ThreadTokenAvailableCb to start up the initial scanner
+// threads. Scan ranges are not issued until the first GetNext() call; scanner threads
+// will block on ranges_issued_barrier_ until ranges are issued.
Status HdfsScanNode::Open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::Open(state));
-
- // Open collection conjuncts
- ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
- for (; iter != conjuncts_map_.end(); ++iter) {
- // conjuncts_ are already opened in ExecNode::Open()
- if (iter->first == tuple_id_) continue;
- RETURN_IF_ERROR(Expr::Open(iter->second, state));
- }
+ SCOPED_TIMER(runtime_profile_->total_time_counter());
+ RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
- for (auto& filter_ctx: filter_ctxs_) RETURN_IF_ERROR(filter_ctx.expr->Open(state));
+ if (file_descs_.empty() || progress_.done()) return Status::OK();
// We need at least one scanner thread to make progress. We need to make this
// reservation before any ranges are issued.
@@ -744,95 +218,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
runtime_state_->resource_pool()));
}
- if (file_descs_.empty()) {
- SetDone();
- return Status::OK();
- }
-
- // Open all the partition exprs used by the scan node
- for (int64_t partition_id: partition_ids_) {
- HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
- DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
- << " partition_id=" << partition_id
- << "\n" << PrintThrift(state->fragment_params());
- RETURN_IF_ERROR(partition_desc->OpenExprs(state));
- }
-
- RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
- &reader_context_, mem_tracker()));
-
- // Initialize HdfsScanNode specific counters
- read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
- per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
- PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
- bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
- scan_ranges_complete_counter_ =
- ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
- if (DiskInfo::num_disks() < 64) {
- num_disks_accessed_counter_ =
- ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
- } else {
- num_disks_accessed_counter_ = NULL;
- }
- num_scanner_threads_started_counter_ =
- ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-
- runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
- runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
- runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
- &active_hdfs_read_thread_counter_);
- runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
- &disks_accessed_bitmap_);
-
- average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
- AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
- average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
- AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
-
- bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
- TUnit::BYTES);
- bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
- TUnit::BYTES);
- bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
- TUnit::BYTES);
- num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
- TUnit::UNIT);
- unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
- TUnit::BYTES);
-
- max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
- "MaxCompressedTextFileLength", TUnit::BYTES);
-
- for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
- hdfs_read_thread_concurrency_bucket_.push_back(
- pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
- }
- runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
- &hdfs_read_thread_concurrency_bucket_);
-
- counters_running_ = true;
-
- int total_splits = 0;
- for (FileDescMap::iterator it = file_descs_.begin(); it != file_descs_.end(); ++it) {
- total_splits += it->second->splits.size();
- }
-
- if (total_splits == 0) {
- SetDone();
- return Status::OK();
- }
-
- stringstream ss;
- ss << "Splits complete (node=" << id() << "):";
- progress_.Init(ss.str(), total_splits);
return Status::OK();
}
-Status HdfsScanNode::Reset(RuntimeState* state) {
- DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
- return Status("Internal error: Scan nodes should not appear in subplans.");
-}
-
void HdfsScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
SetDone();
@@ -849,47 +237,36 @@ void HdfsScanNode::Close(RuntimeState* state) {
num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
- if (reader_context_ != NULL) {
- // There may still be io buffers used by parent nodes so we can't unregister the
- // reader context yet. The runtime state keeps a list of all the reader contexts and
- // they are unregistered when the fragment is closed.
- state->reader_contexts()->push_back(reader_context_);
- // Need to wait for all the active scanner threads to finish to ensure there is no
- // more memory tracked by this scan node's mem tracker.
- state->io_mgr()->CancelContext(reader_context_, true);
- }
-
- StopAndFinalizeCounters();
+ HdfsScanNodeBase::Close(state);
+}
- // There should be no active scanner threads and hdfs read threads.
- DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
- DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
+void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
+ unique_lock<mutex> l(metadata_lock_);
+ DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
+ per_file_metadata_[filename] = metadata;
+}
- if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
+void* HdfsScanNode::GetFileMetadata(const string& filename) {
+ unique_lock<mutex> l(metadata_lock_);
+ map<string, void*>::iterator it = per_file_metadata_.find(filename);
+ if (it == per_file_metadata_.end()) return NULL;
+ return it->second;
+}
- // Close all the partitions scanned by the scan node
- for (int64_t partition_id: partition_ids_) {
- HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
- if (partition_desc == NULL) {
- // TODO: Revert when IMPALA-1702 is fixed.
- LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
- << " partition_id=" << partition_id
- << "\n" << PrintThrift(state->fragment_params());
- continue;
- }
- partition_desc->CloseExprs(state);
- }
+void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
+ const std::vector<THdfsCompression::type>& compression_type) {
+ lock_guard<SpinLock> l(file_type_counts_);
+ HdfsScanNodeBase::RangeComplete(file_type, compression_type);
+}
- // Close collection conjuncts
- ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
- for (; iter != conjuncts_map_.end(); ++iter) {
- // conjuncts_ are already closed in ExecNode::Close()
- if (iter->first == tuple_id_) continue;
- Expr::Close(iter->second, state);
- }
+void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
+ unique_lock<mutex> l(lock_);
+ scan_node_pool_->AcquireData(pool, false);
+}
- for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr->Close(state);
- ScanNode::Close(state);
+void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
+ InitNullCollectionValues(row_batch);
+ materialized_row_batches_->AddBatch(row_batch);
}
Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
@@ -902,43 +279,6 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
return Status::OK();
}
-void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
- InitNullCollectionValues(row_batch);
- materialized_row_batches_->AddBatch(row_batch);
-}
-
-void HdfsScanNode::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
- Tuple* tuple) const {
- for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
- CollectionValue* slot = reinterpret_cast<CollectionValue*>(
- tuple->GetSlot(slot_desc->tuple_offset()));
- if (tuple->IsNull(slot_desc->null_indicator_offset())) {
- *slot = CollectionValue();
- continue;
- }
- // Recursively traverse collection items.
- const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
- if (item_desc->collection_slots().empty()) continue;
- for (int i = 0; i < slot->num_tuples; ++i) {
- int item_offset = i * item_desc->byte_size();
- Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset);
- InitNullCollectionValues(item_desc, collection_item);
- }
- }
-}
-
-void HdfsScanNode::InitNullCollectionValues(RowBatch* row_batch) const {
- DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
- const TupleDescriptor& tuple_desc =
- *row_batch->row_desc().tuple_descriptors()[tuple_idx()];
- if (tuple_desc.collection_slots().empty()) return;
- for (int i = 0; i < row_batch->num_rows(); ++i) {
- Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
- DCHECK(tuple != NULL);
- InitNullCollectionValues(&tuple_desc, tuple);
- }
-}
-
// For controlling the amount of memory used for scanners, we approximate the
// scanner mem usage based on scanner_thread_bytes_required_, rather than the
// consumption in the scan node's mem tracker. The problem with the scan node
@@ -1161,33 +501,6 @@ void HdfsScanNode::ScannerThread() {
runtime_state_->resource_pool()->ReleaseThreadToken(false);
}
-bool HdfsScanNode::PartitionPassesFilters(int32_t partition_id,
- const string& stats_name, const vector<FilterContext>& filter_ctxs) {
- if (filter_ctxs.size() == 0) return true;
- DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
- << "Mismatched number of filter contexts";
- Tuple* template_tuple = partition_template_tuple_map_[partition_id];
- // Defensive - if template_tuple is NULL, there can be no filters on partition columns.
- if (template_tuple == NULL) return true;
- TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
- for (const FilterContext& ctx: filter_ctxs) {
- int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
- if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
- continue;
- }
- void* e = ctx.expr->GetValue(tuple_row_mem);
-
- // Not quite right because bitmap could arrive after Eval(), but we're ok with
- // off-by-one errors.
- bool processed = ctx.filter->HasBloomFilter();
- bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
- ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
- if (!passed_filter) return false;
- }
-
- return true;
-}
-
namespace {
// Returns true if 'format' uses a scanner derived from BaseSequenceScanner. Used to
@@ -1205,8 +518,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
DCHECK(scan_range != NULL);
- ScanRangeMetadata* metadata =
- reinterpret_cast<ScanRangeMetadata*>(scan_range->meta_data());
+ ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
int64_t partition_id = metadata->partition_id;
HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
@@ -1269,26 +581,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
return status;
}
-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
- const THdfsCompression::type& compression_type) {
- vector<THdfsCompression::type> types;
- types.push_back(compression_type);
- RangeComplete(file_type, types);
-}
-
-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
- const vector<THdfsCompression::type>& compression_types) {
- scan_ranges_complete_counter()->Add(1);
- progress_.Update(1);
-
- {
- lock_guard<SpinLock> l(file_type_counts_lock_);
- for (int i = 0; i < compression_types.size(); ++i) {
- ++file_type_counts_[make_pair(file_type, compression_types[i])];
- }
- }
-}
-
void HdfsScanNode::SetDone() {
{
unique_lock<mutex> l(lock_);
@@ -1300,129 +592,3 @@ void HdfsScanNode::SetDone() {
}
materialized_row_batches_->Shutdown();
}
-
-void HdfsScanNode::ComputeSlotMaterializationOrder(vector<int>* order) const {
- const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
- // Initialize all order to be conjuncts.size() (after the last conjunct)
- order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
-
- const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
-
- vector<SlotId> slot_ids;
- for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
- slot_ids.clear();
- int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
- for (int j = 0; j < num_slots; ++j) {
- SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
- int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
- // slot_idx == -1 means this was a partition key slot which is always
- // materialized before any slots.
- if (slot_idx == -1) continue;
- // If this slot hasn't been assigned an order, assign it be materialized
- // before evaluating conjuncts[i]
- if ((*order)[slot_idx] == conjuncts.size()) {
- (*order)[slot_idx] = conjunct_idx;
- }
- }
- }
-}
-
-void HdfsScanNode::StopAndFinalizeCounters() {
- unique_lock<mutex> l(lock_);
- if (!counters_running_) return;
- counters_running_ = false;
-
- PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
- PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
- PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
- PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
- PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
- true);
-
- // Output hdfs read thread concurrency into info string
- stringstream ss;
- for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
- ss << i << ":" << setprecision(4)
- << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
- }
- runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
-
- // Convert disk access bitmap to num of disk accessed
- uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
- int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
- if (num_disks_accessed_counter_ != NULL) {
- num_disks_accessed_counter_->Set(num_disk_accessed);
- }
-
- // output completed file types and counts to info string
- if (!file_type_counts_.empty()) {
- stringstream ss;
- {
- lock_guard<SpinLock> l2(file_type_counts_lock_);
- for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
- it != file_type_counts_.end(); ++it) {
- ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
- }
- }
- runtime_profile_->AddInfoString("File Formats", ss.str());
- }
-
- // Output fraction of scanners with codegen enabled
- int num_enabled = num_scanners_codegen_enabled_.Load();
- int total = num_enabled + num_scanners_codegen_disabled_.Load();
- runtime_profile()->AppendExecOption(
- Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
-
- if (reader_context_ != NULL) {
- bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
- bytes_read_short_circuit_->Set(
- runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
- bytes_read_dn_cache_->Set(
- runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
- num_remote_ranges_->Set(static_cast<int64_t>(
- runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
- unexpected_remote_bytes_->Set(
- runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
-
- if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
- runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
- "Read $0 of data across network that was expected to be local. "
- "Block locality metadata for table '$1.$2' may be stale. Consider running "
- "\"INVALIDATE METADATA `$1`.`$2`\".",
- PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
- hdfs_table_->database(), hdfs_table_->name())));
- }
-
- ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
- ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
- bytes_read_local_->value());
- ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
- bytes_read_short_circuit_->value());
- ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
- bytes_read_dn_cache_->value());
- }
-}
-
-void HdfsScanNode::UpdateHdfsSplitStats(
- const vector<TScanRangeParams>& scan_range_params_list,
- PerVolumnStats* per_volume_stats) {
- pair<int, int64_t> init_value(0, 0);
- for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
- const TScanRange& scan_range = scan_range_params.scan_range;
- if (!scan_range.__isset.hdfs_file_split) continue;
- const THdfsFileSplit& split = scan_range.hdfs_file_split;
- pair<int, int64_t>* stats =
- FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
- ++(stats->first);
- stats->second += split.length;
- }
-}
-
-void HdfsScanNode::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
- stringstream* ss) {
- for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
- i != per_volume_stats.end(); ++i) {
- (*ss) << i->first << ":" << i->second.first << "/"
- << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
- }
-}