You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jy...@apache.org on 2016/10/18 15:14:18 UTC
[4/4] incubator-impala git commit: IMPALA-2905: Move QueryResultSet
implementations into separate module
IMPALA-2905: Move QueryResultSet implementations into separate module
This mostly mechanical change moves the definition and implementation of
the Beeswax and HS2-specific result sets into their own module. Result
sets are now uniformly created by one of two factory methods, so the
implementation is decoupled from the client.
Change-Id: I6ab883b62d3ec7012240edf8d56889349e7c0e32
Reviewed-on: http://gerrit.cloudera.org:8080/4736
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3f5380dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3f5380dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3f5380dc
Branch: refs/heads/master
Commit: 3f5380dc73f3ab907443a2858d4fe0de6e3685e7
Parents: 080a678
Author: Henry Robinson <he...@cloudera.com>
Authored: Sat Oct 15 16:47:24 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 18 09:30:09 2016 +0000
----------------------------------------------------------------------
be/src/service/CMakeLists.txt | 1 +
be/src/service/impala-beeswax-server.cc | 98 +-----
be/src/service/impala-hs2-server.cc | 324 +-----------------
be/src/service/query-result-set.cc | 478 +++++++++++++++++++++++++++
be/src/service/query-result-set.h | 21 +-
5 files changed, 503 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index aa12ceb..35130ff 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -33,6 +33,7 @@ add_library(Service
impala-beeswax-server.cc
query-exec-state.cc
query-options.cc
+ query-result-set.cc
child-query.cc
impalad-main.cc
)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index ee7f958..b50499e 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -47,100 +47,8 @@ using namespace beeswax;
} \
} while (false)
-namespace {
-
-/// Ascii output precision for double/float
-constexpr int ASCII_PRECISION = 16;
-}
-
namespace impala {
-// Ascii result set for Beeswax.
-// Beeswax returns rows in ascii, using "\t" as column delimiter.
-class AsciiQueryResultSet : public QueryResultSet {
- public:
- // Rows are added into rowset.
- AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
- : metadata_(metadata), result_set_(rowset), owned_result_set_(NULL) {
- }
-
- // Rows are added into a new rowset that is owned by this result set.
- AsciiQueryResultSet(const TResultSetMetadata& metadata)
- : metadata_(metadata), result_set_(new vector<string>()),
- owned_result_set_(result_set_) {
- }
-
- virtual ~AsciiQueryResultSet() { }
-
- // Convert expr values (col_values) to ASCII using "\t" as column delimiter and store
- // it in this result set.
- // TODO: Handle complex types.
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- stringstream out_stream;
- out_stream.precision(ASCII_PRECISION);
- for (int i = 0; i < num_col; ++i) {
- // ODBC-187 - ODBC can only take "\t" as the delimiter
- out_stream << (i > 0 ? "\t" : "");
- DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
- RawValue::PrintValue(col_values[i],
- ColumnType::FromThrift(metadata_.columns[i].columnType),
- scales[i], &out_stream);
- }
- result_set_->push_back(out_stream.str());
- return Status::OK();
- }
-
- // Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
- // result set.
- virtual Status AddOneRow(const TResultRow& row) {
- int num_col = row.colVals.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- stringstream out_stream;
- out_stream.precision(ASCII_PRECISION);
- for (int i = 0; i < num_col; ++i) {
- // ODBC-187 - ODBC can only take "\t" as the delimiter
- out_stream << (i > 0 ? "\t" : "");
- out_stream << row.colVals[i];
- }
- result_set_->push_back(out_stream.str());
- return Status::OK();
- }
-
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
- const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
- if (start_idx >= o->result_set_->size()) return 0;
- const int rows_added =
- min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
- result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
- o->result_set_->begin() + start_idx + rows_added);
- return rows_added;
- }
-
- virtual int64_t ByteSize(int start_idx, int num_rows) {
- int64_t bytes = 0;
- const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
- for (int i = start_idx; i < start_idx + end; ++i) {
- bytes += sizeof(result_set_[i]) + result_set_[i].capacity();
- }
- return bytes;
- }
-
- virtual size_t size() { return result_set_->size(); }
-
- private:
- // Metadata of the result set
- const TResultSetMetadata& metadata_;
-
- // Points to the result set to be filled. The result set this points to may be owned by
- // this object, in which case owned_result_set_ is set.
- vector<string>* result_set_;
-
- // Set to result_set_ if result_set_ is owned.
- scoped_ptr<vector<string>> owned_result_set_;
-};
-
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
VLOG_QUERY << "query(): query=" << query.query;
ScopedSessionState session_handle(this);
@@ -588,9 +496,9 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
Status fetch_rows_status;
query_results->data.clear();
if (!exec_state->eos()) {
- AsciiQueryResultSet result_set(*(exec_state->result_metadata()),
- &(query_results->data));
- fetch_rows_status = exec_state->FetchRows(fetch_size, &result_set);
+ scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
+ *exec_state->result_metadata(), &query_results->data));
+ fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get());
}
query_results->__set_has_more(!exec_state->eos());
query_results->__isset.data = true;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index de0e2f3..488a1ee 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -85,315 +85,10 @@ namespace impala {
const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
-// Utility functions for computing the size of HS2 Thrift structs in bytes.
-static inline
-int64_t ByteSize(const thrift::TColumnValue& val) {
- return sizeof(val) + val.stringVal.value.capacity();
-}
-
-static int64_t ByteSize(const thrift::TRow& row) {
- int64_t bytes = sizeof(row);
- for (const thrift::TColumnValue& c: row.colVals) {
- bytes += ByteSize(c);
- }
- return bytes;
-}
-
-// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those
-// values in the range [start_idx, end_idx).
-static uint32_t TColumnByteSize(const thrift::TColumn& col, uint32_t start_idx,
- uint32_t end_idx) {
- DCHECK_LE(start_idx, end_idx);
- uint32_t num_rows = end_idx - start_idx;
- if (num_rows == 0) return 0L;
-
- if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size();
- if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size();
- if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size();
- if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size();
- if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size();
- if (col.__isset.doubleVal) {
- return (num_rows * sizeof(double)) + col.doubleVal.nulls.size();
- }
- if (col.__isset.stringVal) {
- uint32_t bytes = 0;
- for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
- return bytes + col.stringVal.nulls.size();
- }
-
- return 0;
-}
-
// Helper function to translate between Beeswax and HiveServer2 type
static TOperationState::type QueryStateToTOperationState(
const beeswax::QueryState::type& query_state);
-// Result set container for Hive protocol versions >= V6, where results are returned in
-// column-orientation.
-class HS2ColumnarResultSet : public QueryResultSet {
- public:
- HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
- : metadata_(metadata), result_set_(rowset), num_rows_(0) {
- if (rowset == NULL) {
- owned_result_set_.reset(new TRowSet());
- result_set_ = owned_result_set_.get();
- }
- InitColumns();
- }
-
- virtual ~HS2ColumnarResultSet() { }
-
- // Add a row of expr values
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- for (int i = 0; i < num_col; ++i) {
- ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
- &(result_set_->columns[i]));
- }
- ++num_rows_;
- return Status::OK();
- }
-
- // Add a row from a TResultRow
- virtual Status AddOneRow(const TResultRow& row) {
- int num_col = row.colVals.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- for (int i = 0; i < num_col; ++i) {
- TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
- &(result_set_->columns[i]));
- }
- ++num_rows_;
- return Status::OK();
- }
-
- // Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
- // from 'other' into this result set
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
- const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
- DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
- if (start_idx >= o->num_rows_) return 0;
- const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
- for (int j = 0; j < metadata_.columns.size(); ++j) {
- thrift::TColumn* from = &o->result_set_->columns[j];
- thrift::TColumn* to = &result_set_->columns[j];
- switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
- case TPrimitiveType::NULL_TYPE:
- case TPrimitiveType::BOOLEAN:
- StitchNulls(num_rows_, rows_added, start_idx, from->boolVal.nulls,
- &(to->boolVal.nulls));
- to->boolVal.values.insert(
- to->boolVal.values.end(),
- from->boolVal.values.begin() + start_idx,
- from->boolVal.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::TINYINT:
- StitchNulls(num_rows_, rows_added, start_idx, from->byteVal.nulls,
- &(to->byteVal.nulls));
- to->byteVal.values.insert(
- to->byteVal.values.end(),
- from->byteVal.values.begin() + start_idx,
- from->byteVal.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::SMALLINT:
- StitchNulls(num_rows_, rows_added, start_idx, from->i16Val.nulls,
- &(to->i16Val.nulls));
- to->i16Val.values.insert(
- to->i16Val.values.end(),
- from->i16Val.values.begin() + start_idx,
- from->i16Val.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::INT:
- StitchNulls(num_rows_, rows_added, start_idx, from->i32Val.nulls,
- &(to->i32Val.nulls));
- to->i32Val.values.insert(
- to->i32Val.values.end(),
- from->i32Val.values.begin() + start_idx,
- from->i32Val.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::BIGINT:
- StitchNulls(num_rows_, rows_added, start_idx, from->i64Val.nulls,
- &(to->i64Val.nulls));
- to->i64Val.values.insert(
- to->i64Val.values.end(),
- from->i64Val.values.begin() + start_idx,
- from->i64Val.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::FLOAT:
- case TPrimitiveType::DOUBLE:
- StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
- &(to->doubleVal.nulls));
- to->doubleVal.values.insert(
- to->doubleVal.values.end(),
- from->doubleVal.values.begin() + start_idx,
- from->doubleVal.values.begin() + start_idx + rows_added);
- break;
- case TPrimitiveType::TIMESTAMP:
- case TPrimitiveType::DECIMAL:
- case TPrimitiveType::STRING:
- case TPrimitiveType::VARCHAR:
- case TPrimitiveType::CHAR:
- StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
- &(to->stringVal.nulls));
- to->stringVal.values.insert(to->stringVal.values.end(),
- from->stringVal.values.begin() + start_idx,
- from->stringVal.values.begin() + start_idx + rows_added);
- break;
- default:
- DCHECK(false) << "Unsupported type: " << TypeToString(ThriftToType(
- metadata_.columns[j].columnType.types[0].scalar_type.type));
- break;
- }
- }
- num_rows_ += rows_added;
- return rows_added;
- }
-
- virtual int64_t ByteSize(int start_idx, int num_rows) {
- const int end = min(start_idx + num_rows, (int)size());
- int64_t bytes = 0L;
- for (const thrift::TColumn& c: result_set_->columns) {
- bytes += TColumnByteSize(c, start_idx, end);
- }
- return bytes;
- }
-
- virtual size_t size() { return num_rows_; }
-
- private:
- // Metadata of the result set
- const TResultSetMetadata& metadata_;
-
- // Points to the TRowSet to be filled. The row set this points to may be owned by
- // this object, in which case owned_result_set_ is set.
- TRowSet* result_set_;
-
- // Set to result_set_ if result_set_ is owned.
- scoped_ptr<TRowSet> owned_result_set_;
-
- int64_t num_rows_;
-
- void InitColumns() {
- result_set_->__isset.columns = true;
- for (const TColumn& col: metadata_.columns) {
- DCHECK(col.columnType.types.size() == 1) <<
- "Structured columns unsupported in HS2 interface";
- thrift::TColumn column;
- switch (col.columnType.types[0].scalar_type.type) {
- case TPrimitiveType::NULL_TYPE:
- case TPrimitiveType::BOOLEAN:
- column.__isset.boolVal = true;
- break;
- case TPrimitiveType::TINYINT:
- column.__isset.byteVal = true;
- break;
- case TPrimitiveType::SMALLINT:
- column.__isset.i16Val = true;
- break;
- case TPrimitiveType::INT:
- column.__isset.i32Val = true;
- break;
- case TPrimitiveType::BIGINT:
- column.__isset.i64Val = true;
- break;
- case TPrimitiveType::FLOAT:
- case TPrimitiveType::DOUBLE:
- column.__isset.doubleVal = true;
- break;
- case TPrimitiveType::TIMESTAMP:
- case TPrimitiveType::DECIMAL:
- case TPrimitiveType::VARCHAR:
- case TPrimitiveType::CHAR:
- case TPrimitiveType::STRING:
- column.__isset.stringVal = true;
- break;
- default:
- DCHECK(false) << "Unhandled column type: "
- << TypeToString(
- ThriftToType(col.columnType.types[0].scalar_type.type));
- }
- result_set_->columns.push_back(column);
- }
- }
-};
-
-// TRow result set for HiveServer2
-class HS2RowOrientedResultSet : public QueryResultSet {
- public:
- // Rows are added into rowset.
- HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
- : metadata_(metadata), result_set_(rowset) {
- if (rowset == NULL) {
- owned_result_set_.reset(new TRowSet());
- result_set_ = owned_result_set_.get();
- }
- }
-
- virtual ~HS2RowOrientedResultSet() { }
-
- // Convert expr value to HS2 TRow and store it in TRowSet.
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
- int num_col = col_values.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- result_set_->rows.push_back(TRow());
- TRow& trow = result_set_->rows.back();
- trow.colVals.resize(num_col);
- for (int i = 0; i < num_col; ++i) {
- ExprValueToHS2TColumnValue(col_values[i],
- metadata_.columns[i].columnType, &(trow.colVals[i]));
- }
- return Status::OK();
- }
-
- // Convert TResultRow to HS2 TRow and store it in TRowSet.
- virtual Status AddOneRow(const TResultRow& row) {
- int num_col = row.colVals.size();
- DCHECK_EQ(num_col, metadata_.columns.size());
- result_set_->rows.push_back(TRow());
- TRow& trow = result_set_->rows.back();
- trow.colVals.resize(num_col);
- for (int i = 0; i < num_col; ++i) {
- TColumnValueToHS2TColumnValue(row.colVals[i], metadata_.columns[i].columnType,
- &(trow.colVals[i]));
- }
- return Status::OK();
- }
-
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
- const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
- if (start_idx >= o->result_set_->rows.size()) return 0;
- const int rows_added =
- min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx);
- for (int i = start_idx; i < start_idx + rows_added; ++i) {
- result_set_->rows.push_back(o->result_set_->rows[i]);
- }
- return rows_added;
- }
-
- virtual int64_t ByteSize(int start_idx, int num_rows) {
- int64_t bytes = 0;
- const int end =
- min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
- for (int i = start_idx; i < start_idx + end; ++i) {
- bytes += impala::ByteSize(result_set_->rows[i]);
- }
- return bytes;
- }
-
- virtual size_t size() { return result_set_->rows.size(); }
-
- private:
- // Metadata of the result set
- const TResultSetMetadata& metadata_;
-
- // Points to the TRowSet to be filled. The row set this points to may be owned by
- // this object, in which case owned_result_set_ is set.
- TRowSet* result_set_;
-
- // Set to result_set_ if result_set_ is owned.
- scoped_ptr<TRowSet> owned_result_set_;
-};
-
void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
TUniqueId session_id;
@@ -473,18 +168,6 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
}
-namespace {
-
-QueryResultSet* CreateHS2ResultSet(
- TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
- if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
- return new HS2RowOrientedResultSet(metadata, rowset);
- } else {
- return new HS2ColumnarResultSet(metadata, rowset);
- }
-}
-}
-
Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
bool fetch_first, TFetchResultsResp* fetch_results) {
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -522,8 +205,8 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size
bool is_child_query = exec_state->parent_query_id() != TUniqueId();
TProtocolVersion::type version = is_child_query ?
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
- scoped_ptr<QueryResultSet> result_set(CreateHS2ResultSet(version,
- *(exec_state->result_metadata()), &(fetch_results->results)));
+ scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet(
+ version, *(exec_state->result_metadata()), &(fetch_results->results)));
RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get()));
fetch_results->__isset.results = true;
fetch_results->__set_hasMoreRows(!exec_state->eos());
@@ -763,7 +446,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
// Optionally enable result caching on the QueryExecState.
if (cache_num_rows > 0) {
status = exec_state->SetResultCache(
- CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr),
+ QueryResultSet::CreateHS2ResultSet(
+ session->hs2_version, *exec_state->result_metadata(), nullptr),
cache_num_rows);
if (!status.ok()) {
UnregisterQuery(exec_state->query_id(), false, &status);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
new file mode 100644
index 0000000..3b17af7
--- /dev/null
+++ b/be/src/service/query-result-set.cc
@@ -0,0 +1,478 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/query-result-set.h"
+
+#include <sstream>
+#include <boost/scoped_ptr.hpp>
+
+#include "rpc/thrift-util.h"
+#include "runtime/raw-value.h"
+#include "runtime/types.h"
+#include "service/hs2-util.h"
+
+#include "common/names.h"
+
+using ThriftTColumn = apache::hive::service::cli::thrift::TColumn;
+using ThriftTColumnValue = apache::hive::service::cli::thrift::TColumnValue;
+using apache::hive::service::cli::thrift::TProtocolVersion;
+using apache::hive::service::cli::thrift::TRow;
+using apache::hive::service::cli::thrift::TRowSet;
+
+namespace {
+
+/// Ascii output precision for double/float
+constexpr int ASCII_PRECISION = 16;
+}
+
+namespace impala {
+
+/// Ascii result set for Beeswax. Rows are returned in ascii text encoding, using "\t" as
+/// column delimiter.
+class AsciiQueryResultSet : public QueryResultSet {
+ public:
+ /// Rows are added into 'rowset'.
+ AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
+ : metadata_(metadata), result_set_(rowset) {}
+
+ virtual ~AsciiQueryResultSet() {}
+
+ /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column
+ /// delimiter and store it in this result set.
+ /// TODO: Handle complex types.
+ virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+ /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
+ /// result set.
+ virtual Status AddOneRow(const TResultRow& row);
+
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+ virtual int64_t ByteSize(int start_idx, int num_rows);
+ virtual size_t size() { return result_set_->size(); }
+
+ private:
+ /// Metadata of the result set
+ const TResultSetMetadata& metadata_;
+
+ /// Points to the result set to be filled. Not owned by this object.
+ vector<string>* result_set_;
+};
+
+/// Result set container for Hive protocol versions >= V6, where results are returned in
+/// column-orientation.
+class HS2ColumnarResultSet : public QueryResultSet {
+ public:
+ HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
+
+ virtual ~HS2ColumnarResultSet(){};
+
+ /// Add a row of expr values
+ virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+ /// Add a row from a TResultRow
+ virtual Status AddOneRow(const TResultRow& row);
+
+ /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
+ /// from 'other' into this result set
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+
+ virtual int64_t ByteSize(int start_idx, int num_rows);
+ virtual size_t size() { return num_rows_; }
+
+ private:
+ /// Metadata of the result set
+ const TResultSetMetadata& metadata_;
+
+ /// Points to the TRowSet to be filled. The row set
+ /// this points to may be owned by
+ /// this object, in which case owned_result_set_ is set.
+ TRowSet* result_set_;
+
+ /// Set to result_set_ if result_set_ is owned.
+ boost::scoped_ptr<TRowSet> owned_result_set_;
+
+ int64_t num_rows_;
+
+ void InitColumns();
+};
+
+/// Row oriented result set for HiveServer2, used to serve HS2 requests with protocol
+/// version <= V5.
+class HS2RowOrientedResultSet : public QueryResultSet {
+ public:
+ /// Rows are added into rowset.
+ HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
+
+ virtual ~HS2RowOrientedResultSet() {}
+
+ /// Convert expr values to HS2 TRow and store it in a TRowSet.
+ virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+ /// Convert TResultRow to HS2 TRow and store it in a TRowSet
+ virtual Status AddOneRow(const TResultRow& row);
+
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+ virtual int64_t ByteSize(int start_idx, int num_rows);
+ virtual size_t size() { return result_set_->rows.size(); }
+
+ private:
+ /// Metadata of the result set
+ const TResultSetMetadata& metadata_;
+
+ /// Points to the TRowSet to be filled. The row set
+ /// this points to may be owned by
+ /// this object, in which case owned_result_set_ is set.
+ TRowSet* result_set_;
+
+ /// Set to result_set_ if result_set_ is owned.
+ scoped_ptr<TRowSet> owned_result_set_;
+};
+
+QueryResultSet* QueryResultSet::CreateAsciiQueryResultSet(
+ const TResultSetMetadata& metadata, vector<string>* rowset) {
+ return new AsciiQueryResultSet(metadata, rowset);
+}
+
+QueryResultSet* QueryResultSet::CreateHS2ResultSet(
+ TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
+ if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
+ return new HS2RowOrientedResultSet(metadata, rowset);
+ } else {
+ return new HS2ColumnarResultSet(metadata, rowset);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////
+
+Status AsciiQueryResultSet::AddOneRow(
+ const vector<void*>& col_values, const vector<int>& scales) {
+ int num_col = col_values.size();
+ DCHECK_EQ(num_col, metadata_.columns.size());
+ stringstream out_stream;
+ out_stream.precision(ASCII_PRECISION);
+ for (int i = 0; i < num_col; ++i) {
+ // ODBC-187 - ODBC can only take "\t" as the delimiter
+ out_stream << (i > 0 ? "\t" : "");
+ DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
+ RawValue::PrintValue(col_values[i],
+ ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream);
+ }
+ result_set_->push_back(out_stream.str());
+ return Status::OK();
+}
+
+Status AsciiQueryResultSet::AddOneRow(const TResultRow& row) {
+ int num_col = row.colVals.size();
+ DCHECK_EQ(num_col, metadata_.columns.size());
+ stringstream out_stream;
+ out_stream.precision(ASCII_PRECISION);
+ for (int i = 0; i < num_col; ++i) {
+ // ODBC-187 - ODBC can only take "\t" as the delimiter
+ out_stream << (i > 0 ? "\t" : "");
+ out_stream << row.colVals[i];
+ }
+ result_set_->push_back(out_stream.str());
+ return Status::OK();
+}
+
+int AsciiQueryResultSet::AddRows(
+ const QueryResultSet* other, int start_idx, int num_rows) {
+ const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
+ if (start_idx >= o->result_set_->size()) return 0;
+ const int rows_added =
+ min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
+ result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
+ o->result_set_->begin() + start_idx + rows_added);
+ return rows_added;
+}
+
+/// Returns an estimate, in bytes, of the memory held by up to 'num_rows' rows of this
+/// result set, starting at row 'start_idx'. Each counted row contributes the size of its
+/// string object plus the capacity of its character buffer. Returns 0 if 'start_idx' is
+/// not a valid row index.
+int64_t AsciiQueryResultSet::ByteSize(int start_idx, int num_rows) {
+  // Guard first: the unsigned subtraction below would wrap around if 'start_idx' were
+  // negative or past the end of the result set.
+  if (start_idx < 0 || static_cast<size_t>(start_idx) >= result_set_->size()) return 0;
+  const int rows_to_count =
+      min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
+  int64_t bytes = 0;
+  for (int i = start_idx; i < start_idx + rows_to_count; ++i) {
+    // 'result_set_' is a pointer to the row vector, so it must be dereferenced before
+    // indexing. 'result_set_[i]' would do pointer arithmetic over vector objects and
+    // read memory beyond the vector for any i > 0.
+    const string& row = (*result_set_)[i];
+    bytes += sizeof(row) + row.capacity();
+  }
+  return bytes;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+// Utility functions for computing the size of HS2 Thrift structs in bytes.
+inline int64_t ByteSize(const ThriftTColumnValue& val) {
+ return sizeof(val) + val.stringVal.value.capacity();
+}
+
+int64_t ByteSize(const TRow& row) {
+ int64_t bytes = sizeof(row);
+ for (const ThriftTColumnValue& c : row.colVals) {
+ bytes += ByteSize(c);
+ }
+ return bytes;
+}
+
+// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those
+// values in the range [start_idx, end_idx).
+uint32_t TColumnByteSize(const ThriftTColumn& col, uint32_t start_idx, uint32_t end_idx) {
+ DCHECK_LE(start_idx, end_idx);
+ uint32_t num_rows = end_idx - start_idx;
+ if (num_rows == 0) return 0L;
+
+ if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size();
+ if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size();
+ if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size();
+ if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size();
+ if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size();
+ if (col.__isset.doubleVal) {
+ return (num_rows * sizeof(double)) + col.doubleVal.nulls.size();
+ }
+ if (col.__isset.stringVal) {
+ uint32_t bytes = 0;
+ for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
+ return bytes + col.stringVal.nulls.size();
+ }
+
+ return 0;
+}
+}
+
+// Result set container for Hive protocol versions >= V6, where results are returned in
+// column-orientation.
+HS2ColumnarResultSet::HS2ColumnarResultSet(
+ const TResultSetMetadata& metadata, TRowSet* rowset)
+ : metadata_(metadata), result_set_(rowset), num_rows_(0) {
+ if (rowset == NULL) {
+ owned_result_set_.reset(new TRowSet());
+ result_set_ = owned_result_set_.get();
+ }
+ InitColumns();
+}
+
+// Add a row of expr values
+Status HS2ColumnarResultSet::AddOneRow(
+ const vector<void*>& col_values, const vector<int>& scales) {
+ int num_col = col_values.size();
+ DCHECK_EQ(num_col, metadata_.columns.size());
+ for (int i = 0; i < num_col; ++i) {
+ ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
+ &(result_set_->columns[i]));
+ }
+ ++num_rows_;
+ return Status::OK();
+}
+
+// Add a row from a TResultRow
+Status HS2ColumnarResultSet::AddOneRow(const TResultRow& row) {
+ int num_col = row.colVals.size();
+ DCHECK_EQ(num_col, metadata_.columns.size());
+ for (int i = 0; i < num_col; ++i) {
+ TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
+ &(result_set_->columns[i]));
+ }
+ ++num_rows_;
+ return Status::OK();
+}
+
+// Appends rows [start_idx, start_idx + rows_added) of one typed HS2 column ('from') to
+// the end of 'to'. The null indicators are merged by StitchNulls(), to which
+// 'rows_before' (the row count of the destination before this call) is forwarded
+// unchanged. TypedColumn is any per-type column struct exposing 'nulls' and 'values'.
+template <typename TypedColumn>
+static void AppendColumnSlice(const TypedColumn& from, int start_idx, int rows_added,
+    int64_t rows_before, TypedColumn* to) {
+  StitchNulls(rows_before, rows_added, start_idx, from.nulls, &to->nulls);
+  const auto first = from.values.begin() + start_idx;
+  to->values.insert(to->values.end(), first, first + rows_added);
+}
+
+// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
+// from 'other' into this result set. 'other' must be an HS2ColumnarResultSet with the
+// same number of columns. Returns the number of rows actually copied, which is 0 when
+// 'start_idx' is negative or past the end of 'other', or when 'num_rows' is not
+// positive.
+int HS2ColumnarResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
+  DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
+  // Negative arguments would otherwise flow into iterator arithmetic and into
+  // StitchNulls() and describe a slice outside the source columns.
+  if (start_idx < 0 || num_rows <= 0) return 0;
+  if (start_idx >= o->num_rows_) return 0;
+  const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
+  for (size_t j = 0; j < metadata_.columns.size(); ++j) {
+    const ThriftTColumn& from = o->result_set_->columns[j];
+    ThriftTColumn* to = &result_set_->columns[j];
+    // Several primitive types share one HS2 column representation; the grouping below
+    // mirrors the one used when the columns are initialised.
+    switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        AppendColumnSlice(from.boolVal, start_idx, rows_added, num_rows_, &to->boolVal);
+        break;
+      case TPrimitiveType::TINYINT:
+        AppendColumnSlice(from.byteVal, start_idx, rows_added, num_rows_, &to->byteVal);
+        break;
+      case TPrimitiveType::SMALLINT:
+        AppendColumnSlice(from.i16Val, start_idx, rows_added, num_rows_, &to->i16Val);
+        break;
+      case TPrimitiveType::INT:
+        AppendColumnSlice(from.i32Val, start_idx, rows_added, num_rows_, &to->i32Val);
+        break;
+      case TPrimitiveType::BIGINT:
+        AppendColumnSlice(from.i64Val, start_idx, rows_added, num_rows_, &to->i64Val);
+        break;
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        AppendColumnSlice(
+            from.doubleVal, start_idx, rows_added, num_rows_, &to->doubleVal);
+        break;
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::STRING:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+        AppendColumnSlice(
+            from.stringVal, start_idx, rows_added, num_rows_, &to->stringVal);
+        break;
+      default:
+        DCHECK(false) << "Unsupported type: "
+                      << TypeToString(ThriftToType(
+                             metadata_.columns[j].columnType.types[0].scalar_type.type));
+        break;
+    }
+  }
+  num_rows_ += rows_added;
+  return rows_added;
+}
+
+// Returns the sum of TColumnByteSize() over every column of the row set for the rows
+// starting at 'start_idx' and ending before min(start_idx + num_rows, size()).
+int64_t HS2ColumnarResultSet::ByteSize(int start_idx, int num_rows) {
+  // Add in 64 bits so that a large 'num_rows' cannot overflow 'int' before the sum is
+  // clamped to the number of rows held.
+  const int64_t requested_end = static_cast<int64_t>(start_idx) + num_rows;
+  const int end = static_cast<int>(min<int64_t>(requested_end, size()));
+  int64_t bytes = 0L;
+  for (const ThriftTColumn& c : result_set_->columns) {
+    bytes += TColumnByteSize(c, start_idx, end);
+  }
+  return bytes;
+}
+
+// Marks the row set as carrying columns and appends one empty HS2 column per column
+// described in 'metadata_'. For each, the '__isset' flag of the typed value member
+// matching the column's primitive type is raised. Only single-node (scalar) column
+// types are expected; unknown primitive types trip a DCHECK.
+void HS2ColumnarResultSet::InitColumns() {
+  result_set_->__isset.columns = true;
+  for (const TColumn& col_desc : metadata_.columns) {
+    DCHECK(col_desc.columnType.types.size() == 1)
+        << "Structured columns unsupported in HS2 interface";
+    const auto primitive_type = col_desc.columnType.types[0].scalar_type.type;
+    ThriftTColumn hs2_column;
+    auto& is_set = hs2_column.__isset;
+    switch (primitive_type) {
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        is_set.boolVal = true;
+        break;
+      case TPrimitiveType::TINYINT:
+        is_set.byteVal = true;
+        break;
+      case TPrimitiveType::SMALLINT:
+        is_set.i16Val = true;
+        break;
+      case TPrimitiveType::INT:
+        is_set.i32Val = true;
+        break;
+      case TPrimitiveType::BIGINT:
+        is_set.i64Val = true;
+        break;
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        is_set.doubleVal = true;
+        break;
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+      case TPrimitiveType::STRING:
+        is_set.stringVal = true;
+        break;
+      default:
+        DCHECK(false) << "Unhandled column type: "
+                      << TypeToString(ThriftToType(primitive_type));
+    }
+    result_set_->columns.push_back(hs2_column);
+  }
+}
+
+// Builds a row-oriented result set that writes into 'rowset'. When 'rowset' is null, a
+// TRowSet is allocated, held by 'owned_result_set_', and used as the destination
+// instead.
+HS2RowOrientedResultSet::HS2RowOrientedResultSet(
+    const TResultSetMetadata& metadata, TRowSet* rowset)
+  : metadata_(metadata), result_set_(rowset) {
+  if (rowset == nullptr) {
+    owned_result_set_.reset(new TRowSet());
+    result_set_ = owned_result_set_.get();
+  }
+}
+
+// Appends one row of expression values to the row-oriented result set. A new TRow is
+// pushed onto the row set, sized to one cell per entry of 'col_values', and each cell
+// is filled by ExprValueToHS2TColumnValue() using the declared type of the matching
+// column. 'scales' is not consulted by this implementation. Always returns
+// Status::OK().
+Status HS2RowOrientedResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  const int column_count = col_values.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  result_set_->rows.push_back(TRow());
+  auto& cells = result_set_->rows.back().colVals;
+  cells.resize(column_count);
+  for (int col = 0; col != column_count; ++col) {
+    ExprValueToHS2TColumnValue(
+        col_values[col], metadata_.columns[col].columnType, &cells[col]);
+  }
+  return Status::OK();
+}
+
+// Appends one row, supplied as a TResultRow, to the row-oriented result set. A new
+// TRow is pushed onto the row set, sized to one cell per entry of 'row.colVals', and
+// each cell is filled by TColumnValueToHS2TColumnValue() using the declared type of the
+// matching column. Always returns Status::OK().
+Status HS2RowOrientedResultSet::AddOneRow(const TResultRow& row) {
+  const int column_count = row.colVals.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  result_set_->rows.push_back(TRow());
+  auto& cells = result_set_->rows.back().colVals;
+  cells.resize(column_count);
+  for (int col = 0; col != column_count; ++col) {
+    TColumnValueToHS2TColumnValue(
+        row.colVals[col], metadata_.columns[col].columnType, &cells[col]);
+  }
+  return Status::OK();
+}
+
+// Copies up to 'num_rows' rows of 'other', beginning at row 'start_idx', onto the end
+// of this result set. 'other' must be an HS2RowOrientedResultSet. Returns the number
+// of rows copied, which is 0 when 'start_idx' is negative or past the end of 'other',
+// or when 'num_rows' is not positive.
+int HS2RowOrientedResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
+  // A negative 'num_rows' would wrap to a huge size_t below and copy every remaining
+  // row, so reject negative arguments before converting to unsigned.
+  if (start_idx < 0 || num_rows <= 0) return 0;
+  const size_t first = static_cast<size_t>(start_idx);
+  const size_t available = o->result_set_->rows.size();
+  if (first >= available) return 0;
+  const size_t rows_added = min(static_cast<size_t>(num_rows), available - first);
+  // Rows are pushed one at a time and re-indexed on every iteration, which stays
+  // well-defined even if 'other' happens to be this very object.
+  for (size_t i = first; i < first + rows_added; ++i) {
+    result_set_->rows.push_back(o->result_set_->rows[i]);
+  }
+  return static_cast<int>(rows_added);
+}
+
+// Returns the sum of impala::ByteSize() over at most 'num_rows' rows beginning at row
+// 'start_idx'. The range is clamped to the rows actually held; the result is 0 when
+// 'start_idx' is negative or past the last row, or when 'num_rows' is not positive.
+int64_t HS2RowOrientedResultSet::ByteSize(int start_idx, int num_rows) {
+  const size_t total_rows = result_set_->rows.size();
+  // Without this check 'total_rows - start_idx' underflows for a start index beyond the
+  // end, and the loop below would read past the last row.
+  if (start_idx < 0 || num_rows <= 0) return 0;
+  const size_t first = static_cast<size_t>(start_idx);
+  if (first >= total_rows) return 0;
+  const size_t last = first + min(static_cast<size_t>(num_rows), total_rows - first);
+  int64_t bytes = 0;
+  for (size_t i = first; i < last; ++i) {
+    bytes += impala::ByteSize(result_set_->rows[i]);
+  }
+  return bytes;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
index b444ca3..e0c88d7 100644
--- a/be/src/service/query-result-set.h
+++ b/be/src/service/query-result-set.h
@@ -20,15 +20,17 @@
#include "common/status.h"
#include "gen-cpp/Data_types.h"
+#include "gen-cpp/Results_types.h"
+#include "gen-cpp/TCLIService_types.h"
#include <vector>
namespace impala {
-/// Stores client-ready query result rows returned by
-/// QueryExecState::FetchRows(). Subclasses implement AddRows() / AddOneRow() to
-/// specialise how Impala's row batches are converted to client-API result
-/// representations.
+/// Wraps a client-API specific result representation, and implements the logic required
+/// to translate into that format from Impala's row format.
+///
+/// Subclasses implement AddRows() / AddOneRow() to specialise that logic.
class QueryResultSet {
public:
QueryResultSet() {}
@@ -58,6 +60,17 @@ class QueryResultSet {
/// Returns the size of this result set in number of rows.
virtual size_t size() = 0;
+
+ /// Returns a result set suitable for Beeswax-based clients.
+ static QueryResultSet* CreateAsciiQueryResultSet(
+ const TResultSetMetadata& metadata, std::vector<std::string>* rowset);
+
+ /// Returns a result set suitable for HS2-based clients. If 'rowset' is nullptr, the
+ /// returned object will allocate and manage its own rowset.
+ static QueryResultSet* CreateHS2ResultSet(
+ apache::hive::service::cli::thrift::TProtocolVersion::type version,
+ const TResultSetMetadata& metadata,
+ apache::hive::service::cli::thrift::TRowSet* rowset);
};
}