You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/18 16:43:26 UTC
[03/32] incubator-impala git commit: IMPALA-4231: fix codegen time regression
IMPALA-4231: fix codegen time regression
The commit "IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder"
slightly increased codegen time, which caused TPC-H Q2 to sometimes
regress significantly because of races in runtime filter arrival.
This patch attempts to fix the regression by improving codegen time in a
few places.
* Revert to using the old bool/Status return pattern. The regular Status
return pattern results in significantly more complex IR because the
compiler has to emit code to copy and free statuses. I spent some time
trying to convince it to optimise the extra code out, but didn't have much
success.
* Remove from cross-compilation some code that cannot be specialized.
* Add noexcept to some functions that are used from the IR to ensure
exception-handling IR is not emitted. This is less important after the
first change but still should help produce cleaner IR.
Performance:
I was able to reproduce a regression locally, which is fixed by this
patch. I'm in the process of trying to verify the fix on a cluster.
Change-Id: Idf0fdedabd488550b6db90167a30c582949d608d
Reviewed-on: http://gerrit.cloudera.org:8080/4623
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7fe4385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7fe4385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7fe4385
Branch: refs/heads/hadoop-next
Commit: c7fe4385d927509443a1c4e2c6e9a802d2dcf63b
Parents: 89b41c6
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Sep 30 15:18:54 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Oct 14 02:53:59 2016 +0000
----------------------------------------------------------------------
be/src/common/status.h | 30 +--
be/src/exec/hash-table.cc | 8 +-
be/src/exec/hash-table.h | 16 +-
be/src/exec/partitioned-aggregation-node-ir.cc | 6 +-
be/src/exec/partitioned-aggregation-node.cc | 10 +-
be/src/exec/partitioned-aggregation-node.h | 9 +-
be/src/exec/partitioned-hash-join-builder-ir.cc | 20 +-
be/src/exec/partitioned-hash-join-builder.cc | 19 +-
be/src/exec/partitioned-hash-join-builder.h | 14 +-
be/src/exec/partitioned-hash-join-node-ir.cc | 33 ++--
be/src/exec/partitioned-hash-join-node.h | 9 +-
be/src/runtime/buffered-tuple-stream.cc | 21 ++-
be/src/runtime/buffered-tuple-stream.h | 31 +--
be/src/runtime/buffered-tuple-stream.inline.h | 8 +-
be/src/runtime/raw-value.cc | 169 +++++++++++++++++
be/src/runtime/raw-value.h | 7 +-
be/src/runtime/raw-value.inline.h | 188 -------------------
be/src/util/bloom-filter.cc | 72 +++++++
be/src/util/bloom-filter.h | 80 +-------
19 files changed, 382 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 35a9a94..9a9ea67 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -99,6 +99,10 @@ class Status {
if (UNLIKELY(status.msg_ != NULL)) CopyMessageFrom(status);
}
+ /// Move constructor that moves the error message (if any) and resets 'other' to the
+ /// default OK Status.
+ ALWAYS_INLINE Status(Status&& other) : msg_(other.msg_) { other.msg_ = NULL; }
+
/// Status using only the error code as a parameter. This can be used for error messages
/// that don't take format parameters.
Status(TErrorCode::type code);
@@ -153,6 +157,15 @@ class Status {
return *this;
}
+ /// Move assignment that moves the error message (if any) and resets 'other' to the
+ /// default OK Status.
+ ALWAYS_INLINE Status& operator=(Status&& other) {
+ if (UNLIKELY(msg_ != NULL)) FreeMessage();
+ msg_ = other.msg_;
+ other.msg_ = NULL;
+ return *this;
+ }
+
ALWAYS_INLINE ~Status() {
// The UNLIKELY and inlining here are important hints for the compiler to
// streamline the common case of Status::OK(). Use FreeMessage() which is
@@ -244,21 +257,12 @@ class Status {
};
/// some generally useful macros
-#define RETURN_IF_ERROR(stmt) \
- do { \
- Status __status__ = (stmt); \
- if (UNLIKELY(!__status__.ok())) return __status__; \
+#define RETURN_IF_ERROR(stmt) \
+ do { \
+ Status __status__ = (stmt); \
+ if (UNLIKELY(!__status__.ok())) return std::move(__status__); \
} while (false)
-#define RETURN_IF_ERROR_PREPEND(expr, prepend) \
- do { \
- Status __status__ = (stmt); \
- if (UNLIKELY(!__status__.ok())) { \
- return Status(strings::Substitute("$0: $1", prepend, __status__.GetDetail())); \
- } \
- } while (false)
-
-
#define ABORT_IF_ERROR(stmt) \
do { \
Status __status__ = (stmt); \
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 0d780b9..6626b33 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -149,7 +149,7 @@ uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
}
uint32_t HashTableCtx::HashRow(
- const uint8_t* expr_values, const uint8_t* expr_values_null) const {
+ const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept {
DCHECK_LT(level_, seeds_.size());
if (expr_values_cache_.var_result_offset() == -1) {
/// This handles NULLs implicitly since a constant seed value was put
@@ -162,7 +162,7 @@ uint32_t HashTableCtx::HashRow(
}
bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs,
- uint8_t* expr_values, uint8_t* expr_values_null) {
+ uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
bool has_null = false;
for (int i = 0; i < ctxs.size(); ++i) {
void* loc = expr_values_cache_.ExprValuePtr(expr_values, i);
@@ -213,7 +213,7 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values,
template <bool FORCE_NULL_EQUALITY>
bool HashTableCtx::Equals(const TupleRow* build_row, const uint8_t* expr_values,
- const uint8_t* expr_values_null) const {
+ const uint8_t* expr_values_null) const noexcept {
for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
void* val = build_expr_ctxs_[i]->GetValue(build_row);
if (val == NULL) {
@@ -331,7 +331,7 @@ void HashTableCtx::ExprValuesCache::ResetIterators() {
cur_expr_values_hash_ = expr_values_hash_array_.get();
}
-void HashTableCtx::ExprValuesCache::Reset() {
+void HashTableCtx::ExprValuesCache::Reset() noexcept {
ResetIterators();
// Set the end pointer after resetting the other pointers so they point to
// the same location.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index fead1f7..4edd130 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -256,7 +256,7 @@ class HashTableCtx {
void Close(MemTracker* tracker);
/// Resets the cache states (iterators, end pointers etc) before writing.
- void Reset();
+ void Reset() noexcept;
/// Resets the iterators to the start before reading. Will record the current position
/// of the iterators in end pointer before resetting so AtEnd() can determine if all
@@ -406,7 +406,7 @@ class HashTableCtx {
/// This will be replaced by codegen. We don't want this inlined for replacing
/// with codegen'd functions so the function name does not change.
uint32_t IR_NO_INLINE HashRow(
- const uint8_t* expr_values, const uint8_t* expr_values_null) const;
+ const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept;
/// Wrapper function for calling correct HashUtil function in non-codegen'd case.
uint32_t Hash(const void* input, int len, uint32_t hash) const;
@@ -416,15 +416,15 @@ class HashTableCtx {
/// inlined when cross compiled because we need to be able to differentiate between
/// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked into the
/// codegen'd function.
- bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values,
- uint8_t* expr_values_null) {
+ bool IR_NO_INLINE EvalBuildRow(
+ const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null);
}
/// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness
/// into 'expr_values_null'. This will be replaced by codegen.
- bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values,
- uint8_t* expr_values_null) {
+ bool IR_NO_INLINE EvalProbeRow(
+ const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null);
}
@@ -437,7 +437,7 @@ class HashTableCtx {
/// 'expr_values_null'. Returns whether any expr evaluated to NULL. This will be
/// replaced by codegen.
bool EvalRow(const TupleRow* row, const std::vector<ExprContext*>& ctxs,
- uint8_t* expr_values, uint8_t* expr_values_null);
+ uint8_t* expr_values, uint8_t* expr_values_null) noexcept;
/// Returns true if the values of build_exprs evaluated over 'build_row' equal the
/// values in 'expr_values' with nullness 'expr_values_null'. FORCE_NULL_EQUALITY is
@@ -445,7 +445,7 @@ class HashTableCtx {
/// 'finds_nulls_'. This will be replaced by codegen.
template <bool FORCE_NULL_EQUALITY>
bool IR_NO_INLINE Equals(const TupleRow* build_row, const uint8_t* expr_values,
- const uint8_t* expr_values_null) const;
+ const uint8_t* expr_values_null) const noexcept;
/// Helper function that calls Equals() with the current row. Always inlined so that
/// it does not appear in cross-compiled IR.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index ed95844..83362e7 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -153,7 +153,7 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
insert_it.SetTuple(intermediate_tuple, hash);
return Status::OK();
} else if (!process_batch_status_.ok()) {
- return process_batch_status_;
+ return std::move(process_batch_status_);
}
// We did not have enough memory to add intermediate_tuple to the stream.
@@ -198,13 +198,13 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
!TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
&process_batch_status_)) {
- RETURN_IF_ERROR(process_batch_status_);
+ RETURN_IF_ERROR(std::move(process_batch_status_));
// Tuple is not going into hash table, add it to the output batch.
Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
out_batch->tuple_data_pool(), &process_batch_status_);
if (UNLIKELY(intermediate_tuple == NULL)) {
DCHECK(!process_batch_status_.ok());
- return process_batch_status_;
+ return std::move(process_batch_status_);
}
UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row);
out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index f926725..629e407 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -965,7 +965,7 @@ Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
}
Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) {
+ const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) noexcept {
const int fixed_size = intermediate_tuple_desc_->byte_size();
const int varlen_size = GroupingExprsVarlenSize();
const int tuple_data_size = fixed_size + varlen_size;
@@ -985,8 +985,8 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
}
Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<FunctionContext*>& agg_fn_ctxs,
- BufferedTupleStream* stream, Status* status) {
+ const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream,
+ Status* status) noexcept {
DCHECK(stream != NULL && status != NULL);
// Allocate space for the entire tuple in the stream.
const int fixed_size = intermediate_tuple_desc_->byte_size();
@@ -1090,8 +1090,8 @@ void PartitionedAggregationNode::InitAggSlots(
}
}
-void PartitionedAggregationNode::UpdateTuple(FunctionContext** agg_fn_ctxs,
- Tuple* tuple, TupleRow* row, bool is_merge) {
+void PartitionedAggregationNode::UpdateTuple(
+ FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
DCHECK(tuple != NULL || aggregate_evaluators_.empty());
for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
if (is_merge) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index c766ab2..0c0f3e8 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -444,14 +444,13 @@ class PartitionedAggregationNode : public ExecNode {
/// full, it will attempt to switch to IO-buffers.
Tuple* ConstructIntermediateTuple(
const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
- BufferedTupleStream* stream, Status* status);
+ BufferedTupleStream* stream, Status* status) noexcept;
/// Constructs intermediate tuple, allocating memory from pool instead of the stream.
/// Returns NULL and sets status if there is not enough memory to allocate the tuple.
Tuple* ConstructIntermediateTuple(
- const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
- MemPool* pool, Status* status);
-
+ const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool,
+ Status* status) noexcept;
/// Returns the number of bytes of variable-length data for the grouping values stored
/// in 'ht_ctx_'.
@@ -477,7 +476,7 @@ class PartitionedAggregationNode : public ExecNode {
/// This function is replaced by codegen (which is why we don't use a vector argument
/// for agg_fn_ctxs).. Any var-len data is allocated from the FunctionContexts.
void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
- bool is_merge = false);
+ bool is_merge = false) noexcept;
/// Called on the intermediate tuple of each group after all input rows have been
/// consumed and aggregated. Computes the final aggregate values to be returned in
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 21fd9e4..11974e3 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -29,15 +29,16 @@
using namespace impala;
-inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* row) {
- Status status;
- if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
- RETURN_IF_ERROR(status);
- return AppendRowStreamFull(stream, row);
+inline bool PhjBuilder::AppendRow(
+ BufferedTupleStream* stream, TupleRow* row, Status* status) {
+ if (LIKELY(stream->AddRow(row, status))) return true;
+ if (UNLIKELY(!status->ok())) return false;
+ return AppendRowStreamFull(stream, row, status);
}
Status PhjBuilder::ProcessBuildBatch(
RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) {
+ Status status;
HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache();
expr_vals_cache->Reset();
FOREACH_ROW(build_batch, 0, build_batch_iter) {
@@ -47,7 +48,10 @@ Status PhjBuilder::ProcessBuildBatch(
// TODO: remove with codegen/template
// If we are NULL aware and this build row has NULL in the eq join slot,
// append it to the null_aware partition. We will need it later.
- RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), build_row));
+ if (UNLIKELY(
+ !AppendRow(null_aware_partition_->build_rows(), build_row, &status))) {
+ return std::move(status);
+ }
}
continue;
}
@@ -66,7 +70,9 @@ Status PhjBuilder::ProcessBuildBatch(
const uint32_t hash = expr_vals_cache->CurExprValuesHash();
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
Partition* partition = hash_partitions_[partition_idx];
- RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row));
+ if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) {
+ return std::move(status);
+ }
}
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index b17bbff..bf5b42a 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -260,23 +260,26 @@ Status PhjBuilder::CreateHashPartitions(int level) {
return Status::OK();
}
-Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row) {
- Status status;
+bool PhjBuilder::AppendRowStreamFull(
+ BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
while (true) {
// Check if the stream is still using small buffers and try to switch to IO-buffers.
if (stream->using_small_buffers()) {
bool got_buffer;
- RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
+ *status = stream->SwitchToIoBuffers(&got_buffer);
+ if (!status->ok()) return false;
+
if (got_buffer) {
- if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
- RETURN_IF_ERROR(status);
+ if (LIKELY(stream->AddRow(row, status))) return true;
+ if (!status->ok()) return false;
}
}
// We ran out of memory. Pick a partition to spill. If we ran out of unspilled
// partitions, SpillPartition() will return an error status.
- RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
- if (stream->AddRow(row, &status)) return Status::OK();
- RETURN_IF_ERROR(status);
+ *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ if (!status->ok()) return false;
+ if (stream->AddRow(row, status)) return true;
+ if (!status->ok()) return false;
// Spilling one partition does not guarantee we can append a row. Keep
// spilling until we can append this row.
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 23822b2..7f81e5a 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -261,14 +261,18 @@ class PhjBuilder : public DataSink {
/// Append 'row' to 'stream'. In the common case, appending the row to the stream
/// immediately succeeds. Otherwise this function falls back to the slower path of
- /// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error
- /// if it was unable to append the row, even after spilling partitions.
- Status AppendRow(BufferedTupleStream* stream, TupleRow* row);
+ /// AppendRowStreamFull(), which may spill partitions to free memory. Returns false
+ /// and sets 'status' if it was unable to append the row, even after spilling
+ /// partitions. This odd return convention is used to avoid emitting unnecessary code
+ /// for ~Status in perf-critical code.
+ bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
/// Slow path for AppendRow() above. It is called when the stream has failed to append
/// the row. We need to find more memory by either switching to IO-buffers, in case the
- /// stream still uses small buffers, or spilling a partition.
- Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row);
+ /// stream still uses small buffers, or spilling a partition. Returns false and sets
+ /// 'status' if it was unable to append the row, even after spilling partitions.
+ bool AppendRowStreamFull(
+ BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept;
/// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
/// to the Spill() call for the selected partition. The current policy is to spill the
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 44cb14b..bbed06d 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -149,10 +149,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
// build side. For those rows, we need to process the remaining join
// predicates later.
if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) {
- if (num_other_join_conjuncts > 0) {
- *status = AppendProbeRow(
- null_aware_probe_partition_->probe_rows(), current_probe_row_);
- if (UNLIKELY(!status->ok())) return false;
+ if (num_other_join_conjuncts > 0
+ && UNLIKELY(!AppendProbeRow(null_aware_probe_partition_->probe_rows(),
+ current_probe_row_, status))) {
+ DCHECK(!status->ok());
+ return false;
}
return true;
}
@@ -217,7 +218,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
return true;
}
-template<int const JoinOp>
+template <int const JoinOp>
bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
ExprContext* const* conjunct_ctxs, int num_conjuncts,
@@ -282,8 +283,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
skip_row = true;
} else {
// Condition 3 above.
- *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_);
- if (UNLIKELY(!status->ok())) return false;
+ if (UNLIKELY(
+ !AppendProbeRow(null_probe_rows_.get(), current_probe_row_, status))) {
+ DCHECK(!status->ok());
+ return false;
+ }
matched_null_probe_.push_back(false);
skip_row = true;
}
@@ -306,8 +310,10 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
// Skip the current row if we manage to append to the spilled partition's BTS.
// Otherwise, we need to bail out and report the failure.
BufferedTupleStream* probe_rows = probe_partition->probe_rows();
- *status = AppendProbeRow(probe_rows, current_probe_row_);
- if (UNLIKELY(!status->ok())) return false;
+ if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) {
+ DCHECK(!status->ok());
+ return false;
+ }
skip_row = true;
}
}
@@ -426,15 +432,12 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
return num_rows_added;
}
-inline Status PartitionedHashJoinNode::AppendProbeRow(
- BufferedTupleStream* stream, TupleRow* row) {
+inline bool PartitionedHashJoinNode::AppendProbeRow(
+ BufferedTupleStream* stream, TupleRow* row, Status* status) {
DCHECK(stream->has_write_block());
DCHECK(!stream->using_small_buffers());
DCHECK(!stream->is_pinned());
- Status status;
- if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
- DCHECK(!status.ok());
- return status;
+ return stream->AddRow(row, status);
}
template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 9827788..5b9264c 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -166,7 +166,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
/// a write buffer allocated, so this will succeed unless an error is encountered.
- Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row);
+ /// Returns false and sets 'status' to an error if an error is encountered. This odd
+ /// return convention is used to avoid emitting unnecessary code for ~Status in perf-
+ /// critical code.
+ bool AppendProbeRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
/// Probes the hash table for rows matching the current probe row and appends
/// all the matching build rows (with probe row) to output batch. Returns true
@@ -267,8 +270,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// probe_batch_ is entirely consumed.
/// For RIGHT_ANTI_JOIN, all this function does is to mark whether each build row
/// had a match.
- /// Returns the number of rows added to out_batch; -1 on error (and *status will be
- /// set). This function doesn't commit rows to the output batch so it's the caller's
+ /// Returns the number of rows added to out_batch; -1 on error (and *status will
+ /// be set). This function doesn't commit rows to the output batch so it's the caller's
/// responsibility to do so.
template<int const JoinOp>
int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index b2fce45..e3b3b4a 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -222,8 +222,8 @@ Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) {
return Status::OK();
}
-Status BufferedTupleStream::NewWriteBlock(int64_t block_len, int64_t null_indicators_size,
- bool* got_block) {
+Status BufferedTupleStream::NewWriteBlock(
+ int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept {
DCHECK(!closed_);
DCHECK_GE(null_indicators_size, 0);
*got_block = false;
@@ -282,7 +282,8 @@ Status BufferedTupleStream::NewWriteBlock(int64_t block_len, int64_t null_indica
return Status::OK();
}
-Status BufferedTupleStream::NewWriteBlockForRow(int64_t row_size, bool* got_block) {
+Status BufferedTupleStream::NewWriteBlockForRow(
+ int64_t row_size, bool* got_block) noexcept {
int64_t block_len;
int64_t null_indicators_size;
if (use_small_buffers_) {
@@ -694,7 +695,7 @@ void BufferedTupleStream::FixUpCollectionsForRead(const vector<SlotDescriptor*>&
}
}
-int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const {
+int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept {
int64_t size = 0;
if (has_nullable_tuple_) {
for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
@@ -733,7 +734,15 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const {
return size;
}
-bool BufferedTupleStream::DeepCopy(TupleRow* row) {
+bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept {
+ bool got_block;
+ int64_t row_size = ComputeRowSize(row);
+ *status = NewWriteBlockForRow(row_size, &got_block);
+ if (!status->ok() || !got_block) return false;
+ return DeepCopy(row);
+}
+
+bool BufferedTupleStream::DeepCopy(TupleRow* row) noexcept {
if (has_nullable_tuple_) {
return DeepCopyInternal<true>(row);
} else {
@@ -744,7 +753,7 @@ bool BufferedTupleStream::DeepCopy(TupleRow* row) {
// TODO: this really needs codegen
// TODO: in case of duplicate tuples, this can redundantly serialize data.
template <bool HasNullableTuple>
-bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) {
+bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept {
if (UNLIKELY(write_block_ == NULL)) return false;
DCHECK_GE(write_block_null_indicators_size_, 0);
DCHECK(write_block_->is_pinned()) << DebugString() << std::endl
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index d3bfa81..d138150 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -238,15 +238,17 @@ class BufferedTupleStream {
/// Must be called for streams using small buffers to switch to IO-sized buffers.
/// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_
/// back to false.
- /// TODO: this does not seem like the best mechanism.
+ /// TODO: IMPALA-3200: remove this when small buffers are removed.
Status SwitchToIoBuffers(bool* got_buffer);
- /// Adds a single row to the stream. Returns false and sets *status if an error
- /// occurred. BufferedTupleStream will do a deep copy of the memory in the row.
- /// After AddRow returns false, it should not be called again, unless
- /// using_small_buffers_ is true, in which case it is valid to call SwitchToIoBuffers()
- /// then AddRow() again.
- bool AddRow(TupleRow* row, Status* status);
+ /// Adds a single row to the stream. Returns true if the append succeeded, returns false
+ /// and sets 'status' to OK if appending failed but can be retried or returns false and
+ /// sets 'status' to an error if an error occurred.
+ /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
+ /// returns an error, it should not be called again. If appending failed without an
+ /// error and the stream is using small buffers, it is valid to call
+ /// SwitchToIoBuffers() then AddRow() again.
+ bool AddRow(TupleRow* row, Status* status) noexcept;
/// Allocates space to store a row of with fixed length 'fixed_size' and variable
/// length data 'varlen_size'. If successful, returns the pointer where fixed length
@@ -458,11 +460,15 @@ class BufferedTupleStream {
RuntimeProfile::Counter* unpin_timer_;
RuntimeProfile::Counter* get_new_block_timer_;
+ /// The slow path for AddRow() that is called if there is not sufficient space in
+ /// the current block.
+ bool AddRowSlow(TupleRow* row, Status* status) noexcept;
+
/// Copies 'row' into write_block_. Returns false if there is not enough space in
/// 'write_block_'. After returning false, write_ptr_ may be left pointing to the
/// partially-written row, and no more data can be written to write_block_.
template <bool HAS_NULLABLE_TUPLE>
- bool DeepCopyInternal(TupleRow* row);
+ bool DeepCopyInternal(TupleRow* row) noexcept;
/// Helper function to copy strings in string_slots from tuple into write_block_.
/// Updates write_ptr_ to the end of the string data added. Returns false if the data
@@ -480,7 +486,7 @@ class BufferedTupleStream {
const std::vector<SlotDescriptor*>& collection_slots);
/// Wrapper of the templated DeepCopyInternal() function.
- bool DeepCopy(TupleRow* row);
+ bool DeepCopy(TupleRow* row) noexcept;
/// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_,
/// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the
@@ -488,12 +494,13 @@ class BufferedTupleStream {
/// *got_block is set to true if a block was successfully acquired. Null indicators
/// (if any) will also be reserved and initialized. If there are no blocks available,
/// *got_block is set to false and write_block_ is unchanged.
- Status NewWriteBlock(int64_t block_len, int64_t null_indicators_size, bool* got_block);
+ Status NewWriteBlock(
+ int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept;
/// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be
/// appended to this block. This function determines the block size required in order
/// to fit the row and null indicators.
- Status NewWriteBlockForRow(int64_t row_size, bool* got_block);
+ Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept;
/// Reads the next block from the block_mgr_. This blocks if necessary.
/// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_.
@@ -502,7 +509,7 @@ class BufferedTupleStream {
/// Returns the total additional bytes that this row will consume in write_block_ if
/// appended to the block. This includes the fixed length part of the row and the
/// data for inlined_string_slots_ and inlined_coll_slots_.
- int64_t ComputeRowSize(TupleRow* row) const;
+ int64_t ComputeRowSize(TupleRow* row) const noexcept;
/// Unpins block if it is an IO-sized block and updates tracking stats.
Status UnpinBlock(BufferedBlockMgr::Block* block);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h
index 7a2f247..ba6bb8c 100644
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ b/be/src/runtime/buffered-tuple-stream.inline.h
@@ -25,14 +25,10 @@
namespace impala {
-inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) {
+inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept {
DCHECK(!closed_);
if (LIKELY(DeepCopy(row))) return true;
- bool got_block;
- int64_t row_size = ComputeRowSize(row);
- *status = NewWriteBlockForRow(row_size, &got_block);
- if (!status->ok() || !got_block) return false;
- return DeepCopy(row);
+ return AddRowSlow(row, status);
}
inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int varlen_size,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 7247b8e..aef58f2 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -191,4 +191,173 @@ void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* slot
}
}
+uint32_t RawValue::GetHashValue(
+ const void* v, const ColumnType& type, uint32_t seed) noexcept {
+ // The choice of hash function needs to be consistent across all hosts of the cluster.
+
+ // Use HashCombine with arbitrary constant to ensure we don't return seed.
+ if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
+
+ switch (type.type) {
+ case TYPE_CHAR:
+ case TYPE_STRING:
+ case TYPE_VARCHAR:
+ return RawValue::GetHashValueNonNull<impala::StringValue>(
+ reinterpret_cast<const StringValue*>(v), type, seed);
+ case TYPE_BOOLEAN:
+ return RawValue::GetHashValueNonNull<bool>(
+ reinterpret_cast<const bool*>(v), type, seed);
+ case TYPE_TINYINT:
+ return RawValue::GetHashValueNonNull<int8_t>(
+ reinterpret_cast<const int8_t*>(v), type, seed);
+ case TYPE_SMALLINT:
+ return RawValue::GetHashValueNonNull<int16_t>(
+ reinterpret_cast<const int16_t*>(v), type, seed);
+ case TYPE_INT:
+ return RawValue::GetHashValueNonNull<int32_t>(
+ reinterpret_cast<const int32_t*>(v), type, seed);
+ case TYPE_BIGINT:
+ return RawValue::GetHashValueNonNull<int64_t>(
+ reinterpret_cast<const int64_t*>(v), type, seed);
+ case TYPE_FLOAT:
+ return RawValue::GetHashValueNonNull<float>(
+ reinterpret_cast<const float*>(v), type, seed);
+ case TYPE_DOUBLE:
+ return RawValue::GetHashValueNonNull<double>(
+ reinterpret_cast<const double*>(v), type, seed);
+ case TYPE_TIMESTAMP:
+ return RawValue::GetHashValueNonNull<TimestampValue>(
+ reinterpret_cast<const TimestampValue*>(v), type, seed);
+ case TYPE_DECIMAL:
+ switch (type.GetByteSize()) {
+ case 4:
+ return RawValue::GetHashValueNonNull<Decimal4Value>(
+ reinterpret_cast<const impala::Decimal4Value*>(v), type, seed);
+ case 8:
+ return RawValue::GetHashValueNonNull<Decimal8Value>(
+ reinterpret_cast<const Decimal8Value*>(v), type, seed);
+ case 16:
+ return RawValue::GetHashValueNonNull<Decimal16Value>(
+ reinterpret_cast<const Decimal16Value*>(v), type, seed);
+ DCHECK(false);
+ }
+ default: DCHECK(false); return 0;
+ }
+}
+
+uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& type, uint32_t seed) {
+ // Use HashCombine with arbitrary constant to ensure we don't return seed.
+ if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
+
+ switch (type.type) {
+ case TYPE_STRING:
+ case TYPE_VARCHAR: {
+ const StringValue* string_value = reinterpret_cast<const StringValue*>(v);
+ if (string_value->len == 0) {
+ return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed);
+ }
+ return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, seed);
+ }
+ case TYPE_BOOLEAN:
+ return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed);
+ case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed);
+ case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed);
+ case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed);
+ case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed);
+ case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed);
+ case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed);
+ case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed);
+ case TYPE_CHAR:
+ return HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type), type.len, seed);
+ case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), seed);
+ default: DCHECK(false); return 0;
+ }
+}
+
+void RawValue::PrintValue(
+ const void* value, const ColumnType& type, int scale, std::stringstream* stream) {
+ if (value == NULL) {
+ *stream << "NULL";
+ return;
+ }
+
+ int old_precision = stream->precision();
+ std::ios_base::fmtflags old_flags = stream->flags();
+ if (scale > -1) {
+ stream->precision(scale);
+ // Setting 'fixed' causes precision to set the number of digits printed after the
+ // decimal (by default it sets the maximum number of digits total).
+ *stream << std::fixed;
+ }
+
+ const StringValue* string_val = NULL;
+ switch (type.type) {
+ case TYPE_BOOLEAN: {
+ bool val = *reinterpret_cast<const bool*>(value);
+ *stream << (val ? "true" : "false");
+ return;
+ }
+ case TYPE_TINYINT:
+ // Extra casting for chars since they should not be interpreted as ASCII.
+ *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value));
+ break;
+ case TYPE_SMALLINT: *stream << *reinterpret_cast<const int16_t*>(value); break;
+ case TYPE_INT: *stream << *reinterpret_cast<const int32_t*>(value); break;
+ case TYPE_BIGINT: *stream << *reinterpret_cast<const int64_t*>(value); break;
+ case TYPE_FLOAT: {
+ float val = *reinterpret_cast<const float*>(value);
+ if (LIKELY(std::isfinite(val))) {
+ *stream << val;
+ } else if (std::isinf(val)) {
+ // 'Infinity' is Java's text representation of inf. By staying close to Java, we
+ // allow Hive to read text tables containing non-finite values produced by
+ // Impala. (The same logic applies to 'NaN', below).
+ *stream << (val < 0 ? "-Infinity" : "Infinity");
+ } else if (std::isnan(val)) {
+ *stream << "NaN";
+ }
+ } break;
+ case TYPE_DOUBLE: {
+ double val = *reinterpret_cast<const double*>(value);
+ if (LIKELY(std::isfinite(val))) {
+ *stream << val;
+ } else if (std::isinf(val)) {
+ // See TYPE_FLOAT for rationale.
+ *stream << (val < 0 ? "-Infinity" : "Infinity");
+ } else if (std::isnan(val)) {
+ *stream << "NaN";
+ }
+ } break;
+ case TYPE_VARCHAR:
+ case TYPE_STRING:
+ string_val = reinterpret_cast<const StringValue*>(value);
+ if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len);
+ stream->write(string_val->ptr, string_val->len);
+ break;
+ case TYPE_TIMESTAMP:
+ *stream << *reinterpret_cast<const TimestampValue*>(value);
+ break;
+ case TYPE_CHAR:
+ stream->write(StringValue::CharSlotToPtr(value, type), type.len);
+ break;
+ case TYPE_DECIMAL:
+ switch (type.GetByteSize()) {
+ case 4:
+ *stream << reinterpret_cast<const Decimal4Value*>(value)->ToString(type);
+ break;
+ case 8:
+ *stream << reinterpret_cast<const Decimal8Value*>(value)->ToString(type);
+ break;
+ case 16:
+ *stream << reinterpret_cast<const Decimal16Value*>(value)->ToString(type);
+ break;
+ default: DCHECK(false) << type;
+ }
+ break;
+ default: DCHECK(false);
+ }
+ stream->precision(old_precision);
+ // Undo setting stream to fixed
+ stream->flags(old_flags);
+}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index bc76b2c..5ec8ed1 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -54,8 +54,8 @@ class RawValue {
/// Returns hash value for 'v' interpreted as 'type'. The resulting hash value
/// is combined with the seed value.
- static inline uint32_t GetHashValue(const void* v, const ColumnType& type,
- uint32_t seed = 0);
+ static uint32_t GetHashValue(
+ const void* v, const ColumnType& type, uint32_t seed = 0) noexcept;
/// Templatized version of GetHashValue, use if type is known ahead. GetHashValue
/// handles nulls.
@@ -74,8 +74,7 @@ class RawValue {
/// GetHashValue() does not have this property and cannot be safely used as the first
/// step in data repartitioning. However, GetHashValue() can be significantly faster.
/// TODO: fix GetHashValue
- static inline uint32_t GetHashValueFnv(const void* v, const ColumnType& type,
- uint32_t seed);
+ static uint32_t GetHashValueFnv(const void* v, const ColumnType& type, uint32_t seed);
/// Compares both values.
/// Return value is < 0 if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.inline.h b/be/src/runtime/raw-value.inline.h
index a1f1d75..63c9a07 100644
--- a/be/src/runtime/raw-value.inline.h
+++ b/be/src/runtime/raw-value.inline.h
@@ -211,194 +211,6 @@ inline uint32_t RawValue::GetHashValue(const T* v, const ColumnType& type,
if (UNLIKELY(v == NULL)) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
return RawValue::GetHashValueNonNull<T>(v, type, seed);
}
-
-inline uint32_t RawValue::GetHashValue(const void* v, const ColumnType& type,
- uint32_t seed) {
- //The choice of hash function needs to be consistent across all hosts of the cluster.
-
- // Use HashCombine with arbitrary constant to ensure we don't return seed.
- if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
-
- switch (type.type) {
- case TYPE_CHAR:
- case TYPE_STRING:
- case TYPE_VARCHAR:
- return RawValue::GetHashValueNonNull<impala::StringValue>(
- reinterpret_cast<const StringValue*>(v), type, seed);
- case TYPE_BOOLEAN:
- return RawValue::GetHashValueNonNull<bool>(
- reinterpret_cast<const bool*>(v), type, seed);
- case TYPE_TINYINT:
- return RawValue::GetHashValueNonNull<int8_t>(
- reinterpret_cast<const int8_t*>(v), type, seed);
- case TYPE_SMALLINT:
- return RawValue::GetHashValueNonNull<int16_t>(
- reinterpret_cast<const int16_t*>(v), type, seed);
- case TYPE_INT:
- return RawValue::GetHashValueNonNull<int32_t>(
- reinterpret_cast<const int32_t*>(v), type, seed);
- case TYPE_BIGINT:
- return RawValue::GetHashValueNonNull<int64_t>(
- reinterpret_cast<const int64_t*>(v), type, seed);
- case TYPE_FLOAT:
- return RawValue::GetHashValueNonNull<float>(
- reinterpret_cast<const float*>(v), type, seed);
- case TYPE_DOUBLE:
- return RawValue::GetHashValueNonNull<double>(
- reinterpret_cast<const double*>(v), type, seed);
- case TYPE_TIMESTAMP:
- return RawValue::GetHashValueNonNull<TimestampValue>(
- reinterpret_cast<const TimestampValue*>(v), type, seed);
- case TYPE_DECIMAL:
- switch(type.GetByteSize()) {
- case 4: return
- RawValue::GetHashValueNonNull<Decimal4Value>(
- reinterpret_cast<const impala::Decimal4Value*>(v), type, seed);
- case 8:
- return RawValue::GetHashValueNonNull<Decimal8Value>(
- reinterpret_cast<const Decimal8Value*>(v), type, seed);
- case 16:
- return RawValue::GetHashValueNonNull<Decimal16Value>(
- reinterpret_cast<const Decimal16Value*>(v), type, seed);
- DCHECK(false);
- }
- default:
- DCHECK(false);
- return 0;
- }
-}
-
-inline uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& type,
- uint32_t seed) {
- // Use HashCombine with arbitrary constant to ensure we don't return seed.
- if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
-
- switch (type.type ) {
- case TYPE_STRING:
- case TYPE_VARCHAR: {
- const StringValue* string_value = reinterpret_cast<const StringValue*>(v);
- if (string_value->len == 0) {
- return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed);
- }
- return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, seed);
- }
- case TYPE_BOOLEAN:
- return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed);
- case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed);
- case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed);
- case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed);
- case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed);
- case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed);
- case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed);
- case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed);
- case TYPE_CHAR: return HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type),
- type.len, seed);
- case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), seed);
- default:
- DCHECK(false);
- return 0;
- }
-}
-
-inline void RawValue::PrintValue(const void* value, const ColumnType& type, int scale,
- std::stringstream* stream) {
- if (value == NULL) {
- *stream << "NULL";
- return;
- }
-
- int old_precision = stream->precision();
- std::ios_base::fmtflags old_flags = stream->flags();
- if (scale > -1) {
- stream->precision(scale);
- // Setting 'fixed' causes precision to set the number of digits printed after the
- // decimal (by default it sets the maximum number of digits total).
- *stream << std::fixed;
- }
-
- const StringValue* string_val = NULL;
- switch (type.type) {
- case TYPE_BOOLEAN: {
- bool val = *reinterpret_cast<const bool*>(value);
- *stream << (val ? "true" : "false");
- return;
- }
- case TYPE_TINYINT:
- // Extra casting for chars since they should not be interpreted as ASCII.
- *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value));
- break;
- case TYPE_SMALLINT:
- *stream << *reinterpret_cast<const int16_t*>(value);
- break;
- case TYPE_INT:
- *stream << *reinterpret_cast<const int32_t*>(value);
- break;
- case TYPE_BIGINT:
- *stream << *reinterpret_cast<const int64_t*>(value);
- break;
- case TYPE_FLOAT:
- {
- float val = *reinterpret_cast<const float*>(value);
- if (LIKELY(std::isfinite(val))) {
- *stream << val;
- } else if (std::isinf(val)) {
- // 'Infinity' is Java's text representation of inf. By staying close to Java, we
- // allow Hive to read text tables containing non-finite values produced by
- // Impala. (The same logic applies to 'NaN', below).
- *stream << (val < 0 ? "-Infinity" : "Infinity");
- } else if (std::isnan(val)) {
- *stream << "NaN";
- }
- }
- break;
- case TYPE_DOUBLE:
- {
- double val = *reinterpret_cast<const double*>(value);
- if (LIKELY(std::isfinite(val))) {
- *stream << val;
- } else if (std::isinf(val)) {
- // See TYPE_FLOAT for rationale.
- *stream << (val < 0 ? "-Infinity" : "Infinity");
- } else if (std::isnan(val)) {
- *stream << "NaN";
- }
- }
- break;
- case TYPE_VARCHAR:
- case TYPE_STRING:
- string_val = reinterpret_cast<const StringValue*>(value);
- if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len);
- stream->write(string_val->ptr, string_val->len);
- break;
- case TYPE_TIMESTAMP:
- *stream << *reinterpret_cast<const TimestampValue*>(value);
- break;
- case TYPE_CHAR:
- stream->write(StringValue::CharSlotToPtr(value, type), type.len);
- break;
- case TYPE_DECIMAL:
- switch (type.GetByteSize()) {
- case 4:
- *stream << reinterpret_cast<const Decimal4Value*>(value)->ToString(type);
- break;
- case 8:
- *stream << reinterpret_cast<const Decimal8Value*>(value)->ToString(type);
- break;
- case 16:
- *stream << reinterpret_cast<const Decimal16Value*>(value)->ToString(type);
- break;
- default:
- DCHECK(false) << type;
- }
- break;
- default:
- DCHECK(false);
- }
- stream->precision(old_precision);
- // Undo setting stream to fixed
- stream->flags(old_flags);
-}
-
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 7d8c8f7..6fd53f5 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -83,6 +83,78 @@ void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
filter->ToThrift(thrift);
}
+// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
+// compile with -fno-strict-aliasing.
+
+void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) {
+ // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
+ // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
+ // part of this method.
+ uint32_t new_bucket[8] __attribute__((aligned(16)));
+ for (int i = 0; i < 8; ++i) {
+ // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following Dietzfelbinger.
+ new_bucket[i] =
+ (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
+ new_bucket[i] = 1U << new_bucket[i];
+ }
+ for (int i = 0; i < 2; ++i) {
+ __m128i new_bucket_sse =
+ _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
+ __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
+ *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
+ }
+}
+
+__m256i BloomFilter::MakeMask(const uint32_t hash) {
+ const __m256i ones = _mm256_set1_epi32(1);
+ const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS);
+ // Load hash into a YMM register, repeated eight times
+ __m256i hash_data = _mm256_set1_epi32(hash);
+ // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
+ // odd constants, then keep the 5 most significant bits from each product.
+ hash_data = _mm256_mullo_epi32(rehash, hash_data);
+ hash_data = _mm256_srli_epi32(hash_data, 27);
+ // Use these 5 bits to shift a single bit to a location in each 32-bit lane
+ return _mm256_sllv_epi32(ones, hash_data);
+}
+
+void BloomFilter::BucketInsertAVX2(
+ const uint32_t bucket_idx, const uint32_t hash) {
+ const __m256i mask = MakeMask(hash);
+ __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+ _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
+ // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
+ // dont have to save them off before using XMM registers.
+ _mm256_zeroupper();
+}
+
+bool BloomFilter::BucketFindAVX2(
+ const uint32_t bucket_idx, const uint32_t hash) const {
+ const __m256i mask = MakeMask(hash);
+ const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+ // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
+ // takes the negation of its first argument and ands that with its second argument. In
+ // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
+ // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
+ const bool result = _mm256_testc_si256(bucket, mask);
+ _mm256_zeroupper();
+ return result;
+}
+
+bool BloomFilter::BucketFind(
+ const uint32_t bucket_idx, const uint32_t hash) const {
+ for (int i = 0; i < BUCKET_WORDS; ++i) {
+ BucketWord hval =
+ (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
+ hval = 1U << hval;
+ if (!(directory_[bucket_idx][i] & hval)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+
void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
DCHECK(out != NULL);
DCHECK_EQ(in.log_heap_space, out->log_heap_space);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 7a94995..4342814 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -25,9 +25,9 @@
#include <immintrin.h>
-#include "gutil/macros.h"
-
+#include "common/compiler-util.h"
#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/macros.h"
#include "runtime/buffered-block-mgr.h"
namespace impala {
@@ -173,7 +173,7 @@ class BloomFilter {
// the advantage of requiring fewer random bits: log2(32) * 8 = 5 * 8 = 40 random bits for
// a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
-inline void BloomFilter::Insert(const uint32_t hash) {
+inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) {
const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
BucketInsertAVX2(bucket_idx, hash);
@@ -182,7 +182,7 @@ inline void BloomFilter::Insert(const uint32_t hash) {
}
}
-inline bool BloomFilter::Find(const uint32_t hash) const {
+inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const {
const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
return BucketFindAVX2(bucket_idx, hash);
@@ -191,78 +191,6 @@ inline bool BloomFilter::Find(const uint32_t hash) const {
}
}
-// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
-// compile with -fno-strict-aliasing.
-
-inline void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) {
- // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
- // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
- // part of this method.
- uint32_t new_bucket[8] __attribute__((aligned(16)));
- for (int i = 0; i < 8; ++i) {
- // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following Dietzfelbinger.
- new_bucket[i] =
- (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
- new_bucket[i] = 1U << new_bucket[i];
- }
- for (int i = 0; i < 2; ++i) {
- __m128i new_bucket_sse =
- _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
- __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
- *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
- }
-}
-
-inline __m256i BloomFilter::MakeMask(const uint32_t hash) {
- const __m256i ones = _mm256_set1_epi32(1);
- const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS);
- // Load hash into a YMM register, repeated eight times
- __m256i hash_data = _mm256_set1_epi32(hash);
- // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
- // odd constants, then keep the 5 most significant bits from each product.
- hash_data = _mm256_mullo_epi32(rehash, hash_data);
- hash_data = _mm256_srli_epi32(hash_data, 27);
- // Use these 5 bits to shift a single bit to a location in each 32-bit lane
- return _mm256_sllv_epi32(ones, hash_data);
-}
-
-inline void BloomFilter::BucketInsertAVX2(
- const uint32_t bucket_idx, const uint32_t hash) {
- const __m256i mask = MakeMask(hash);
- __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx];
- _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
- // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
- // dont have to save them off before using XMM registers.
- _mm256_zeroupper();
-}
-
-inline bool BloomFilter::BucketFindAVX2(
- const uint32_t bucket_idx, const uint32_t hash) const {
- const __m256i mask = MakeMask(hash);
- const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
- // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
- // takes the negation of its first argument and ands that with its second argument. In
- // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
- // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
- const bool result = _mm256_testc_si256(bucket, mask);
- _mm256_zeroupper();
- return result;
-}
-
-inline bool BloomFilter::BucketFind(
- const uint32_t bucket_idx, const uint32_t hash) const {
- for (int i = 0; i < BUCKET_WORDS; ++i) {
- BucketWord hval =
- (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
- hval = 1U << hval;
- if (!(directory_[bucket_idx][i] & hval)) {
- return false;
- }
- }
- return true;
-}
-
} // namespace impala
-#undef IMPALA_BLOOM_HASH_CONSTANTS
#endif // IMPALA_UTIL_BLOOM_H