You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2018/11/19 10:45:30 UTC
[23/33] impala git commit: IMPALA-7477: Batch-oriented query set construction
IMPALA-7477: Batch-oriented query set construction
Rework the row-by-row construction of query result sets in PlanRootSink
so that it materialises an output column at a time. Make some minor
optimisations like preallocating output vectors and initialising
strings more efficiently.
My intent is both to make this faster and to make the QueryResultSet
interface better before IMPALA-4268 does a bunch of surgery on this
part of the code.
Testing:
Ran core tests.
Perf:
Downloaded tpch_parquet.orders via JDBC driver.
Before: 3.01s, After: 2.57s.
Downloaded l_orderkey from tpch_parquet.lineitem.
Before: 1.21s, After: 1.08s.
Change-Id: I764fa302842438902cd5db2551ec6e3cb77b6874
Reviewed-on: http://gerrit.cloudera.org:8080/11879
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0f63b2c9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0f63b2c9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0f63b2c9
Branch: refs/heads/branch-3.1.0
Commit: 0f63b2c9f9d62b0d22191f454b672a6047206252
Parents: d8b1e43
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sun Aug 19 00:48:47 2018 -0700
Committer: Zoltan Borok-Nagy <bo...@cloudera.com>
Committed: Tue Nov 13 12:51:39 2018 +0100
----------------------------------------------------------------------
be/src/exec/plan-root-sink.cc | 26 +--
be/src/exec/plan-root-sink.h | 4 -
be/src/service/hs2-util.cc | 318 ++++++++++++++++++++++++--------
be/src/service/hs2-util.h | 15 +-
be/src/service/query-result-set.cc | 115 +++++++-----
be/src/service/query-result-set.h | 14 +-
6 files changed, 331 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index a64dbb9..1f5b2e5 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -79,22 +79,11 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
// Otherwise the consumer is ready. Fill out the rows.
DCHECK(results_ != nullptr);
- // List of expr values to hold evaluated rows from the query
- vector<void*> result_row;
- result_row.resize(output_exprs_.size());
-
- // List of scales for floating point values in result_row
- vector<int> scales;
- scales.resize(result_row.size());
-
int num_to_fetch = batch->num_rows() - current_batch_row;
if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, num_rows_requested_);
- for (int i = 0; i < num_to_fetch; ++i) {
- TupleRow* row = batch->GetRow(current_batch_row);
- GetRowValue(row, &result_row, &scales);
- RETURN_IF_ERROR(results_->AddOneRow(result_row, scales));
- ++current_batch_row;
- }
+ RETURN_IF_ERROR(
+ results_->AddRows(output_expr_evals_, batch, current_batch_row, num_to_fetch));
+ current_batch_row += num_to_fetch;
// Prevent expr result allocations from accumulating.
expr_results_pool_->Clear();
// Signal the consumer.
@@ -146,13 +135,4 @@ Status PlanRootSink::GetNext(
*eos = sender_state_ == SenderState::EOS;
return state->GetQueryStatus();
}
-
-void PlanRootSink::GetRowValue(
- TupleRow* row, vector<void*>* result, vector<int>* scales) {
- DCHECK_GE(result->size(), output_expr_evals_.size());
- for (int i = 0; i < output_expr_evals_.size(); ++i) {
- (*result)[i] = output_expr_evals_[i]->GetValue(row);
- (*scales)[i] = output_expr_evals_[i]->output_scale();
- }
-}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 1d64b21..300c993 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -118,10 +118,6 @@ class PlanRootSink : public DataSink {
/// Set by GetNext() to indicate to Send() how many rows it should write to results_.
int num_rows_requested_ = 0;
-
- /// Writes a single row into 'result' and 'scales' by evaluating
- /// output_expr_evals_ over 'row'.
- void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.cc
----------------------------------------------------------------------
diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc
index b856556..66a76bd 100644
--- a/be/src/service/hs2-util.cc
+++ b/be/src/service/hs2-util.cc
@@ -18,9 +18,12 @@
#include "service/hs2-util.h"
#include "common/logging.h"
+#include "exprs/scalar-expr-evaluator.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
#include "runtime/types.h"
+#include "util/bit-util.h"
#include <gutil/strings/substitute.h>
@@ -49,7 +52,9 @@ inline bool GetNullBit(const string& nulls, uint32_t row_idx) {
void impala::StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added,
uint32_t start_idx, const string& from, string* to) {
- to->reserve((num_rows_before + num_rows_added + 7) / 8);
+ // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
+ // small increases in size.
+ to->reserve(BitUtil::RoundUpToPowerOfTwo((num_rows_before + num_rows_added + 7) / 8));
// TODO: This is very inefficient, since we could conceivably go one byte at a time
// (although the operands should stay live in registers in the loop). However doing this
@@ -118,106 +123,257 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val,
SetNullBit(row_idx, is_null, nulls);
}
+// Specialised per-type implementations of ExprValuesToHS2TColumn.
+
+// Helper to reserve space in hs2Vals->values and hs2Vals->nulls for the values that the
+// different implementations of ExprValuesToHS2TColumn will write.
+template <typename T>
+void ReserveSpace(int start_idx, int num_rows, uint32_t output_row_idx, T* hs2Vals) {
+ int64_t num_output_rows = output_row_idx + num_rows - start_idx;
+ int64_t num_null_bytes = BitUtil::RoundUpNumBytes(num_output_rows);
+ // Round up reserve() arguments to power-of-two to avoid accidentally quadratic
+ // behaviour from repeated small increases in size.
+ hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(num_output_rows));
+ hs2Vals->nulls.reserve(BitUtil::RoundUpToPowerOfTwo(num_null_bytes));
+}
+
+// Implementation for BOOL.
+static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->boolVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ BooleanVal val = expr_eval->GetBooleanVal(it.Get());
+ column->boolVal.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->boolVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for TINYINT.
+static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->byteVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ TinyIntVal val = expr_eval->GetTinyIntVal(it.Get());
+ column->byteVal.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->byteVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for SMALLINT.
+static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+ RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->i16Val);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ SmallIntVal val = expr_eval->GetSmallIntVal(it.Get());
+ column->i16Val.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->i16Val.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for INT.
+static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->i32Val);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ IntVal val = expr_eval->GetIntVal(it.Get());
+ column->i32Val.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->i32Val.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for BIGINT.
+static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->i64Val);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ BigIntVal val = expr_eval->GetBigIntVal(it.Get());
+ column->i64Val.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->i64Val.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for FLOAT.
+static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ FloatVal val = expr_eval->GetFloatVal(it.Get());
+ column->doubleVal.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for DOUBLE.
+static void DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ DoubleVal val = expr_eval->GetDoubleVal(it.Get());
+ column->doubleVal.values.push_back(val.val);
+ SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for TIMESTAMP.
+static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+ RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ TimestampVal val = expr_eval->GetTimestampVal(it.Get());
+ column->stringVal.values.emplace_back();
+ if (!val.is_null) {
+ TimestampValue value = TimestampValue::FromTimestampVal(val);
+ RawValue::PrintValue(
+ &value, TYPE_TIMESTAMP, -1, &(column->stringVal.values.back()));
+ }
+ SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for STRING and VARCHAR.
+static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+ int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ StringVal val = expr_eval->GetStringVal(it.Get());
+ if (val.is_null) {
+ column->stringVal.values.emplace_back();
+ } else {
+ column->stringVal.values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len);
+ }
+ SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+// Implementation for CHAR.
+static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+ const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+ uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+ ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ StringVal val = expr_eval->GetStringVal(it.Get());
+ if (val.is_null) {
+ column->stringVal.values.emplace_back();
+ } else {
+ column->stringVal.values.emplace_back(
+ reinterpret_cast<const char*>(val.ptr), char_type.len);
+ }
+ SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+ ++output_row_idx;
+ }
+}
+
+static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+ const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+ uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+ ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ DecimalVal val = expr_eval->GetDecimalVal(it.Get());
+ const ColumnType& decimalType = ColumnType::FromThrift(type);
+ if (val.is_null) {
+ column->stringVal.values.emplace_back();
+ } else {
+ switch (decimalType.GetByteSize()) {
+ case 4:
+ column->stringVal.values.emplace_back(
+ Decimal4Value(val.val4).ToString(decimalType));
+ break;
+ case 8:
+ column->stringVal.values.emplace_back(
+ Decimal8Value(val.val8).ToString(decimalType));
+ break;
+ case 16:
+ column->stringVal.values.emplace_back(
+ Decimal16Value(val.val16).ToString(decimalType));
+ break;
+ default:
+ DCHECK(false) << "bad type: " << decimalType;
+ }
+ }
+ SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+ ++output_row_idx;
+ }
+}
+
// For V6 and above
-void impala::ExprValueToHS2TColumn(const void* value, const TColumnType& type,
- uint32_t row_idx, thrift::TColumn* column) {
- string* nulls;
+void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+ const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+ uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+ // Dispatch to a templated function for the loop over rows. This avoids branching on
+ // the type for every row.
+ // TODO: instead of relying on stamped out implementations, we could codegen this loop
+ // to inline the expression evaluation into the loop body.
switch (type.types[0].scalar_type.type) {
case TPrimitiveType::NULL_TYPE:
case TPrimitiveType::BOOLEAN:
- column->boolVal.values.push_back(
- value == NULL ? false : *reinterpret_cast<const bool*>(value));
- nulls = &column->boolVal.nulls;
- break;
+ BoolExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::TINYINT:
- column->byteVal.values.push_back(
- value == NULL ? 0 : *reinterpret_cast<const int8_t*>(value));
- nulls = &column->byteVal.nulls;
- break;
+ TinyIntExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::SMALLINT:
- column->i16Val.values.push_back(
- value == NULL ? 0 : *reinterpret_cast<const int16_t*>(value));
- nulls = &column->i16Val.nulls;
- break;
+ SmallIntExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::INT:
- column->i32Val.values.push_back(
- value == NULL ? 0 : *reinterpret_cast<const int32_t*>(value));
- nulls = &column->i32Val.nulls;
- break;
+ IntExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::BIGINT:
- column->i64Val.values.push_back(
- value == NULL ? 0 : *reinterpret_cast<const int64_t*>(value));
- nulls = &column->i64Val.nulls;
- break;
+ BigIntExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::FLOAT:
- column->doubleVal.values.push_back(
- value == NULL ? 0.f : *reinterpret_cast<const float*>(value));
- nulls = &column->doubleVal.nulls;
- break;
+ FloatExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::DOUBLE:
- column->doubleVal.values.push_back(
- value == NULL ? 0.0 : *reinterpret_cast<const double*>(value));
- nulls = &column->doubleVal.nulls;
- break;
+ DoubleExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::TIMESTAMP:
- column->stringVal.values.push_back("");
- if (value != NULL) {
- RawValue::PrintValue(value, TYPE_TIMESTAMP, -1,
- &(column->stringVal.values.back()));
- }
- nulls = &column->stringVal.nulls;
- break;
+ TimestampExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::STRING:
case TPrimitiveType::VARCHAR:
- column->stringVal.values.push_back("");
- if (value != NULL) {
- const StringValue* str_val = reinterpret_cast<const StringValue*>(value);
- column->stringVal.values.back().assign(
- static_cast<char*>(str_val->ptr), str_val->len);
- }
- nulls = &column->stringVal.nulls;
- break;
+ StringExprValuesToHS2TColumn(
+ expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::CHAR:
- column->stringVal.values.push_back("");
- if (value != NULL) {
- ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len);
- column->stringVal.values.back().assign(
- reinterpret_cast<const char*>(value), char_type.len);
- }
- nulls = &column->stringVal.nulls;
- break;
+ CharExprValuesToHS2TColumn(
+ expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
+ return;
case TPrimitiveType::DECIMAL: {
- // HiveServer2 requires decimal to be presented as string.
- column->stringVal.values.push_back("");
- const ColumnType& decimalType = ColumnType::FromThrift(type);
- if (value != NULL) {
- switch (decimalType.GetByteSize()) {
- case 4:
- column->stringVal.values.back() =
- reinterpret_cast<const Decimal4Value*>(value)->ToString(decimalType);
- break;
- case 8:
- column->stringVal.values.back() =
- reinterpret_cast<const Decimal8Value*>(value)->ToString(decimalType);
- break;
- case 16:
- column->stringVal.values.back() =
- reinterpret_cast<const Decimal16Value*>(value)->ToString(decimalType);
- break;
- default:
- DCHECK(false) << "bad type: " << decimalType;
- }
- }
- nulls = &column->stringVal.nulls;
- break;
+ DecimalExprValuesToHS2TColumn(
+ expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
+ return;
}
default:
DCHECK(false) << "Unhandled type: "
<< TypeToString(ThriftToType(type.types[0].scalar_type.type));
- return;
}
-
- SetNullBit(row_idx, (value == NULL), nulls);
}
// For V1 -> V5
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.h
----------------------------------------------------------------------
diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h
index 44ceba6..4f0f973 100644
--- a/be/src/service/hs2-util.h
+++ b/be/src/service/hs2-util.h
@@ -20,16 +20,23 @@
namespace impala {
-/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) to
-/// Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->).
+class RowBatch;
+class ScalarExprEvaluator;
+
+/// Utility methods for converting from Impala (either an Expr result or a TColumnValue)
+/// to Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->).
/// For V6->
void TColumnValueToHS2TColumn(const TColumnValue& col_val, const TColumnType& type,
uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column);
+/// Evaluate 'expr_eval' over the row [start_idx, start_idx + num_rows) from 'batch' into
+/// 'column' with 'type' starting at output_row_idx. The caller is responsible for
+/// calling RuntimeState::GetQueryStatus() to check for expression evaluation errors.
/// For V6->
-void ExprValueToHS2TColumn(const void* value, const TColumnType& type,
- uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column);
+void ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type,
+ RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+ apache::hive::service::cli::thrift::TColumn* column);
/// For V1->V5
void TColumnValueToHS2TColumnValue(const TColumnValue& col_val, const TColumnType& type,
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index aacd849..f254176 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -20,10 +20,13 @@
#include <sstream>
#include <boost/scoped_ptr.hpp>
+#include "exprs/scalar-expr-evaluator.h"
#include "rpc/thrift-util.h"
#include "runtime/raw-value.h"
+#include "runtime/row-batch.h"
#include "runtime/types.h"
#include "service/hs2-util.h"
+#include "util/bit-util.h"
#include "common/names.h"
@@ -51,18 +54,19 @@ class AsciiQueryResultSet : public QueryResultSet {
virtual ~AsciiQueryResultSet() {}
- /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column
+ /// Evaluate 'expr_evals' over rows in 'batch', convert to ASCII using "\t" as column
/// delimiter and store it in this result set.
/// TODO: Handle complex types.
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+ virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+ int start_idx, int num_rows) override;
/// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
/// result set.
- virtual Status AddOneRow(const TResultRow& row);
+ virtual Status AddOneRow(const TResultRow& row) override;
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
- virtual int64_t ByteSize(int start_idx, int num_rows);
- virtual size_t size() { return result_set_->size(); }
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
+ virtual int64_t ByteSize(int start_idx, int num_rows) override;
+ virtual size_t size() override { return result_set_->size(); }
private:
/// Metadata of the result set
@@ -80,18 +84,20 @@ class HS2ColumnarResultSet : public QueryResultSet {
virtual ~HS2ColumnarResultSet() {}
- /// Add a row of expr values
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+ /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 columnar
+ /// representation.
+ virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+ int start_idx, int num_rows) override;
/// Add a row from a TResultRow
- virtual Status AddOneRow(const TResultRow& row);
+ virtual Status AddOneRow(const TResultRow& row) override;
/// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
/// from 'other' into this result set
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
- virtual int64_t ByteSize(int start_idx, int num_rows);
- virtual size_t size() { return num_rows_; }
+ virtual int64_t ByteSize(int start_idx, int num_rows) override;
+ virtual size_t size() override { return num_rows_; }
private:
/// Metadata of the result set
@@ -119,15 +125,17 @@ class HS2RowOrientedResultSet : public QueryResultSet {
virtual ~HS2RowOrientedResultSet() {}
- /// Convert expr values to HS2 TRow and store it in a TRowSet.
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+ /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 row-oriented
+ /// representation of TRows stored in a TRowSet.
+ virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+ int start_idx, int num_rows) override;
/// Convert TResultRow to HS2 TRow and store it in a TRowSet
- virtual Status AddOneRow(const TResultRow& row);
+ virtual Status AddOneRow(const TResultRow& row) override;
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
- virtual int64_t ByteSize(int start_idx, int num_rows);
- virtual size_t size() { return result_set_->rows.size(); }
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
+ virtual int64_t ByteSize(int start_idx, int num_rows) override;
+ virtual size_t size() override { return result_set_->rows.size(); }
private:
/// Metadata of the result set
@@ -158,20 +166,34 @@ QueryResultSet* QueryResultSet::CreateHS2ResultSet(
//////////////////////////////////////////////////////////////////////////////////////////
-Status AsciiQueryResultSet::AddOneRow(
- const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
+Status AsciiQueryResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+ RowBatch* batch, int start_idx, int num_rows) {
+ DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+ int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
+ vector<int> scales;
+ scales.reserve(num_col);
+ for (ScalarExprEvaluator* expr_eval : expr_evals) {
+ scales.push_back(expr_eval->output_scale());
+ }
+ // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
+ // small increases in size.
+ result_set_->reserve(
+ BitUtil::RoundUpToPowerOfTwo(result_set_->size() + num_rows - start_idx));
stringstream out_stream;
out_stream.precision(ASCII_PRECISION);
- for (int i = 0; i < num_col; ++i) {
- // ODBC-187 - ODBC can only take "\t" as the delimiter
- out_stream << (i > 0 ? "\t" : "");
- DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
- RawValue::PrintValue(col_values[i],
- ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream);
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ for (int i = 0; i < num_col; ++i) {
+ // ODBC-187 - ODBC can only take "\t" as the delimiter
+ out_stream << (i > 0 ? "\t" : "");
+ DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
+ RawValue::PrintValue(expr_evals[i]->GetValue(it.Get()),
+ ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i],
+ &out_stream);
+ }
+ result_set_->push_back(out_stream.str());
+ out_stream.str("");
}
- result_set_->push_back(out_stream.str());
return Status::OK();
}
@@ -263,16 +285,18 @@ HS2ColumnarResultSet::HS2ColumnarResultSet(
InitColumns();
}
-// Add a row of expr values
-Status HS2ColumnarResultSet::AddOneRow(
- const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
+Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+ RowBatch* batch, int start_idx, int num_rows) {
+ DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+ int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
for (int i = 0; i < num_col; ++i) {
- ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
+ const TColumnType& type = metadata_.columns[i].columnType;
+ ScalarExprEvaluator* expr_eval = expr_evals[i];
+ ExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, num_rows_,
&(result_set_->columns[i]));
}
- ++num_rows_;
+ num_rows_ += num_rows;
return Status::OK();
}
@@ -427,16 +451,21 @@ HS2RowOrientedResultSet::HS2RowOrientedResultSet(
}
}
-Status HS2RowOrientedResultSet::AddOneRow(
- const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
+Status HS2RowOrientedResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+ RowBatch* batch, int start_idx, int num_rows) {
+ DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+ int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
- result_set_->rows.push_back(TRow());
- TRow& trow = result_set_->rows.back();
- trow.colVals.resize(num_col);
- for (int i = 0; i < num_col; ++i) {
- ExprValueToHS2TColumnValue(
- col_values[i], metadata_.columns[i].columnType, &(trow.colVals[i]));
+ result_set_->rows.reserve(
+ BitUtil::RoundUpToPowerOfTwo(result_set_->rows.size() + num_rows - start_idx));
+ FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+ result_set_->rows.push_back(TRow());
+ TRow& trow = result_set_->rows.back();
+ trow.colVals.resize(num_col);
+ for (int i = 0; i < num_col; ++i) {
+ ExprValueToHS2TColumnValue(expr_evals[i]->GetValue(it.Get()),
+ metadata_.columns[i].columnType, &(trow.colVals[i]));
+ }
}
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
index e0c88d7..fa39d73 100644
--- a/be/src/service/query-result-set.h
+++ b/be/src/service/query-result-set.h
@@ -27,6 +27,9 @@
namespace impala {
+class RowBatch;
+class ScalarExprEvaluator;
+
/// Wraps a client-API specific result representation, and implements the logic required
/// to translate into that format from Impala's row format.
///
@@ -36,12 +39,11 @@ class QueryResultSet {
QueryResultSet() {}
virtual ~QueryResultSet() {}
- /// Add a single row to this result set. The row is a vector of pointers to values,
- /// whose memory belongs to the caller. 'scales' contains the scales for decimal values
- /// (# of digits after decimal), with -1 indicating no scale specified or the
- /// corresponding value is not a decimal.
- virtual Status AddOneRow(
- const std::vector<void*>& row, const std::vector<int>& scales) = 0;
+ /// Add 'num_rows' rows to the result set, obtained by evaluating 'expr_evals' over
+ /// the rows in 'batch' starting at start_idx. Batch must contain at least
+ /// ('start_idx' + 'num_rows') rows.
+ virtual Status AddRows(const std::vector<ScalarExprEvaluator*>& expr_evals,
+ RowBatch* batch, int start_idx, int num_rows) = 0;
/// Add the TResultRow to this result set. When a row comes from a DDL/metadata
/// operation, the row in the form of TResultRow.