You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jy...@apache.org on 2016/10/18 15:14:18 UTC

[4/4] incubator-impala git commit: IMPALA-2905: Move QueryResultSet implementations into separate module

IMPALA-2905: Move QueryResultSet implementations into separate module

This mostly mechanical change moves the definition and implementation of
the Beeswax and HS2-specific result sets into their own module. Result
sets are now uniformly created by one of two factory methods, so the
implementation is decoupled from the client.

Change-Id: I6ab883b62d3ec7012240edf8d56889349e7c0e32
Reviewed-on: http://gerrit.cloudera.org:8080/4736
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3f5380dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3f5380dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3f5380dc

Branch: refs/heads/master
Commit: 3f5380dc73f3ab907443a2858d4fe0de6e3685e7
Parents: 080a678
Author: Henry Robinson <he...@cloudera.com>
Authored: Sat Oct 15 16:47:24 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 18 09:30:09 2016 +0000

----------------------------------------------------------------------
 be/src/service/CMakeLists.txt           |   1 +
 be/src/service/impala-beeswax-server.cc |  98 +-----
 be/src/service/impala-hs2-server.cc     | 324 +-----------------
 be/src/service/query-result-set.cc      | 478 +++++++++++++++++++++++++++
 be/src/service/query-result-set.h       |  21 +-
 5 files changed, 503 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index aa12ceb..35130ff 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -33,6 +33,7 @@ add_library(Service
   impala-beeswax-server.cc
   query-exec-state.cc
   query-options.cc
+  query-result-set.cc
   child-query.cc
   impalad-main.cc
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index ee7f958..b50499e 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -47,100 +47,8 @@ using namespace beeswax;
     }                                                           \
   } while (false)
 
-namespace {
-
-/// Ascii output precision for double/float
-constexpr int ASCII_PRECISION = 16;
-}
-
 namespace impala {
 
-// Ascii result set for Beeswax.
-// Beeswax returns rows in ascii, using "\t" as column delimiter.
-class AsciiQueryResultSet : public QueryResultSet {
- public:
-  // Rows are added into rowset.
-  AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
-    : metadata_(metadata), result_set_(rowset), owned_result_set_(NULL) {
-  }
-
-  // Rows are added into a new rowset that is owned by this result set.
-  AsciiQueryResultSet(const TResultSetMetadata& metadata)
-    : metadata_(metadata), result_set_(new vector<string>()),
-      owned_result_set_(result_set_) {
-  }
-
-  virtual ~AsciiQueryResultSet() { }
-
-  // Convert expr values (col_values) to ASCII using "\t" as column delimiter and store
-  // it in this result set.
-  // TODO: Handle complex types.
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
-    int num_col = col_values.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    stringstream out_stream;
-    out_stream.precision(ASCII_PRECISION);
-    for (int i = 0; i < num_col; ++i) {
-      // ODBC-187 - ODBC can only take "\t" as the delimiter
-      out_stream << (i > 0 ? "\t" : "");
-      DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
-      RawValue::PrintValue(col_values[i],
-          ColumnType::FromThrift(metadata_.columns[i].columnType),
-          scales[i], &out_stream);
-    }
-    result_set_->push_back(out_stream.str());
-    return Status::OK();
-  }
-
-  // Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
-  // result set.
-  virtual Status AddOneRow(const TResultRow& row) {
-    int num_col = row.colVals.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    stringstream out_stream;
-    out_stream.precision(ASCII_PRECISION);
-    for (int i = 0; i < num_col; ++i) {
-      // ODBC-187 - ODBC can only take "\t" as the delimiter
-      out_stream << (i > 0 ? "\t" : "");
-      out_stream << row.colVals[i];
-    }
-    result_set_->push_back(out_stream.str());
-    return Status::OK();
-  }
-
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
-    const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
-    if (start_idx >= o->result_set_->size()) return 0;
-    const int rows_added =
-        min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
-    result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
-        o->result_set_->begin() + start_idx + rows_added);
-    return rows_added;
-  }
-
-  virtual int64_t ByteSize(int start_idx, int num_rows) {
-    int64_t bytes = 0;
-    const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
-    for (int i = start_idx; i < start_idx + end; ++i) {
-      bytes += sizeof(result_set_[i]) + result_set_[i].capacity();
-    }
-    return bytes;
-  }
-
-  virtual size_t size() { return result_set_->size(); }
-
- private:
-  // Metadata of the result set
-  const TResultSetMetadata& metadata_;
-
-  // Points to the result set to be filled. The result set this points to may be owned by
-  // this object, in which case owned_result_set_ is set.
-  vector<string>* result_set_;
-
-  // Set to result_set_ if result_set_ is owned.
-  scoped_ptr<vector<string>> owned_result_set_;
-};
-
 void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   VLOG_QUERY << "query(): query=" << query.query;
   ScopedSessionState session_handle(this);
