You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/03 00:29:12 UTC
[2/6] incubator-impala git commit: IMPALA-3905:
HdfsScanner::GetNext() for Avro, RC, and Seq scans.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7af5d5d..11e5718 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -231,15 +231,15 @@ class HdfsScanNodeBase : public ScanNode {
Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
MemPool* pool, RuntimeState* state) const;
- /// Returns the file desc for 'filename'. Returns NULL if filename is invalid.
+ /// Returns the file desc for 'filename'. Returns nullptr if filename is invalid.
HdfsFileDesc* GetFileDesc(const std::string& filename);
/// Sets the scanner specific metadata for 'filename'. Scanners can use this to store
/// file header information. Thread safe.
void SetFileMetadata(const std::string& filename, void* metadata);
- /// Returns the scanner specific metadata for 'filename'. Returns NULL if there is no
- /// metadata. Thread safe.
+ /// Returns the scanner specific metadata for 'filename'. Returns nullptr if there is
+ /// no metadata. Thread safe.
void* GetFileMetadata(const std::string& filename);
/// Called by scanners when a range is complete. Used to record progress.
@@ -267,6 +267,9 @@ class HdfsScanNodeBase : public ScanNode {
return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
}
+ /// Transfers all memory from 'pool' to 'scan_node_pool_'.
+ virtual void TransferToScanNodePool(MemPool* pool);
+
/// map from volume id to <number of split, per volume split lengths>
/// TODO: move this into some global .h, no need to include this file just for this
/// typedef
@@ -329,7 +332,7 @@ class HdfsScanNodeBase : public ScanNode {
/// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
/// the partition columns for that partition materialized. Used to filter files and scan
- /// ranges on partition-column filters. Populated in Prepare().
+ /// ranges on partition-column filters. Populated in Open().
boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
/// Descriptor for the hdfs table, including partition and format metadata.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 7829d47..4ce12fe 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -40,9 +40,10 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~HdfsScanNodeMt();
- virtual Status Prepare(RuntimeState* state);
- virtual Status Open(RuntimeState* state);
- virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+ virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+ WARN_UNUSED_RESULT;
virtual void Close(RuntimeState* state);
virtual bool HasRowBatchQueue() const { return false; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 4cbf503..1ab87ac 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -17,6 +17,7 @@
#include "exec/hdfs-scan-node.h"
+#include <memory>
#include <sstream>
#include "common/logging.h"
@@ -111,10 +112,10 @@ Status HdfsScanNode::GetNextInternal(
return Status::OK();
}
*eos = false;
- RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
+ unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
if (materialized_batch != NULL) {
num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
- row_batch->AcquireState(materialized_batch);
+ row_batch->AcquireState(materialized_batch.get());
// Update the number of materialized rows now instead of when they are materialized.
// This means that scanners might process and queue up more rows than are necessary
// for the limit case but we want to avoid the synchronized writes to
@@ -132,7 +133,7 @@ Status HdfsScanNode::GetNextInternal(
SetDone();
}
DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
- delete materialized_batch;
+ materialized_batch.reset();
return Status::OK();
}
// The RowBatchQueue was shutdown either because all scan ranges are complete or a
@@ -248,12 +249,12 @@ void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
unique_lock<mutex> l(lock_);
- scan_node_pool_->AcquireData(pool, false);
+ HdfsScanNodeBase::TransferToScanNodePool(pool);
}
-void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
- InitNullCollectionValues(row_batch);
- materialized_row_batches_->AddBatch(row_batch);
+void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
+ InitNullCollectionValues(row_batch.get());
+ materialized_row_batches_->AddBatch(move(row_batch));
}
Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
@@ -541,9 +542,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
VLOG_QUERY << ss.str();
}
- // Transfer the remaining resources to the final row batch (if any) and add it to
- // the row batch queue.
- scanner->Close(scanner->batch());
+ // Transfer remaining resources to a final batch and add it to the row batch queue.
+ scanner->Close();
return status;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 41647da..b056e2e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -20,6 +20,7 @@
#define IMPALA_EXEC_HDFS_SCAN_NODE_H_
#include <map>
+#include <memory>
#include <stdint.h>
#include <vector>
@@ -66,10 +67,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~HdfsScanNode();
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
- virtual Status Prepare(RuntimeState* state);
- virtual Status Open(RuntimeState* state);
- virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+ virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+ WARN_UNUSED_RESULT;
virtual void Close(RuntimeState* state);
virtual bool HasRowBatchQueue() const { return true; }
@@ -78,12 +80,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
- int num_files_queued);
+ int num_files_queued) WARN_UNUSED_RESULT;
/// Adds a materialized row batch for the scan node. This is called from scanner
/// threads.
/// This function will block if materialized_row_batches_ is full.
- void AddMaterializedRowBatch(RowBatch* row_batch);
+ void AddMaterializedRowBatch(std::unique_ptr<RowBatch> row_batch);
/// Called by scanners when a range is complete. Used to record progress and set done_.
/// This *must* only be called after a scanner has completely finished its
@@ -93,8 +95,8 @@ class HdfsScanNode : public HdfsScanNodeBase {
virtual void RangeComplete(const THdfsFileFormat::type& file_type,
const std::vector<THdfsCompression::type>& compression_type);
- /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
- void TransferToScanNodePool(MemPool* pool);
+ /// Transfers all memory from 'pool' to 'scan_node_pool_'. Thread-safe.
+ virtual void TransferToScanNodePool(MemPool* pool);
private:
/// Released when initial ranges are issued in the first call to GetNext().
@@ -164,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
/// in this split.
Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
- DiskIoMgr::ScanRange* scan_range);
+ DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
/// Returns true if there is enough memory (against the mem tracker limits) to
/// have a scanner thread.
@@ -175,7 +177,8 @@ class HdfsScanNode : public HdfsScanNodeBase {
bool EnoughMemoryForScannerThread(bool new_thread);
/// Checks for eos conditions and returns batches from materialized_row_batches_.
- Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
+ Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
+ WARN_UNUSED_RESULT;
/// sets done_ to true and triggers threads to cleanup. Cannot be called with
/// any locks taken. Calling it repeatedly ignores subsequent calls.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 991c40f..0670f81 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -60,44 +60,16 @@ const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: scan_node_(scan_node),
state_(state),
- context_(NULL),
- stream_(NULL),
- eos_(false),
- is_closed_(false),
expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
- conjunct_evals_(NULL),
template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
- template_tuple_(NULL),
tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
- tuple_(NULL),
- batch_(NULL),
- tuple_mem_(NULL),
- parse_status_(Status::OK()),
- decompression_type_(THdfsCompression::NONE),
- data_buffer_pool_(new MemPool(scan_node->mem_tracker())),
- decompress_timer_(NULL),
- write_tuples_fn_(NULL) {
+ data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
}
HdfsScanner::HdfsScanner()
- : scan_node_(NULL),
- state_(NULL),
- context_(NULL),
- stream_(NULL),
- eos_(false),
- is_closed_(false),
- conjunct_evals_(NULL),
- template_tuple_pool_(NULL),
- template_tuple_(NULL),
- tuple_byte_size_(-1),
- tuple_(NULL),
- batch_(NULL),
- tuple_mem_(NULL),
- parse_status_(Status::OK()),
- decompression_type_(THdfsCompression::NONE),
- data_buffer_pool_(NULL),
- decompress_timer_(NULL),
- write_tuples_fn_(NULL) {
+ : scan_node_(nullptr),
+ state_(nullptr),
+ tuple_byte_size_(0) {
DCHECK(TestInfo::is_test());
}
@@ -142,9 +114,31 @@ Status HdfsScanner::Open(ScannerContext* context) {
return Status::OK();
}
-void HdfsScanner::Close(RowBatch* row_batch) {
+Status HdfsScanner::ProcessSplit() {
+ DCHECK(scan_node_->HasRowBatchQueue());
+ HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+ do {
+ unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(),
+ state_->batch_size(), scan_node_->mem_tracker());
+ Status status = GetNextInternal(batch.get());
+ // Always add batch to the queue because it may contain data referenced by previously
+ // appended batches.
+ scan_node->AddMaterializedRowBatch(move(batch));
+ RETURN_IF_ERROR(status);
+ } while (!eos_ && !scan_node_->ReachedLimit());
+ return Status::OK();
+}
+
+void HdfsScanner::Close() {
+ DCHECK(scan_node_->HasRowBatchQueue());
+ RowBatch* final_batch = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+ scan_node_->mem_tracker());
+ Close(final_batch);
+}
+
+void HdfsScanner::CloseInternal() {
DCHECK(!is_closed_);
- if (decompressor_.get() != NULL) {
+ if (decompressor_.get() != nullptr) {
decompressor_->Close();
decompressor_.reset();
}
@@ -153,7 +147,7 @@ void HdfsScanner::Close(RowBatch* row_batch) {
}
expr_mem_pool_->FreeAll();
obj_pool_.Clear();
- stream_ = NULL;
+ stream_ = nullptr;
context_->ClearStreams();
is_closed_ = true;
}
@@ -179,26 +173,6 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
return Status::OK();
}
-Status HdfsScanner::StartNewRowBatch() {
- DCHECK(scan_node_->HasRowBatchQueue());
- batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
- scan_node_->mem_tracker());
- int64_t tuple_buffer_size;
- RETURN_IF_ERROR(
- batch_->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
- return Status::OK();
-}
-
-int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) {
- DCHECK(scan_node_->HasRowBatchQueue());
- DCHECK(batch_ != NULL);
- DCHECK_GT(batch_->capacity(), batch_->num_rows());
- *pool = batch_->tuple_data_pool();
- *tuple_mem = reinterpret_cast<Tuple*>(tuple_mem_);
- *tuple_row_mem = batch_->GetRow(batch_->AddRow());
- return batch_->capacity() - batch_->num_rows();
-}
-
Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) {
int num_tuples;
@@ -210,10 +184,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
return Status::OK();
}
-Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch) {
- DCHECK(batch_ != NULL || !scan_node_->HasRowBatchQueue());
- DCHECK(batch_ == row_batch || !scan_node_->HasRowBatchQueue());
- DCHECK(!enqueue_if_full || scan_node_->HasRowBatchQueue());
+Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows());
row_batch->CommitRows(num_rows);
tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
@@ -224,10 +195,6 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row
// if no rows passed predicates.
if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
context_->ReleaseCompletedResources(row_batch, /* done */ false);
- if (enqueue_if_full) {
- static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
- RETURN_IF_ERROR(StartNewRowBatch());
- }
}
if (context_->cancelled()) return Status::CANCELLED;
// Check for UDF errors.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index cb9dfeb..f2f3407 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -22,33 +22,27 @@
#include <vector>
#include <memory>
#include <stdint.h>
-#include <boost/regex.hpp>
#include <boost/scoped_ptr.hpp>
#include "codegen/impala-ir.h"
#include "common/object-pool.h"
+#include "common/status.h"
#include "exec/hdfs-scan-node-base.h"
-#include "exec/scan-node.h"
#include "exec/scanner-context.h"
-#include "runtime/disk-io-mgr.h"
#include "runtime/row-batch.h"
#include "runtime/tuple.h"
namespace impala {
+class Codec;
class CollectionValueBuilder;
class Compression;
-class DescriptorTbl;
class Expr;
class HdfsPartitionDescriptor;
class MemPool;
-class SlotDescriptor;
-class Status;
class TextConverter;
class TupleDescriptor;
-class TPlanNode;
-class TScanRange;
-class Codec;
+class SlotDescriptor;
/// Intermediate structure used for two pass parsing approach. In the first pass,
/// the FieldLocation structs are filled out and contain where all the fields start and
@@ -118,7 +112,7 @@ class HdfsScanner {
virtual ~HdfsScanner();
/// One-time initialisation of state that is constant across scan ranges.
- virtual Status Open(ScannerContext* context);
+ virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
/// Returns the next row batch from this scanner's split.
/// Recoverable errors are logged to the runtime state. Only returns a non-OK status
@@ -127,7 +121,7 @@ class HdfsScanner {
/// The memory referenced by the tuples is valid until this or any subsequently
/// returned batch is reset or destroyed.
/// Only valid to call if the parent scan node is single-threaded.
- Status GetNext(RowBatch* row_batch) {
+ Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT {
DCHECK(!scan_node_->HasRowBatchQueue());
return GetNextInternal(row_batch);
}
@@ -136,14 +130,26 @@ class HdfsScanner {
/// initialized with the split data (e.g. template tuple, partition descriptor, etc).
/// This function should only return on error or end of scan range.
/// Only valid to call if the parent scan node is multi-threaded.
- virtual Status ProcessSplit() = 0;
+ virtual Status ProcessSplit() WARN_UNUSED_RESULT;
+
+ /// Creates a new row batch and transfers the ownership of memory backing returned
+ /// tuples to it by calling Close(RowBatch*). That last batch is added to the row batch
+ /// queue. Only valid to call if HasRowBatchQueue().
+ void Close();
/// Transfers the ownership of memory backing returned tuples such as IO buffers
/// and memory in mem pools to the given row batch. If the row batch is NULL,
/// those resources are released instead. In any case, releases all other resources
/// that are not backing returned rows (e.g. temporary decompression buffers).
/// This function is not idempotent and must only be called once.
- virtual void Close(RowBatch* row_batch);
+ virtual void Close(RowBatch* row_batch) = 0;
+
+ /// Helper function that frees resources common to all scanner subclasses like the
+ /// 'decompressor_', 'context_', 'obj_pool_', etc. Should only be called once the last
+ /// row batch has been attached to the row batch queue (if applicable) to avoid freeing
+ /// memory that might be referenced by the last batch.
+ /// Only valid to call if 'is_closed_' is false. Sets 'is_closed_' to true.
+ void CloseInternal();
/// Only valid to call if the parent scan node is single-threaded.
bool eos() const {
@@ -151,12 +157,6 @@ class HdfsScanner {
return eos_;
}
- /// Only valid to call if the parent scan node is multi-threaded.
- RowBatch* batch() const {
- DCHECK(scan_node_->HasRowBatchQueue());
- return batch_;
- }
-
/// Scanner subclasses must implement these static functions as well. Unfortunately,
/// c++ does not allow static virtual functions.
@@ -173,7 +173,7 @@ class HdfsScanner {
/// should be issued to the io mgr. This is one range for the metadata and one
/// range for each column, for each split.
/// This function is how scanners can pick their strategy.
- /// void IssueInitialRanges(HdfsScanNode* scan_node,
+ /// void IssueInitialRanges(HdfsScanNodeBase* scan_node,
/// const std::vector<HdfsFileDesc*>& files);
/// Codegen all functions for this scanner. The codegen'd function is specific to
@@ -191,20 +191,19 @@ class HdfsScanner {
RuntimeState* state_;
/// Context for this scanner
- ScannerContext* context_;
+ ScannerContext* context_ = nullptr;
/// Object pool for objects with same lifetime as scanner.
ObjectPool obj_pool_;
/// The first stream for context_
- ScannerContext::Stream* stream_;
+ ScannerContext::Stream* stream_ = nullptr;
/// Set if this scanner has processed all ranges and will not produce more rows.
- /// Only relevant when calling the GetNext() interface.
- bool eos_;
+ bool eos_ = false;
/// Starts as false and is set to true in Close().
- bool is_closed_;
+ bool is_closed_ = false;
/// MemPool used for expression evaluators in this scanner. Need to be local
/// to each scanner as MemPool is not thread safe.
@@ -217,7 +216,7 @@ class HdfsScanner {
// Convenience reference to conjuncts_evals_map_[scan_node_->tuple_idx()] for
// scanners that do not support nested types.
- const std::vector<ScalarExprEvaluator*>* conjunct_evals_;
+ const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr;
// Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
@@ -242,23 +241,16 @@ class HdfsScanner {
/// Convenience variable set to the top-level template tuple
/// (i.e. template_tuple_map_[scan_node_->tuple_desc()]).
- Tuple* template_tuple_;
+ Tuple* template_tuple_ = nullptr;
/// Fixed size of each top-level tuple, in bytes
const int32_t tuple_byte_size_;
- /// Current tuple pointer into tuple_mem_.
- Tuple* tuple_;
+ /// Current tuple pointer into 'tuple_mem_'.
+ Tuple* tuple_ = nullptr;
- /// The current row batch being populated. Creating new row batches, attaching context
- /// resources, and handing off to the scan node is handled by this class in CommitRows(),
- /// but AttachPool() must be called by scanner subclasses to attach any memory allocated
- /// by that subclass. All row batches created by this class are transferred to the scan
- /// node (i.e., all batches are ultimately owned by the scan node).
- RowBatch* batch_;
-
- /// The tuple memory of batch_.
- uint8_t* tuple_mem_;
+ /// The tuple memory backing 'tuple_'.
+ uint8_t* tuple_mem_ = nullptr;
/// Helper class for converting text to other types;
boost::scoped_ptr<TextConverter> text_converter_;
@@ -267,31 +259,31 @@ class HdfsScanner {
/// This significantly minimizes the cross compile dependencies for llvm since status
/// objects inline a bunch of string functions. Also, status objects aren't extremely
/// cheap to create and destroy.
- Status parse_status_;
+ Status parse_status_ = Status::OK();
/// Decompressor class to use, if any.
boost::scoped_ptr<Codec> decompressor_;
/// The most recently used decompression type.
- THdfsCompression::type decompression_type_;
+ THdfsCompression::type decompression_type_ = THdfsCompression::NONE;
/// Pool to allocate per data block memory. This should be used with the
/// decompressor and any other per data block allocations.
boost::scoped_ptr<MemPool> data_buffer_pool_;
/// Time spent decompressing bytes.
- RuntimeProfile::Counter* decompress_timer_;
+ RuntimeProfile::Counter* decompress_timer_ = nullptr;
/// Matching typedef for WriteAlignedTuples for codegen. Refer to comments for
/// that function.
typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, int, FieldLocation*,
int, int, int, int);
/// Jitted write tuples function pointer. Null if codegen is disabled.
- WriteTuplesFn write_tuples_fn_;
+ WriteTuplesFn write_tuples_fn_ = nullptr;
/// Implements GetNext(). Should be overridden by subclasses.
/// Only valid to call if the parent scan node is multi-threaded.
- virtual Status GetNextInternal(RowBatch* row_batch) {
+ virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT {
DCHECK(false) << "GetNextInternal() not implemented for this scanner type.";
return Status::OK();
}
@@ -301,25 +293,10 @@ class HdfsScanner {
/// - type - type for this scanner
/// - scanner_name - debug string name for this scanner (e.g. HdfsTextScanner)
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
- THdfsFileFormat::type type, const std::string& scanner_name);
-
- /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
- /// Only valid to call if the parent scan node is multi-threaded.
- Status StartNewRowBatch();
+ THdfsFileFormat::type type, const std::string& scanner_name) WARN_UNUSED_RESULT;
/// Reset internal state for a new scan range.
- virtual Status InitNewRange() = 0;
-
- /// Gets memory for outputting tuples into batch_.
- /// *pool is the mem pool that should be used for memory allocated for those tuples.
- /// *tuple_mem should be the location to output tuples, and
- /// *tuple_row_mem for outputting tuple rows.
- /// Returns the maximum number of tuples/tuple rows that can be output (before the
- /// current row batch is complete and a new one is allocated).
- /// Memory returned from this call is invalidated after calling CommitRows().
- /// Callers must call GetMemory() again after calling this function.
- /// Only valid to call if the parent scan node is multi-threaded.
- int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
+ virtual Status InitNewRange() WARN_UNUSED_RESULT = 0;
/// Gets memory for outputting tuples into the CollectionValue being constructed via
/// 'builder'. If memory limit is exceeded, an error status is returned. Otherwise,
@@ -330,40 +307,13 @@ class HdfsScanner {
/// the next tuple. This also means its unnecessary to call
/// (*tuple_row_mem)->SetTuple().
Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
- Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows);
+ Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) WARN_UNUSED_RESULT;
/// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly.
/// Attaches completed resources from 'context_' to 'row_batch' if necessary.
- /// Frees local expr allocations.
- /// If 'enqueue_if_full' is true and 'row_batch' is at capacity after committing the
- /// rows, then 'row_batch' is added to the queue, and a new batch is created with
- /// StartNewRowBatch(). It is only valid to pass true for 'enqueue_if_full' if the
- /// parent parent scan node is multi-threaded.
- /// Returns non-OK if 'context_' is cancelled or the query status in 'state_' is
- /// non-OK.
- Status CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch);
-
- /// Calls the above CommitRows() passing true for 'queue_if_full', and 'batch_' as the
- /// row batch. Only valid to call if the parent scan node is multi-threaded.
- Status CommitRows(int num_rows) {
- DCHECK(scan_node_->HasRowBatchQueue());
- return CommitRows(num_rows, true, batch_);
- }
-
- /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch
- /// will be committed. 'commit_batch' should be true if the attached pool is expected
- /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage.
- /// Can return an error status if 'commit_batch' is true and allocating the next
- /// batch fails, or if the query hit an error or is cancelled. Only valid to call if
- /// the parent scan node is multi-threaded.
- Status AttachPool(MemPool* pool, bool commit_batch) {
- DCHECK(scan_node_->HasRowBatchQueue());
- DCHECK(batch_ != NULL);
- DCHECK(pool != NULL);
- batch_->tuple_data_pool()->AcquireData(pool, false);
- if (commit_batch) RETURN_IF_ERROR(CommitRows(0));
- return Status::OK();
- }
+ /// Frees local expr allocations. Returns non-OK if 'context_' is cancelled or the
+ /// query status in 'state_' is non-OK.
+ Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators.
/// This must always be inlined so we can correctly replace the call to
@@ -395,8 +345,8 @@ class HdfsScanner {
/// Update the decompressor_ object given a compression type or codec name. Depending on
/// the old compression type and the new one, it may close the old decompressor and/or
/// create a new one of different type.
- Status UpdateDecompressor(const THdfsCompression::type& compression);
- Status UpdateDecompressor(const std::string& codec);
+ Status UpdateDecompressor(const THdfsCompression::type& compression) WARN_UNUSED_RESULT;
+ Status UpdateDecompressor(const std::string& codec) WARN_UNUSED_RESULT;
/// Utility function to report parse errors for each field.
/// If errors[i] is nonzero, fields[i] had a parse error.
@@ -405,10 +355,10 @@ class HdfsScanner {
/// This is called from WriteAlignedTuples.
bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors);
- /// Triggers debug action of the scan node. This is currently used by parquet column
- /// readers to exercise various failure paths in parquet scanner. Returns the status
+ /// Triggers debug action of the scan node. This is currently used by Parquet column
+ /// readers to exercise various failure paths in Parquet scanner. Returns the status
/// returned by the scan node's TriggerDebugAction().
- Status ScannerDebugAction() {
+ Status ScannerDebugAction() WARN_UNUSED_RESULT {
return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER);
}
@@ -435,21 +385,23 @@ class HdfsScanner {
/// TODO: revisit this
bool WriteCompleteTuple(MemPool* pool, FieldLocation* fields, Tuple* tuple,
TupleRow* tuple_row, Tuple* template_tuple, uint8_t* error_fields,
- uint8_t* error_in_row);
+ uint8_t* error_in_row) WARN_UNUSED_RESULT;
/// Codegen function to replace WriteCompleteTuple. Should behave identically
/// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
/// if codegen was successful or NULL otherwise.
static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_complete_tuple_fn);
+ llvm::Function** write_complete_tuple_fn)
+ WARN_UNUSED_RESULT;
/// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross
/// compiled to IR. This function loads the precompiled IR function, modifies it,
/// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
/// successful or NULL otherwise.
static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*,
- llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
+ llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn)
+ WARN_UNUSED_RESULT;
/// Report parse error for column @ desc. If abort_on_error is true, sets
/// parse_status_ to the error message.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 355a554..9b66432 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -42,10 +42,7 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node,
- RuntimeState* state)
- : BaseSequenceScanner(scan_node, state),
- unparsed_data_buffer_(NULL),
- num_buffered_records_in_compressed_block_(0) {
+ RuntimeState* state) : BaseSequenceScanner(scan_node, state) {
}
HdfsSequenceScanner::~HdfsSequenceScanner() {
@@ -54,22 +51,22 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
// Codegen for materialized parsed data into tuples.
Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) {
- *write_aligned_tuples_fn = NULL;
+ *write_aligned_tuples_fn = nullptr;
DCHECK(node->runtime_state()->ShouldCodegen());
LlvmCodeGen* codegen = node->runtime_state()->codegen();
- DCHECK(codegen != NULL);
+ DCHECK(codegen != nullptr);
Function* write_complete_tuple_fn;
RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
&write_complete_tuple_fn));
- DCHECK(write_complete_tuple_fn != NULL);
+ DCHECK(write_complete_tuple_fn != nullptr);
RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
write_aligned_tuples_fn));
- DCHECK(*write_aligned_tuples_fn != NULL);
+ DCHECK(*write_aligned_tuples_fn != nullptr);
return Status::OK();
}
Status HdfsSequenceScanner::InitNewRange() {
- DCHECK(header_ != NULL);
+ DCHECK(header_ != nullptr);
only_parsing_header_ = false;
HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
@@ -168,23 +165,34 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
// 3. Read the sync indicator and check the sync block
// This mimics the technique for text.
// This function only returns on error or when the entire scan range is complete.
-Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
+Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch) {
DCHECK(header_->is_compressed);
- while (!finished()) {
- if (scan_node_->ReachedLimit()) return Status::OK();
-
+ if (num_buffered_records_in_compressed_block_ == 0) {
+ // We are reading a new compressed block. Pass the previous buffer pool bytes to the
+ // batch. We don't need them anymore.
+ if (!decompressor_->reuse_output_buffer()) {
+ row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+ RETURN_IF_ERROR(CommitRows(0, row_batch));
+ if (row_batch->AtCapacity()) return Status::OK();
+ }
// Step 1
RETURN_IF_ERROR(ReadCompressedBlock());
if (num_buffered_records_in_compressed_block_ < 0) return parse_status_;
+ }
- // Step 2
- while (num_buffered_records_in_compressed_block_ > 0) {
- RETURN_IF_ERROR(ProcessDecompressedBlock());
- }
+ // Step 2
+ while (num_buffered_records_in_compressed_block_ > 0) {
+ RETURN_IF_ERROR(ProcessDecompressedBlock(row_batch));
+ if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+ }
- // SequenceFiles don't end with syncs
- if (stream_->eof()) return Status::OK();
+ if (num_buffered_records_in_compressed_block_ == 0) {
+ // SequenceFiles don't end with syncs.
+ if (stream_->eof()) {
+ eos_ = true;
+ return Status::OK();
+ }
// Step 3
int sync_indicator;
@@ -193,8 +201,8 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
if (state_->LogHasSpace()) {
stringstream ss;
ss << "Expecting sync indicator (-1) at file offset "
- << (stream_->file_offset() - sizeof(int)) << ". "
- << "Sync indicator found " << sync_indicator << ".";
+ << (stream_->file_offset() - sizeof(int)) << ". "
+ << "Sync indicator found " << sync_indicator << ".";
state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
}
return Status("Bad sync hash");
@@ -205,18 +213,17 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
return Status::OK();
}
-Status HdfsSequenceScanner::ProcessDecompressedBlock() {
- MemPool* pool;
- TupleRow* tuple_row;
- int64_t max_tuples = GetMemory(&pool, &tuple_, &tuple_row);
+Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
+ int64_t max_tuples = row_batch->capacity() - row_batch->num_rows();
int num_to_process = min(max_tuples, num_buffered_records_in_compressed_block_);
num_buffered_records_in_compressed_block_ -= num_to_process;
+ TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow());
if (scan_node_->materialized_slots().empty()) {
// Handle case where there are no slots to materialize (e.g. count(*))
num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
- RETURN_IF_ERROR(CommitRows(num_to_process));
+ RETURN_IF_ERROR(CommitRows(num_to_process, row_batch));
return Status::OK();
}
@@ -261,34 +268,32 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
// Call jitted function if possible
int tuples_returned;
- if (write_tuples_fn_ != NULL) {
+ if (write_tuples_fn_ != nullptr) {
// HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots
// and escape characters. TextConverter::WriteSlot() will be used instead.
DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
delimited_text_parser_->escape_char() == '\0');
// last argument: seq always starts at record_location[0]
- tuples_returned = write_tuples_fn_(this, pool, tuple_row,
- batch_->row_byte_size(), field_locations_.data(), num_to_process,
+ tuples_returned = write_tuples_fn_(this, row_batch->tuple_data_pool(), tuple_row,
+ row_batch->row_byte_size(), field_locations_.data(), num_to_process,
max_added_tuples, scan_node_->materialized_slots().size(), 0);
} else {
- tuples_returned = WriteAlignedTuples(pool, tuple_row,
- batch_->row_byte_size(), field_locations_.data(), num_to_process,
+ tuples_returned = WriteAlignedTuples(row_batch->tuple_data_pool(), tuple_row,
+ row_batch->row_byte_size(), field_locations_.data(), num_to_process,
max_added_tuples, scan_node_->materialized_slots().size(), 0);
}
if (tuples_returned == -1) return parse_status_;
COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
- RETURN_IF_ERROR(CommitRows(tuples_returned));
+ RETURN_IF_ERROR(CommitRows(tuples_returned, row_batch));
return Status::OK();
}
-Status HdfsSequenceScanner::ProcessRange() {
- num_buffered_records_in_compressed_block_ = 0;
-
+Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
// Block compressed is handled separately to minimize function calls.
if (seq_header->is_compressed && !seq_header->is_row_compressed) {
- return ProcessBlockCompressedScanRange();
+ return ProcessBlockCompressedScanRange(row_batch);
}
// We count the time here since there is too much overhead to do
@@ -296,40 +301,30 @@ Status HdfsSequenceScanner::ProcessRange() {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
int64_t num_rows_read = 0;
- while (!finished()) {
+ const bool has_materialized_slots = !scan_node_->materialized_slots().empty();
+ while (!eos_) {
DCHECK_GT(record_locations_.size(), 0);
- // Get the next compressed or uncompressed record.
- RETURN_IF_ERROR(
- GetRecord(&record_locations_[0].record, &record_locations_[0].len));
+ TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
- MemPool* pool;
- TupleRow* tuple_row_mem;
- int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
- DCHECK_GT(max_tuples, 0);
-
- // Parse the current record.
+ // Get the next compressed or uncompressed record and parse it.
+ RETURN_IF_ERROR(GetRecord(&record_locations_[0].record, &record_locations_[0].len));
bool add_row = false;
-
- // Parse the current record.
- if (scan_node_->materialized_slots().size() != 0) {
+ if (has_materialized_slots) {
char* col_start;
uint8_t* record_start = record_locations_[0].record;
int num_tuples = 0;
int num_fields = 0;
char* row_end_loc;
- uint8_t error_in_row = false;
-
RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(
1, record_locations_[0].len, reinterpret_cast<char**>(&record_start),
&row_end_loc, field_locations_.data(), &num_tuples, &num_fields, &col_start));
- DCHECK(num_tuples == 1);
+ DCHECK_EQ(num_tuples, 1);
+ uint8_t error_in_row = false;
uint8_t errors[num_fields];
memset(errors, 0, num_fields);
-
- add_row = WriteCompleteTuple(pool, field_locations_.data(), tuple_, tuple_row_mem,
- template_tuple_, &errors[0], &error_in_row);
-
+ add_row = WriteCompleteTuple(row_batch->tuple_data_pool(), field_locations_.data(),
+ tuple_, tuple_row_mem, template_tuple_, &errors[0], &error_in_row);
if (UNLIKELY(error_in_row)) {
ReportTupleParseError(field_locations_.data(), errors);
RETURN_IF_ERROR(parse_status_);
@@ -338,11 +333,13 @@ Status HdfsSequenceScanner::ProcessRange() {
add_row = WriteTemplateTuples(tuple_row_mem, 1);
}
num_rows_read++;
- if (add_row) RETURN_IF_ERROR(CommitRows(1));
- if (scan_node_->ReachedLimit()) break;
+ if (add_row) RETURN_IF_ERROR(CommitRows(1, row_batch));
- // Sequence files don't end with syncs
- if (stream_->eof()) break;
+ // Sequence files don't end with syncs.
+ if (stream_->eof()) {
+ eos_ = true;
+ break;
+ }
// Check for sync by looking for the marker that precedes syncs.
int marker;
@@ -351,6 +348,10 @@ Status HdfsSequenceScanner::ProcessRange() {
RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false));
RETURN_IF_ERROR(ReadSync());
}
+
+ // These checks must come after advancing past the next sync such that the stream is
+ // at the start of the next data block when this function is called again.
+ if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
}
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
@@ -454,19 +455,14 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
}
Status HdfsSequenceScanner::ReadCompressedBlock() {
- // We are reading a new compressed block. Pass the previous buffer pool
- // bytes to the batch. We don't need them anymore.
- if (!decompressor_->reuse_output_buffer()) {
- RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
- }
-
+ int64_t num_buffered_records;
RETURN_IF_FALSE(stream_->ReadVLong(
- &num_buffered_records_in_compressed_block_, &parse_status_));
- if (num_buffered_records_in_compressed_block_ < 0) {
+ &num_buffered_records, &parse_status_));
+ if (num_buffered_records < 0) {
if (state_->LogHasSpace()) {
stringstream ss;
ss << "Bad compressed block record count: "
- << num_buffered_records_in_compressed_block_;
+ << num_buffered_records;
state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
}
return Status("bad record count");
@@ -490,7 +486,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
return Status(ss.str());
}
- uint8_t* compressed_data = NULL;
+ uint8_t* compressed_data = nullptr;
RETURN_IF_FALSE(stream_->ReadBytes(block_size, &compressed_data, &parse_status_));
{
@@ -501,6 +497,6 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
VLOG_FILE << "Decompressed " << block_size << " to " << len;
next_record_in_compressed_block_ = unparsed_data_buffer_;
}
-
+ num_buffered_records_in_compressed_block_ = num_buffered_records;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 01d5a68..4845edb 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -19,30 +19,29 @@
#ifndef IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
#define IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
-/// This scanner parses Sequence file located in HDFS, and writes the
-/// content as tuples in the Impala in-memory representation of data, e.g.
-/// (tuples, rows, row batches).
-//
+/// This scanner parses Sequence files and writes the content as tuples in the Impala
+/// in-memory representation of data (tuples, rows, row batches).
+///
/// TODO: Make the various sequence file formats behave more similarly. They should
/// all have a structure similar to block compressed operating in batches rather than
/// row at a time.
-//
+///
/// org.apache.hadoop.io.SequenceFile is the original SequenceFile implementation
/// and should be viewed as the canonical definition of this format. If
/// anything is unclear in this file you should consult the code in
/// org.apache.hadoop.io.SequenceFile.
-//
+///
/// The following is a pseudo-BNF grammar for SequenceFile. Comments are prefixed
/// with dashes:
-//
+///
/// seqfile ::=
/// <file-header>
/// <record-block>+
-//
+///
/// record-block ::=
/// <record>+
/// <file-sync-hash>
-//
+///
/// file-header ::=
/// <file-version-header>
/// <file-key-class-name>
@@ -52,68 +51,68 @@
/// [<file-compression-codec-class>]
/// <file-header-metadata>
/// <file-sync-field>
-//
+///
/// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
-//
+///
/// -- The name of the Java class responsible for reading the key buffer
-//
+///
/// file-key-class-name ::=
/// Text {"org.apache.hadoop.io.BytesWritable"}
-//
+///
/// -- The name of the Java class responsible for reading the value buffer
-//
+///
/// -- We don't care what this is.
/// file-value-class-name ::=
-//
+///
/// -- Boolean variable indicating whether or not the file uses compression
/// -- for key/values in this file
-//
+///
/// file-is-compressed ::= Byte[1]
-//
+///
/// -- A boolean field indicating whether or not the file is block compressed.
-//
+///
/// file-is-block-compressed ::= Byte[1] {false}
-//
+///
/// -- The Java class name of the compression codec iff <file-is-compressed>
/// -- is true. The named class must implement
/// -- org.apache.hadoop.io.compress.CompressionCodec.
/// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
-//
+///
/// file-compression-codec-class ::= Text
-//
+///
/// -- A collection of key-value pairs defining metadata values for the
/// -- file. The Map is serialized using standard JDK serialization, i.e.
/// -- an Int corresponding to the number of key-value pairs, followed by
/// -- Text key and value pairs.
-//
+///
/// file-header-metadata ::= Map<Text, Text>
-//
+///
/// -- A 16 byte marker that is generated by the writer. This marker appears
/// -- at regular intervals at the beginning of records or record blocks
/// -- intended to enable readers to skip to a random part of the file
/// -- the sync hash is preceded by a length of -1, referred to as the sync marker
-//
+///
/// file-sync-hash ::= Byte[16]
-//
+///
/// -- Records are all of one type as determined by the compression bits in the header
-//
+///
/// record ::=
/// <uncompressed-record> |
/// <block-compressed-record> |
/// <record-compressed-record>
-//
+///
/// uncompressed-record ::=
/// <record-length>
/// <key-length>
/// <key>
/// <value>
-//
+///
/// record-compressed-record ::=
/// <record-length>
/// <key-length>
/// <key>
/// <compressed-value>
-//
+///
/// block-compressed-record ::=
/// <file-sync-field>
/// <key-lengths-block-size>
@@ -124,30 +123,30 @@
/// <value-lengths-block>
/// <values-block-size>
/// <values-block>
-//
+///
/// record-length := Int
/// key-length := Int
/// key-lengths-block-size := Int
/// value-lengths-block-size := Int
-//
+///
/// keys-block :: = Byte[keys-block-size]
/// values-block :: = Byte[values-block-size]
-//
+///
/// -- The key-lengths and value-lengths blocks are a sequence of lengths encoded
/// -- in ZeroCompressedInteger (VInt) format.
-//
+///
/// key-lengths-block :: = Byte[key-lengths-block-size]
/// value-lengths-block :: = Byte[value-lengths-block-size]
-//
+///
/// Byte ::= An eight-bit byte
-//
+///
/// VInt ::= Variable length integer. The high-order bit of each byte
/// indicates whether more bytes remain to be read. The low-order seven
/// bits are appended as increasingly more significant bits in the
/// resulting integer value.
-//
+///
/// Int ::= A four-byte integer in big-endian format.
-//
+///
/// Text ::= VInt, Chars (Length prefixed UTF-8 characters)
#include "exec/base-sequence-scanner.h"
@@ -167,20 +166,21 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
virtual ~HdfsSequenceScanner();
/// Implementation of HdfsScanner interface.
- virtual Status Open(ScannerContext* context);
+ virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
/// Codegen WriteAlignedTuples(). Stores the resulting function in
- /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
+ /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
static Status Codegen(HdfsScanNodeBase* node,
const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_aligned_tuples_fn);
+ llvm::Function** write_aligned_tuples_fn)
+ WARN_UNUSED_RESULT;
protected:
/// Implementation of sequence container super class methods.
virtual FileHeader* AllocateFileHeader();
- virtual Status ReadFileHeader();
- virtual Status InitNewRange();
- virtual Status ProcessRange();
+ virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
+ virtual Status InitNewRange() WARN_UNUSED_RESULT;
+ virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
virtual THdfsFileFormat::type file_format() const {
return THdfsFileFormat::SEQUENCE_FILE;
@@ -189,35 +189,37 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
private:
/// Maximum size of a compressed block. This is used to check for corrupted
/// block size so we do not read the whole file before we detect the error.
- const static int MAX_BLOCK_SIZE = (1024 * 1024 * 1024);
+ static const int64_t MAX_BLOCK_SIZE = 1024 * 1024 * 1024;
/// The value class name located in the SeqFile Header.
/// This is always "org.apache.hadoop.io.Text"
static const char* const SEQFILE_VALUE_CLASS_NAME;
- /// Read the record header.
- /// Sets:
- /// current_block_length_
- Status ReadBlockHeader();
+ /// Reads the record header and sets 'current_block_length_'.
+ Status ReadBlockHeader() WARN_UNUSED_RESULT;
- /// Process an entire block compressed scan range. Block compressed ranges are
- /// more common and can be parsed more efficiently in larger pieces.
- Status ProcessBlockCompressedScanRange();
+ /// Processes or continues processing a block-compressed scan range, adding tuples
+ /// to 'row_batch'. Block-compressed ranges are common and can be parsed more
+ /// efficiently in larger pieces.
+ Status ProcessBlockCompressedScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
- /// Read a compressed block. Does NOT read sync or -1 marker preceding sync.
- /// Decompress to unparsed_data_buffer_ allocated from unparsed_data_buffer_pool_.
- Status ReadCompressedBlock();
+ /// Reads a compressed block. Does NOT read sync or -1 marker preceding sync.
+ /// Decompresses the data into 'unparsed_data_buffer_' allocated from the
+ /// 'data_buffer_pool_' via the decompressor.
+ /// Sets 'num_buffered_records_in_compressed_block_' if decompression was
+ /// successful.
+ Status ReadCompressedBlock() WARN_UNUSED_RESULT;
- /// Utility function for parsing next_record_in_compressed_block_. Called by
- /// ProcessBlockCompressedScanRange.
- Status ProcessDecompressedBlock();
+ /// Utility function for parsing 'next_record_in_compressed_block_'. Called by
+ /// ProcessBlockCompressedScanRange().
+ Status ProcessDecompressedBlock(RowBatch* row_batch) WARN_UNUSED_RESULT;
- /// Read compressed or uncompressed records from the byte stream into memory
- /// in unparsed_data_buffer_pool_. Not used for block compressed files.
+ /// Read a single record from the current position in 'stream_', decompressing
+ /// the record, if necessary. Not used for block compressed files.
/// Output:
- /// record_ptr: ponter to the record.
+ /// record_ptr: pointer to the record
/// record_len: length of the record
- Status GetRecord(uint8_t** record_ptr, int64_t *record_len);
+ Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
/// Helper class for picking fields and rows from delimited text.
boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
@@ -235,25 +237,24 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
int64_t len;
};
- /// Records are processed in batches. This vector stores batches of record locations
+ /// Records are processed in batches. This vector stores batches of record locations
/// that are being processed.
- /// TODO: better perf not to use vector?
std::vector<RecordLocation> record_locations_;
/// Length of the current sequence file block (or record).
- int current_block_length_;
+ int current_block_length_ = -1;
/// Length of the current key. This is specified as 4 bytes in the format description.
- int current_key_length_;
+ int current_key_length_ = -1;
- /// Buffer for data read from HDFS or from decompressing the HDFS data.
- uint8_t* unparsed_data_buffer_;
+ /// Buffer for data read from the 'stream_' directly or after decompression.
+ uint8_t* unparsed_data_buffer_ = nullptr;
/// Number of buffered records in unparsed_data_buffer_ from block compressed data.
- int64_t num_buffered_records_in_compressed_block_;
+ int64_t num_buffered_records_in_compressed_block_ = 0;
/// Next record from block compressed data.
- uint8_t* next_record_in_compressed_block_;
+ uint8_t* next_record_in_compressed_block_ = nullptr;
};
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index c1a12d6..378b45e 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -17,6 +17,8 @@
#include "exec/hdfs-text-scanner.h"
+#include <memory>
+
#include "codegen/llvm-codegen.h"
#include "exec/delimited-text-parser.h"
#include "exec/delimited-text-parser.inline.h"
@@ -152,22 +154,6 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
return Status::OK();
}
-Status HdfsTextScanner::ProcessSplit() {
- DCHECK(scan_node_->HasRowBatchQueue());
- HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
- do {
- batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
- scan_node_->mem_tracker());
- RETURN_IF_ERROR(GetNextInternal(batch_));
- scan_node->AddMaterializedRowBatch(batch_);
- } while (!eos_ && !scan_node_->ReachedLimit());
-
- // Transfer the remaining resources to this new batch in Close().
- batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
- scan_node_->mem_tracker());
- return Status::OK();
-}
-
void HdfsTextScanner::Close(RowBatch* row_batch) {
DCHECK(!is_closed_);
// Need to close the decompressor before transferring the remaining resources to
@@ -183,7 +169,8 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
context_->ReleaseCompletedResources(row_batch, true);
if (scan_node_->HasRowBatchQueue()) {
- static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+ static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+ unique_ptr<RowBatch>(row_batch));
}
} else {
if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
@@ -201,7 +188,7 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
scan_node_->RangeComplete(THdfsFileFormat::TEXT,
stream_->file_desc()->file_compression);
}
- HdfsScanner::Close(row_batch);
+ CloseInternal();
}
Status HdfsTextScanner::InitNewRange() {
@@ -321,14 +308,14 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
DCHECK_LE(num_tuples, 1);
DCHECK_GE(num_tuples, 0);
COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
- RETURN_IF_ERROR(CommitRows(num_tuples, false, row_batch));
+ RETURN_IF_ERROR(CommitRows(num_tuples, row_batch));
} else if (delimited_text_parser_->HasUnfinishedTuple()) {
DCHECK(scan_node_->materialized_slots().empty());
DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0);
// If no fields are materialized we do not update partial_tuple_empty_,
// boundary_column_, or boundary_row_. However, we still need to handle the case
// of partial tuple due to missing tuple delimiter at the end of file.
- RETURN_IF_ERROR(CommitRows(1, false, row_batch));
+ RETURN_IF_ERROR(CommitRows(1, row_batch));
}
break;
}
@@ -417,7 +404,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
}
RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row));
}
- RETURN_IF_ERROR(CommitRows(num_tuples_materialized, false, row_batch));
+ RETURN_IF_ERROR(CommitRows(num_tuples_materialized, row_batch));
// Already past the scan range and attempting to complete the last row.
if (scan_state_ == PAST_SCAN_RANGE) break;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index af8fa8a..ceeda8d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -51,19 +51,18 @@ class HdfsTextScanner : public HdfsScanner {
virtual ~HdfsTextScanner();
/// Implementation of HdfsScanner interface.
- virtual Status Open(ScannerContext* context);
- virtual Status ProcessSplit();
+ virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
virtual void Close(RowBatch* row_batch);
/// Issue io manager byte ranges for 'files'.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
- const std::vector<HdfsFileDesc*>& files);
+ const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT;
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
static Status Codegen(HdfsScanNodeBase* node,
const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_aligned_tuples_fn);
+ llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
/// Suffix for lzo index files.
const static std::string LZO_INDEX_SUFFIX;
@@ -71,11 +70,11 @@ class HdfsTextScanner : public HdfsScanner {
static const char* LLVM_CLASS_NAME;
protected:
- virtual Status GetNextInternal(RowBatch* row_batch);
+ virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Reset the scanner. This clears any partial state that needs to
/// be cleared when starting or when restarting after an error.
- Status ResetScanner();
+ Status ResetScanner() WARN_UNUSED_RESULT;
/// Current position in byte buffer.
char* byte_buffer_ptr_;
@@ -103,14 +102,14 @@ class HdfsTextScanner : public HdfsScanner {
/// Initializes this scanner for this context. The context maps to a single
/// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED.
- virtual Status InitNewRange();
+ virtual Status InitNewRange() WARN_UNUSED_RESULT;
/// Finds the start of the first tuple in this scan range and initializes
/// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the scan state
/// to FIRST_TUPLE_FOUND, if successful. Otherwise, consumes the whole scan range
/// and does not update the scan state (e.g. if there are really large columns).
/// Only valid to call in scan state SCAN_RANGE_INITIALIZED.
- Status FindFirstTuple(MemPool* pool);
+ Status FindFirstTuple(MemPool* pool) WARN_UNUSED_RESULT;
/// When in scan state FIRST_TUPLE_FOUND, starts or continues processing the scan range
/// by reading bytes from 'context_'. Adds materialized tuples that pass the conjuncts
@@ -122,11 +121,11 @@ class HdfsTextScanner : public HdfsScanner {
/// Advances the scan state to PAST_SCAN_RANGE if all bytes in the scan range have been
/// processed.
/// Only valid to call in scan state FIRST_TUPLE_FOUND or PAST_SCAN_RANGE.
- Status ProcessRange(RowBatch* row_batch, int* num_tuples);
+ Status ProcessRange(RowBatch* row_batch, int* num_tuples) WARN_UNUSED_RESULT;
/// Reads past the end of the scan range for the next tuple end. If successful,
/// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE.
- Status FinishScanRange(RowBatch* row_batch);
+ Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Fills the next byte buffer from the context. This will block if there are no bytes
/// ready. Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_.
@@ -137,11 +136,12 @@ class HdfsTextScanner : public HdfsScanner {
/// If applicable, attaches decompression buffers from previous calls that might still
/// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
/// freed instead.
- virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0);
+ virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0)
+ WARN_UNUSED_RESULT;
/// Fills the next byte buffer from the compressed data in stream_ by reading the entire
/// file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer.
- Status FillByteBufferCompressedFile(bool* eosr);
+ Status FillByteBufferCompressedFile(bool* eosr) WARN_UNUSED_RESULT;
/// Fills the next byte buffer from the compressed data in stream_. Unlike
/// FillByteBufferCompressedFile(), the entire file does not need to be read at once.
@@ -149,21 +149,21 @@ class HdfsTextScanner : public HdfsScanner {
/// to available decompressed data.
/// Attaches decompression buffers from previous calls that might still be referenced
/// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead.
- Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr);
+ Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr) WARN_UNUSED_RESULT;
/// Used by FillByteBufferCompressedStream() to decompress data from 'stream_'.
/// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input.
/// If bytes_to_read > 0, will read specified size.
/// If bytes_to_read = -1, will call GetBuffer().
Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
- int64_t* decompressed_len, bool *eosr);
+ int64_t* decompressed_len, bool *eosr) WARN_UNUSED_RESULT;
/// Checks if the current buffer ends with a row delimiter spanning this and the next
/// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always
/// returns false if the table's row delimiter is not '\n'. This can only be called
/// after the buffer has been fully parsed, i.e. when byte_buffer_ptr_ ==
/// byte_buffer_end_.
- Status CheckForSplitDelimiter(bool* split_delimiter);
+ Status CheckForSplitDelimiter(bool* split_delimiter) WARN_UNUSED_RESULT;
/// Prepends field data that was from the previous file buffer (This field straddled two
/// file buffers). 'data' already contains the pointer/len from the current file buffer,
@@ -171,7 +171,7 @@ class HdfsTextScanner : public HdfsScanner {
/// This function will allocate a new string from the tuple pool, concatenate the
/// two pieces and update 'data' to contain the new pointer/len. Return error status if
/// memory limit is exceeded when allocating a new string.
- Status CopyBoundaryField(FieldLocation* data, MemPool* pool);
+ Status CopyBoundaryField(FieldLocation* data, MemPool* pool) WARN_UNUSED_RESULT;
/// Writes intermediate parsed data into 'tuple_', evaluates conjuncts, and appends
/// surviving rows to 'row'. Advances 'tuple_' and 'row' as necessary.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 08a3339..0845d9a 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -97,9 +97,9 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
}
*eos = false;
- RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
+ unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
if (materialized_batch != NULL) {
- row_batch->AcquireState(materialized_batch);
+ row_batch->AcquireState(materialized_batch.get());
num_rows_returned_ += row_batch->num_rows();
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
@@ -114,7 +114,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
done_ = true;
materialized_row_batches_->Shutdown();
}
- delete materialized_batch;
+ materialized_batch.reset();
} else {
*eos = true;
}
@@ -172,15 +172,16 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token));
bool eos = false;
while (!eos && !done_) {
- gscoped_ptr<RowBatch> row_batch(new RowBatch(
- row_desc(), runtime_state_->batch_size(), mem_tracker()));
+ unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(),
+ runtime_state_->batch_size(), mem_tracker());
RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
while (!done_) {
scanner->KeepKuduScannerAlive();
- if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
- ignore_result(row_batch.release());
+ if (materialized_row_batches_->BlockingPutWithTimeout(move(row_batch), 1000000)) {
break;
}
+ // Make sure that we still own the RowBatch if BlockingPutWithTimeout() timed out.
+ DCHECK(row_batch != nullptr);
}
}
if (eos) scan_ranges_complete_counter()->Add(1);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 69fd53d..87b9c71 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -85,7 +85,7 @@ class ScanNode : public ExecNode {
active_scanner_thread_counter_(TUnit::UNIT, 0),
active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
- virtual Status Prepare(RuntimeState* state);
+ virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
/// This should be called before Prepare(), and the argument must not be destroyed until
/// after Prepare().
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 5d88397..a32345c 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -23,6 +23,7 @@
#include <boost/thread/mutex.hpp>
#include <boost/scoped_ptr.hpp>
#include <deque>
+#include <memory>
#include <unistd.h>
#include "common/atomic.h"
@@ -92,7 +93,7 @@ class BlockingQueue : public CacheLineAligned {
}
DCHECK(!get_list_.empty());
- *out = get_list_.front();
+ *out = std::move(get_list_.front());
get_list_.pop_front();
get_list_size_.Store(get_list_.size());
read_lock.unlock();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
index e396583..3f62b2e 100644
--- a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
@@ -10,8 +10,7 @@ string
row_regex: .*Problem parsing file $NAMENODE/.*
File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/truncated_string.avro' is corrupt: truncated data block at offset 155
File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/negative_string_len.avro' is corrupt: invalid length -7 at offset 164
-File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174
-File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid encoded integer at offset 191
+File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174 (1 of 2 similar)
====
---- QUERY
# Read from the corrupt files. We may get partial results.