@@ -588,9 +496,9 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
   Status fetch_rows_status;
   query_results->data.clear();
   if (!exec_state->eos()) {
-    AsciiQueryResultSet result_set(*(exec_state->result_metadata()),
-        &(query_results->data));
-    fetch_rows_status = exec_state->FetchRows(fetch_size, &result_set);
+    scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
+        *exec_state->result_metadata(), &query_results->data));
+    fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get());
   }
   query_results->__set_has_more(!exec_state->eos());
   query_results->__isset.data = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index de0e2f3..488a1ee 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -85,315 +85,10 @@ namespace impala {
 
 const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
 
-// Utility functions for computing the size of HS2 Thrift structs in bytes.
-static inline
-int64_t ByteSize(const thrift::TColumnValue& val) {
-  return sizeof(val) + val.stringVal.value.capacity();
-}
-
-static int64_t ByteSize(const thrift::TRow& row) {
-  int64_t bytes = sizeof(row);
-  for (const thrift::TColumnValue& c: row.colVals) {
-    bytes += ByteSize(c);
-  }
-  return bytes;
-}
-
-// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those
-// values in the range [start_idx, end_idx).
-static uint32_t TColumnByteSize(const thrift::TColumn& col, uint32_t start_idx,
-    uint32_t end_idx) {
-  DCHECK_LE(start_idx, end_idx);
-  uint32_t num_rows = end_idx - start_idx;
-  if (num_rows == 0) return 0L;
-
-  if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size();
-  if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size();
-  if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size();
-  if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size();
-  if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size();
-  if (col.__isset.doubleVal) {
-    return (num_rows * sizeof(double)) + col.doubleVal.nulls.size();
-  }
-  if (col.__isset.stringVal) {
-    uint32_t bytes = 0;
-    for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
-    return bytes + col.stringVal.nulls.size();
-  }
-
-  return 0;
-}
-
 // Helper function to translate between Beeswax and HiveServer2 type
 static TOperationState::type QueryStateToTOperationState(
     const beeswax::QueryState::type& query_state);
 
-// Result set container for Hive protocol versions >= V6, where results are returned in
-// column-orientation.
-class HS2ColumnarResultSet : public QueryResultSet {
- public:
-  HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
-      : metadata_(metadata), result_set_(rowset), num_rows_(0) {
-    if (rowset == NULL) {
-      owned_result_set_.reset(new TRowSet());
-      result_set_ = owned_result_set_.get();
-    }
-    InitColumns();
-  }
-
-  virtual ~HS2ColumnarResultSet() { }
-
-  // Add a row of expr values
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
-    int num_col = col_values.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    for (int i = 0; i < num_col; ++i) {
-      ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
-          &(result_set_->columns[i]));
-    }
-    ++num_rows_;
-    return Status::OK();
-  }
-
-  // Add a row from a TResultRow
-  virtual Status AddOneRow(const TResultRow& row) {
-    int num_col = row.colVals.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    for (int i = 0; i < num_col; ++i) {
-      TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
-          &(result_set_->columns[i]));
-    }
-    ++num_rows_;
-    return Status::OK();
-  }
-
-  // Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
-  // from 'other' into this result set
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
-    const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
-    DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
-    if (start_idx >= o->num_rows_) return 0;
-    const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
-    for (int j = 0; j < metadata_.columns.size(); ++j) {
-      thrift::TColumn* from = &o->result_set_->columns[j];
-      thrift::TColumn* to = &result_set_->columns[j];
-      switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
-        case TPrimitiveType::NULL_TYPE:
-        case TPrimitiveType::BOOLEAN:
-          StitchNulls(num_rows_, rows_added, start_idx, from->boolVal.nulls,
-              &(to->boolVal.nulls));
-          to->boolVal.values.insert(
-              to->boolVal.values.end(),
-              from->boolVal.values.begin() + start_idx,
-              from->boolVal.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::TINYINT:
-          StitchNulls(num_rows_, rows_added, start_idx, from->byteVal.nulls,
-              &(to->byteVal.nulls));
-          to->byteVal.values.insert(
-              to->byteVal.values.end(),
-              from->byteVal.values.begin() + start_idx,
-              from->byteVal.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::SMALLINT:
-          StitchNulls(num_rows_, rows_added, start_idx, from->i16Val.nulls,
-              &(to->i16Val.nulls));
-          to->i16Val.values.insert(
-              to->i16Val.values.end(),
-              from->i16Val.values.begin() + start_idx,
-              from->i16Val.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::INT:
-          StitchNulls(num_rows_, rows_added, start_idx, from->i32Val.nulls,
-              &(to->i32Val.nulls));
-          to->i32Val.values.insert(
-              to->i32Val.values.end(),
-              from->i32Val.values.begin() + start_idx,
-              from->i32Val.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::BIGINT:
-          StitchNulls(num_rows_, rows_added, start_idx, from->i64Val.nulls,
-              &(to->i64Val.nulls));
-          to->i64Val.values.insert(
-              to->i64Val.values.end(),
-              from->i64Val.values.begin() + start_idx,
-              from->i64Val.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::FLOAT:
-        case TPrimitiveType::DOUBLE:
-          StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
-              &(to->doubleVal.nulls));
-          to->doubleVal.values.insert(
-              to->doubleVal.values.end(),
-              from->doubleVal.values.begin() + start_idx,
-              from->doubleVal.values.begin() + start_idx + rows_added);
-          break;
-        case TPrimitiveType::TIMESTAMP:
-        case TPrimitiveType::DECIMAL:
-        case TPrimitiveType::STRING:
-        case TPrimitiveType::VARCHAR:
-        case TPrimitiveType::CHAR:
-          StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
-              &(to->stringVal.nulls));
-          to->stringVal.values.insert(to->stringVal.values.end(),
-              from->stringVal.values.begin() + start_idx,
-              from->stringVal.values.begin() + start_idx + rows_added);
-          break;
-        default:
-          DCHECK(false) << "Unsupported type: " << TypeToString(ThriftToType(
-              metadata_.columns[j].columnType.types[0].scalar_type.type));
-          break;
-      }
-    }
-    num_rows_ += rows_added;
-    return rows_added;
-  }
-
-  virtual int64_t ByteSize(int start_idx, int num_rows) {
-    const int end = min(start_idx + num_rows, (int)size());
-    int64_t bytes = 0L;
-    for (const thrift::TColumn& c: result_set_->columns) {
-      bytes += TColumnByteSize(c, start_idx, end);
-    }
-    return bytes;
-  }
-
-  virtual size_t size() { return num_rows_; }
-
- private:
-  // Metadata of the result set
-  const TResultSetMetadata& metadata_;
-
-  // Points to the TRowSet to be filled. The row set this points to may be owned by
-  // this object, in which case owned_result_set_ is set.
-  TRowSet* result_set_;
-
-  // Set to result_set_ if result_set_ is owned.
-  scoped_ptr<TRowSet> owned_result_set_;
-
-  int64_t num_rows_;
-
-  void InitColumns() {
-    result_set_->__isset.columns = true;
-    for (const TColumn& col: metadata_.columns) {
-      DCHECK(col.columnType.types.size() == 1) <<
-          "Structured columns unsupported in HS2 interface";
-      thrift::TColumn column;
-      switch (col.columnType.types[0].scalar_type.type) {
-        case TPrimitiveType::NULL_TYPE:
-        case TPrimitiveType::BOOLEAN:
-          column.__isset.boolVal = true;
-          break;
-        case TPrimitiveType::TINYINT:
-          column.__isset.byteVal = true;
-          break;
-        case TPrimitiveType::SMALLINT:
-          column.__isset.i16Val = true;
-          break;
-        case TPrimitiveType::INT:
-          column.__isset.i32Val = true;
-          break;
-        case TPrimitiveType::BIGINT:
-          column.__isset.i64Val = true;
-          break;
-        case TPrimitiveType::FLOAT:
-        case TPrimitiveType::DOUBLE:
-          column.__isset.doubleVal = true;
-          break;
-        case TPrimitiveType::TIMESTAMP:
-        case TPrimitiveType::DECIMAL:
-        case TPrimitiveType::VARCHAR:
-        case TPrimitiveType::CHAR:
-        case TPrimitiveType::STRING:
-          column.__isset.stringVal = true;
-          break;
-        default:
-          DCHECK(false) << "Unhandled column type: "
-                        << TypeToString(
-                            ThriftToType(col.columnType.types[0].scalar_type.type));
-      }
-      result_set_->columns.push_back(column);
-    }
-  }
-};
-
-// TRow result set for HiveServer2
-class HS2RowOrientedResultSet : public QueryResultSet {
- public:
-  // Rows are added into rowset.
-  HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
-      : metadata_(metadata), result_set_(rowset) {
-    if (rowset == NULL) {
-      owned_result_set_.reset(new TRowSet());
-      result_set_ = owned_result_set_.get();
-    }
-  }
-
-  virtual ~HS2RowOrientedResultSet() { }
-
-  // Convert expr value to HS2 TRow and store it in TRowSet.
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) {
-    int num_col = col_values.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    result_set_->rows.push_back(TRow());
-    TRow& trow = result_set_->rows.back();
-    trow.colVals.resize(num_col);
-    for (int i = 0; i < num_col; ++i) {
-      ExprValueToHS2TColumnValue(col_values[i],
-          metadata_.columns[i].columnType, &(trow.colVals[i]));
-    }
-    return Status::OK();
-  }
-
-  // Convert TResultRow to HS2 TRow and store it in TRowSet.
-  virtual Status AddOneRow(const TResultRow& row) {
-    int num_col = row.colVals.size();
-    DCHECK_EQ(num_col, metadata_.columns.size());
-    result_set_->rows.push_back(TRow());
-    TRow& trow = result_set_->rows.back();
-    trow.colVals.resize(num_col);
-    for (int i = 0; i < num_col; ++i) {
-      TColumnValueToHS2TColumnValue(row.colVals[i], metadata_.columns[i].columnType,
-          &(trow.colVals[i]));
-    }
-    return Status::OK();
-  }
-
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) {
-    const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
-    if (start_idx >= o->result_set_->rows.size()) return 0;
-    const int rows_added =
-        min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx);
-    for (int i = start_idx; i < start_idx + rows_added; ++i) {
-      result_set_->rows.push_back(o->result_set_->rows[i]);
-    }
-    return rows_added;
-  }
-
-  virtual int64_t ByteSize(int start_idx, int num_rows) {
-    int64_t bytes = 0;
-    const int end =
-        min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
-    for (int i = start_idx; i < start_idx + end; ++i) {
-      bytes += impala::ByteSize(result_set_->rows[i]);
-    }
-    return bytes;
-  }
-
-  virtual size_t size() { return result_set_->rows.size(); }
-
- private:
-  // Metadata of the result set
-  const TResultSetMetadata& metadata_;
-
-  // Points to the TRowSet to be filled. The row set this points to may be owned by
-  // this object, in which case owned_result_set_ is set.
-  TRowSet* result_set_;
-
-  // Set to result_set_ if result_set_ is owned.
-  scoped_ptr<TRowSet> owned_result_set_;
-};
-
 void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
   TUniqueId session_id;
@@ -473,18 +168,6 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
-namespace {
-
-QueryResultSet* CreateHS2ResultSet(
-    TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
-  if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
-    return new HS2RowOrientedResultSet(metadata, rowset);
-  } else {
-    return new HS2ColumnarResultSet(metadata, rowset);
-  }
-}
-}
-
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
     bool fetch_first, TFetchResultsResp* fetch_results) {
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -522,8 +205,8 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size
   bool is_child_query = exec_state->parent_query_id() != TUniqueId();
   TProtocolVersion::type version = is_child_query ?
       TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
-  scoped_ptr<QueryResultSet> result_set(CreateHS2ResultSet(version,
-      *(exec_state->result_metadata()), &(fetch_results->results)));
+  scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet(
+      version, *(exec_state->result_metadata()), &(fetch_results->results)));
   RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get()));
   fetch_results->__isset.results = true;
   fetch_results->__set_hasMoreRows(!exec_state->eos());
@@ -763,7 +446,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
   // Optionally enable result caching on the QueryExecState.
   if (cache_num_rows > 0) {
     status = exec_state->SetResultCache(
-        CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr),
+        QueryResultSet::CreateHS2ResultSet(
+            session->hs2_version, *exec_state->result_metadata(), nullptr),
         cache_num_rows);
     if (!status.ok()) {
       UnregisterQuery(exec_state->query_id(), false, &status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
new file mode 100644
index 0000000..3b17af7
--- /dev/null
+++ b/be/src/service/query-result-set.cc
@@ -0,0 +1,478 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/query-result-set.h"
+
+#include <sstream>
+#include <boost/scoped_ptr.hpp>
+
+#include "rpc/thrift-util.h"
+#include "runtime/raw-value.h"
+#include "runtime/types.h"
+#include "service/hs2-util.h"
+
+#include "common/names.h"
+
+using ThriftTColumn = apache::hive::service::cli::thrift::TColumn;
+using ThriftTColumnValue = apache::hive::service::cli::thrift::TColumnValue;
+using apache::hive::service::cli::thrift::TProtocolVersion;
+using apache::hive::service::cli::thrift::TRow;
+using apache::hive::service::cli::thrift::TRowSet;
+
+namespace {
+
+/// Ascii output precision for double/float
+constexpr int ASCII_PRECISION = 16;
+}
+
+namespace impala {
+
+/// Ascii result set for Beeswax. Rows are returned in ascii text encoding, using "\t" as
+/// column delimiter.
+class AsciiQueryResultSet : public QueryResultSet {
+ public:
+  /// Rows are appended to 'rowset', which stays owned by the caller.
+  AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
+    : metadata_(metadata), result_set_(rowset) {}
+
+  virtual ~AsciiQueryResultSet() {}
+
+  /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column
+  /// delimiter and store it in this result set.
+  /// TODO: Handle complex types.
+  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+  /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
+  /// result set.
+  virtual Status AddOneRow(const TResultRow& row);
+
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+  virtual int64_t ByteSize(int start_idx, int num_rows);
+  virtual size_t size() { return result_set_->size(); }
+
+ private:
+  /// Metadata of the result set. Held by reference, not copied.
+  const TResultSetMetadata& metadata_;
+
+  /// Points to the result set to be filled. Not owned by this object.
+  vector<string>* result_set_;
+};
+
+/// Result set container for Hive protocol versions >= V6, where results are returned in
+/// column-orientation.
+class HS2ColumnarResultSet : public QueryResultSet {
+ public:
+  HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
+
+  virtual ~HS2ColumnarResultSet() {}
+
+  /// Add a row of expr values
+  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+  /// Add a row from a TResultRow
+  virtual Status AddOneRow(const TResultRow& row);
+
+  /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
+  /// from 'other' into this result set
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+
+  virtual int64_t ByteSize(int start_idx, int num_rows);
+  virtual size_t size() { return num_rows_; }
+
+ private:
+  /// Metadata of the result set
+  const TResultSetMetadata& metadata_;
+
+  /// Points to the TRowSet to be filled. The row set this points to may be owned by
+  /// this object, in which case owned_result_set_ is set.
+  TRowSet* result_set_;
+
+  /// Set to result_set_ if result_set_ is owned.
+  boost::scoped_ptr<TRowSet> owned_result_set_;
+
+  /// Number of rows added to this result set so far.
+  int64_t num_rows_;
+
+  void InitColumns();
+};
+
+/// Row oriented result set for HiveServer2, used to serve HS2 requests with protocol
+/// version <= V5.
+class HS2RowOrientedResultSet : public QueryResultSet {
+ public:
+  /// Rows are added into rowset.
+  HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
+
+  virtual ~HS2RowOrientedResultSet() {}
+
+  /// Convert expr values to HS2 TRow and store it in a TRowSet.
+  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+
+  /// Convert TResultRow to HS2 TRow and store it in a TRowSet
+  virtual Status AddOneRow(const TResultRow& row);
+
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+  virtual int64_t ByteSize(int start_idx, int num_rows);
+  virtual size_t size() { return result_set_->rows.size(); }
+
+ private:
+  /// Metadata of the result set
+  const TResultSetMetadata& metadata_;
+
+  /// Points to the TRowSet to be filled. The row set this points to may be owned by
+  /// this object, in which case owned_result_set_ is set; otherwise it is the TRowSet
+  /// that was passed to the constructor.
+  TRowSet* result_set_;
+
+  /// Set to result_set_ if result_set_ is owned.
+  scoped_ptr<TRowSet> owned_result_set_;
+};
+
+QueryResultSet* QueryResultSet::CreateAsciiQueryResultSet(
+    const TResultSetMetadata& metadata, vector<string>* rowset) {
+  return new AsciiQueryResultSet(metadata, rowset);  // Ownership passes to the caller.
+}
+
+QueryResultSet* QueryResultSet::CreateHS2ResultSet(
+    TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
+  // Protocol versions before V6 get row-oriented results; V6 and later get columnar.
+  if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
+    return new HS2RowOrientedResultSet(metadata, rowset);
+  }
+  return new HS2ColumnarResultSet(metadata, rowset);
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////
+
+Status AsciiQueryResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  const int column_count = col_values.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  stringstream row_text;
+  row_text.precision(ASCII_PRECISION);
+  for (int col = 0; col < column_count; ++col) {
+    // ODBC-187 - ODBC can only take "\t" as the delimiter; none precedes column 0.
+    if (col != 0) row_text << "\t";
+    DCHECK_EQ(1, metadata_.columns[col].columnType.types.size());
+    const ColumnType type = ColumnType::FromThrift(metadata_.columns[col].columnType);
+    RawValue::PrintValue(col_values[col], type, scales[col], &row_text);
+  }
+  result_set_->push_back(row_text.str());
+  return Status::OK();
+}
+
+Status AsciiQueryResultSet::AddOneRow(const TResultRow& row) {
+  const int column_count = row.colVals.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  stringstream row_text;
+  row_text.precision(ASCII_PRECISION);
+  for (int col = 0; col < column_count; ++col) {
+    // ODBC-187 - ODBC can only take "\t" as the delimiter; none precedes column 0.
+    if (col != 0) row_text << "\t";
+    row_text << row.colVals[col];
+  }
+  result_set_->push_back(row_text.str());
+  return Status::OK();
+}
+
+int AsciiQueryResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  // 'other' is treated as an AsciiQueryResultSet; copy at most 'num_rows' of its rows.
+  const auto* src = static_cast<const AsciiQueryResultSet*>(other)->result_set_;
+  if (start_idx >= src->size()) return 0;
+  const int rows_added = min(static_cast<size_t>(num_rows), src->size() - start_idx);
+  const auto first = src->begin() + start_idx;
+  result_set_->insert(result_set_->end(), first, first + rows_added);
+  return rows_added;
+}
+
+int64_t AsciiQueryResultSet::ByteSize(int start_idx, int num_rows) {
+  // Dereference 'result_set_' first: result_set_[i] indexes the pointer, not the rows.
+  const vector<string>& rows = *result_set_;
+  const int n = min(static_cast<size_t>(num_rows), rows.size() - start_idx);
+  int64_t bytes = 0;  // Per row: the string object plus its allocated capacity.
+  for (int i = 0; i < n; ++i) bytes += sizeof(string) + rows[start_idx + i].capacity();
+  return bytes;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+// Utility functions for computing the size of HS2 Thrift structs in bytes.
+inline int64_t ByteSize(const ThriftTColumnValue& val) {
+  return sizeof(val) + val.stringVal.value.capacity();  // struct + string buffer
+}
+
+int64_t ByteSize(const TRow& row) {
+  // The row's footprint is the TRow struct itself plus the footprint of each of its
+  // column values.
+  int64_t total = sizeof(row);
+  for (const ThriftTColumnValue& value : row.colVals) total += ByteSize(value);
+  return total;
+}
+
+// Returns the number of bytes taken by the values of Hive TColumn 'col' whose row index
+// is in [start_idx, end_idx), plus the size of the column's entire 'nulls' field.
+uint32_t TColumnByteSize(const ThriftTColumn& col, uint32_t start_idx, uint32_t end_idx) {
+  DCHECK_LE(start_idx, end_idx);
+  const uint32_t count = end_idx - start_idx;
+  if (count == 0) return 0;
+
+  if (col.__isset.boolVal) return (count * sizeof(bool)) + col.boolVal.nulls.size();
+  if (col.__isset.byteVal) return count + col.byteVal.nulls.size();
+  if (col.__isset.i16Val) return (count * sizeof(int16_t)) + col.i16Val.nulls.size();
+  if (col.__isset.i32Val) return (count * sizeof(int32_t)) + col.i32Val.nulls.size();
+  if (col.__isset.i64Val) return (count * sizeof(int64_t)) + col.i64Val.nulls.size();
+  if (col.__isset.doubleVal) {
+    return (count * sizeof(double)) + col.doubleVal.nulls.size();
+  }
+  // None of the handled __isset flags is set: report zero bytes.
+  if (!col.__isset.stringVal) return 0;
+
+  // String values vary in length, so add up the size of each one in the range.
+  uint32_t bytes = col.stringVal.nulls.size();
+  for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
+  return bytes;
+}
+}
+
+// Builds a columnar result set that fills 'rowset'. When no row set is supplied, one is
+// allocated here and owned by this object.
+HS2ColumnarResultSet::HS2ColumnarResultSet(
+    const TResultSetMetadata& metadata, TRowSet* rowset)
+  : metadata_(metadata), result_set_(rowset), num_rows_(0) {
+  if (result_set_ == nullptr) {
+    owned_result_set_.reset(new TRowSet());
+    result_set_ = owned_result_set_.get();
+  }
+  InitColumns();
+}
+
+// Appends one row of raw expression values, writing each value into the HS2 column at
+// the same position. 'scales' is not consulted by this implementation.
+Status HS2ColumnarResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  const int column_count = col_values.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  for (int col = 0; col != column_count; ++col) {
+    ThriftTColumn* dest = &result_set_->columns[col];
+    ExprValueToHS2TColumn(
+        col_values[col], metadata_.columns[col].columnType, num_rows_, dest);
+  }
+  ++num_rows_;
+  return Status::OK();
+}
+
+// Appends one row taken from a TResultRow, converting each of its column values into
+// the HS2 column at the same position.
+Status HS2ColumnarResultSet::AddOneRow(const TResultRow& row) {
+  const int column_count = row.colVals.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  for (int col = 0; col != column_count; ++col) {
+    ThriftTColumn* dest = &result_set_->columns[col];
+    TColumnValueToHS2TColumn(
+        row.colVals[col], metadata_.columns[col].columnType, num_rows_, dest);
+  }
+  ++num_rows_;
+  return Status::OK();
+}
+
+namespace {
+
+// Appends rows [start_idx, start_idx + rows_added) of the typed column 'from' to the end
+// of 'to': first the null indicators (via StitchNulls()), then the values themselves.
+// 'num_rows_before' is the number of rows the destination result set held beforehand.
+template <typename ColumnT>
+void AppendColumnRows(int64_t num_rows_before, int rows_added, int start_idx,
+    const ColumnT& from, ColumnT* to) {
+  StitchNulls(num_rows_before, rows_added, start_idx, from.nulls, &(to->nulls));
+  to->values.insert(to->values.end(), from.values.begin() + start_idx,
+      from.values.begin() + start_idx + rows_added);
+}
+
+}
+
+// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
+// from 'other' into this result set. 'other' must be an HS2ColumnarResultSet (the cast
+// is unchecked) with the same number of columns as this one.
+//
+// Returns the number of rows actually copied. That is 0 if 'start_idx' is negative or
+// past the last row of 'other', or if 'num_rows' is not positive.
+int HS2ColumnarResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
+  DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
+  if (start_idx < 0 || start_idx >= o->num_rows_) return 0;
+  const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
+  // A non-positive count would hand vector::insert() an invalid iterator range.
+  if (rows_added <= 0) return 0;
+  for (size_t j = 0; j < metadata_.columns.size(); ++j) {
+    const ThriftTColumn& from = o->result_set_->columns[j];
+    ThriftTColumn* to = &result_set_->columns[j];
+    // Pick the typed value vector matching the column's primitive type; this mapping
+    // mirrors the one used by InitColumns().
+    switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        AppendColumnRows(num_rows_, rows_added, start_idx, from.boolVal, &to->boolVal);
+        break;
+      case TPrimitiveType::TINYINT:
+        AppendColumnRows(num_rows_, rows_added, start_idx, from.byteVal, &to->byteVal);
+        break;
+      case TPrimitiveType::SMALLINT:
+        AppendColumnRows(num_rows_, rows_added, start_idx, from.i16Val, &to->i16Val);
+        break;
+      case TPrimitiveType::INT:
+        AppendColumnRows(num_rows_, rows_added, start_idx, from.i32Val, &to->i32Val);
+        break;
+      case TPrimitiveType::BIGINT:
+        AppendColumnRows(num_rows_, rows_added, start_idx, from.i64Val, &to->i64Val);
+        break;
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        AppendColumnRows(
+            num_rows_, rows_added, start_idx, from.doubleVal, &to->doubleVal);
+        break;
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::STRING:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+        AppendColumnRows(
+            num_rows_, rows_added, start_idx, from.stringVal, &to->stringVal);
+        break;
+      default:
+        DCHECK(false) << "Unsupported type: "
+                      << TypeToString(ThriftToType(
+                             metadata_.columns[j].columnType.types[0].scalar_type.type));
+        break;
+    }
+  }
+  num_rows_ += rows_added;
+  return rows_added;
+}
+
+// Returns the summed TColumnByteSize() of every column over the rows
+// [start_idx, start_idx + num_rows), with the range clamped to the rows held by this
+// result set. Returns 0 if 'start_idx' is negative or at/after the last row, or if
+// 'num_rows' is not positive.
+int64_t HS2ColumnarResultSet::ByteSize(int start_idx, int num_rows) {
+  const int64_t total_rows = size();
+  // Without this check a start index past the end would give end < start_idx, which
+  // TColumnByteSize() only rejects via a DCHECK.
+  if (start_idx < 0 || num_rows <= 0 || start_idx >= total_rows) return 0;
+  // Sum in 64 bits so that start_idx + num_rows cannot overflow an int.
+  const int64_t end = min<int64_t>(static_cast<int64_t>(start_idx) + num_rows, total_rows);
+  int64_t bytes = 0L;
+  for (const ThriftTColumn& c : result_set_->columns) {
+    bytes += TColumnByteSize(
+        c, static_cast<uint32_t>(start_idx), static_cast<uint32_t>(end));
+  }
+  return bytes;
+}
+
+// Appends one ThriftTColumn to the row set for each column in the result metadata and
+// marks the typed value vector that matches the column's primitive type as set. The
+// row set's 'columns' field itself is flagged as set as well.
+void HS2ColumnarResultSet::InitColumns() {
+  result_set_->__isset.columns = true;
+  for (const TColumn& col_desc : metadata_.columns) {
+    DCHECK(col_desc.columnType.types.size() == 1)
+        << "Structured columns unsupported in HS2 interface";
+    const auto primitive_type = col_desc.columnType.types[0].scalar_type.type;
+    ThriftTColumn hs2_column;
+    switch (primitive_type) {
+      // NULL_TYPE columns share the boolean representation.
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        hs2_column.__isset.boolVal = true;
+        break;
+      case TPrimitiveType::TINYINT:
+        hs2_column.__isset.byteVal = true;
+        break;
+      case TPrimitiveType::SMALLINT:
+        hs2_column.__isset.i16Val = true;
+        break;
+      case TPrimitiveType::INT:
+        hs2_column.__isset.i32Val = true;
+        break;
+      case TPrimitiveType::BIGINT:
+        hs2_column.__isset.i64Val = true;
+        break;
+      // Both floating point widths are stored as doubles.
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        hs2_column.__isset.doubleVal = true;
+        break;
+      // Everything else that is supported is stored in the string vector.
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+      case TPrimitiveType::STRING:
+        hs2_column.__isset.stringVal = true;
+        break;
+      default:
+        DCHECK(false) << "Unhandled column type: "
+                      << TypeToString(ThriftToType(primitive_type));
+        break;
+    }
+    result_set_->columns.push_back(hs2_column);
+  }
+}
+
+// Row-oriented HS2 result set. Writes into 'rowset' when one is supplied; otherwise
+// allocates a TRowSet that this object owns.
+HS2RowOrientedResultSet::HS2RowOrientedResultSet(
+    const TResultSetMetadata& metadata, TRowSet* rowset)
+  : metadata_(metadata), result_set_(rowset) {
+  if (result_set_ == nullptr) {
+    owned_result_set_.reset(new TRowSet());
+    result_set_ = owned_result_set_.get();
+  }
+}
+
+// Appends a new TRow built from raw expression values, converting one value per
+// metadata column. 'scales' is not consulted by this implementation.
+Status HS2RowOrientedResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  const int column_count = col_values.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  result_set_->rows.emplace_back();
+  auto& out_vals = result_set_->rows.back().colVals;
+  out_vals.resize(column_count);
+  for (int col = 0; col != column_count; ++col) {
+    ExprValueToHS2TColumnValue(
+        col_values[col], metadata_.columns[col].columnType, &out_vals[col]);
+  }
+  return Status::OK();
+}
+
+// Appends a new TRow built from a TResultRow, converting one column value per metadata
+// column.
+Status HS2RowOrientedResultSet::AddOneRow(const TResultRow& row) {
+  const int column_count = row.colVals.size();
+  DCHECK_EQ(column_count, metadata_.columns.size());
+  result_set_->rows.emplace_back();
+  auto& out_vals = result_set_->rows.back().colVals;
+  out_vals.resize(column_count);
+  for (int col = 0; col != column_count; ++col) {
+    TColumnValueToHS2TColumnValue(
+        row.colVals[col], metadata_.columns[col].columnType, &out_vals[col]);
+  }
+  return Status::OK();
+}
+
+// Copies up to 'num_rows' rows of 'other', beginning at 'start_idx', onto the end of
+// this result set and returns how many rows were copied. 'other' must be an
+// HS2RowOrientedResultSet (the cast is unchecked). Returns 0 when 'start_idx' is at or
+// past the last row of 'other'.
+int HS2RowOrientedResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const auto* src = static_cast<const HS2RowOrientedResultSet*>(other);
+  const size_t available = src->result_set_->rows.size();
+  if (start_idx >= available) return 0;
+  const int rows_added = min(static_cast<size_t>(num_rows), available - start_idx);
+  const int stop_idx = start_idx + rows_added;
+  int idx = start_idx;
+  while (idx < stop_idx) {
+    // Copy by index: the source vector is looked up afresh for every row.
+    result_set_->rows.push_back(src->result_set_->rows[idx]);
+    ++idx;
+  }
+  return rows_added;
+}
+
+// Returns the summed ByteSize() of up to 'num_rows' rows beginning at 'start_idx',
+// clamped to the rows held by this result set. Returns 0 if 'start_idx' is negative or
+// at/after the last row. As in AddRows(), a negative 'num_rows' converts to a very
+// large unsigned count and therefore selects every remaining row.
+int64_t HS2RowOrientedResultSet::ByteSize(int start_idx, int num_rows) {
+  const size_t total_rows = result_set_->rows.size();
+  // Reject out-of-range starts up front: 'total_rows - start_idx' is unsigned and would
+  // otherwise wrap around, letting the loop below read past the end of 'rows'.
+  if (start_idx < 0 || static_cast<size_t>(start_idx) >= total_rows) return 0;
+  const size_t first = start_idx;
+  const size_t count = min(static_cast<size_t>(num_rows), total_rows - first);
+  int64_t bytes = 0;
+  for (size_t i = first; i < first + count; ++i) {
+    bytes += impala::ByteSize(result_set_->rows[i]);
+  }
+  return bytes;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
index b444ca3..e0c88d7 100644
--- a/be/src/service/query-result-set.h
+++ b/be/src/service/query-result-set.h
@@ -20,15 +20,17 @@
 
 #include "common/status.h"
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/Results_types.h"
+#include "gen-cpp/TCLIService_types.h"
 
 #include <vector>
 
 namespace impala {
 
-/// Stores client-ready query result rows returned by
-/// QueryExecState::FetchRows(). Subclasses implement AddRows() / AddOneRow() to
-/// specialise how Impala's row batches are converted to client-API result
-/// representations.
+/// Wraps a client-API specific result representation, and implements the logic required
+/// to translate into that format from Impala's row format.
+///
+/// Subclasses implement AddRows() / AddOneRow() to specialise that logic.
 class QueryResultSet {
  public:
   QueryResultSet() {}
@@ -58,6 +60,17 @@ class QueryResultSet {
 
   /// Returns the size of this result set in number of rows.
   virtual size_t size() = 0;
+
+  /// Returns a result set suitable for Beeswax-based clients.
+  static QueryResultSet* CreateAsciiQueryResultSet(
+      const TResultSetMetadata& metadata, std::vector<std::string>* rowset);
+
+  /// Returns a result set suitable for HS2-based clients. If 'rowset' is nullptr, the
+  /// returned object will allocate and manage its own rowset.
+  static QueryResultSet* CreateHS2ResultSet(
+      apache::hive::service::cli::thrift::TProtocolVersion::type version,
+      const TResultSetMetadata& metadata,
+      apache::hive::service::cli::thrift::TRowSet* rowset);
 };
 }