You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2018/11/19 10:45:30 UTC

[23/33] impala git commit: IMPALA-7477: Batch-oriented query set construction

IMPALA-7477: Batch-oriented query set construction

Rework the row-by-row construction of query result sets in PlanRootSink
so that rows are added a batch at a time and, for the HS2 columnar
format, each output column is materialised in one pass. Make some minor
optimisations like preallocating output vectors and initialising
strings more efficiently.

My intent is both to make this faster and to make the QueryResultSet
interface better before IMPALA-4268 does a bunch of surgery on this
part of the code.

Testing:
Ran core tests.

Perf:
Downloaded tpch_parquet.orders via JDBC driver.
Before: 3.01s, After: 2.57s.

Downloaded l_orderkey from tpch_parquet.lineitem.
Before: 1.21s, After: 1.08s.

Change-Id: I764fa302842438902cd5db2551ec6e3cb77b6874
Reviewed-on: http://gerrit.cloudera.org:8080/11879
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0f63b2c9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0f63b2c9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0f63b2c9

Branch: refs/heads/branch-3.1.0
Commit: 0f63b2c9f9d62b0d22191f454b672a6047206252
Parents: d8b1e43
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sun Aug 19 00:48:47 2018 -0700
Committer: Zoltan Borok-Nagy <bo...@cloudera.com>
Committed: Tue Nov 13 12:51:39 2018 +0100

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc      |  26 +--
 be/src/exec/plan-root-sink.h       |   4 -
 be/src/service/hs2-util.cc         | 318 ++++++++++++++++++++++++--------
 be/src/service/hs2-util.h          |  15 +-
 be/src/service/query-result-set.cc | 115 +++++++-----
 be/src/service/query-result-set.h  |  14 +-
 6 files changed, 331 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index a64dbb9..1f5b2e5 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -79,22 +79,11 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
 
     // Otherwise the consumer is ready. Fill out the rows.
     DCHECK(results_ != nullptr);
-    // List of expr values to hold evaluated rows from the query
-    vector<void*> result_row;
-    result_row.resize(output_exprs_.size());
-
-    // List of scales for floating point values in result_row
-    vector<int> scales;
-    scales.resize(result_row.size());
-
     int num_to_fetch = batch->num_rows() - current_batch_row;
     if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, num_rows_requested_);
-    for (int i = 0; i < num_to_fetch; ++i) {
-      TupleRow* row = batch->GetRow(current_batch_row);
-      GetRowValue(row, &result_row, &scales);
-      RETURN_IF_ERROR(results_->AddOneRow(result_row, scales));
-      ++current_batch_row;
-    }
+    RETURN_IF_ERROR(
+        results_->AddRows(output_expr_evals_, batch, current_batch_row, num_to_fetch));
+    current_batch_row += num_to_fetch;
     // Prevent expr result allocations from accumulating.
     expr_results_pool_->Clear();
     // Signal the consumer.
@@ -146,13 +135,4 @@ Status PlanRootSink::GetNext(
   *eos = sender_state_ == SenderState::EOS;
   return state->GetQueryStatus();
 }
-
-void PlanRootSink::GetRowValue(
-    TupleRow* row, vector<void*>* result, vector<int>* scales) {
-  DCHECK_GE(result->size(), output_expr_evals_.size());
-  for (int i = 0; i < output_expr_evals_.size(); ++i) {
-    (*result)[i] = output_expr_evals_[i]->GetValue(row);
-    (*scales)[i] = output_expr_evals_[i]->output_scale();
-  }
-}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 1d64b21..300c993 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -118,10 +118,6 @@ class PlanRootSink : public DataSink {
 
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
-
-  /// Writes a single row into 'result' and 'scales' by evaluating
-  /// output_expr_evals_ over 'row'.
-  void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
 };
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.cc
----------------------------------------------------------------------
diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc
index b856556..66a76bd 100644
--- a/be/src/service/hs2-util.cc
+++ b/be/src/service/hs2-util.cc
@@ -18,9 +18,12 @@
 #include "service/hs2-util.h"
 
 #include "common/logging.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
 #include "runtime/types.h"
+#include "util/bit-util.h"
 
 #include <gutil/strings/substitute.h>
 
@@ -49,7 +52,9 @@ inline bool GetNullBit(const string& nulls, uint32_t row_idx) {
 
 void impala::StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added,
     uint32_t start_idx, const string& from, string* to) {
-  to->reserve((num_rows_before + num_rows_added + 7) / 8);
+  // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
+  // small increases in size.
+  to->reserve(BitUtil::RoundUpToPowerOfTwo((num_rows_before + num_rows_added + 7) / 8));
 
   // TODO: This is very inefficient, since we could conceivably go one byte at a time
   // (although the operands should stay live in registers in the loop). However doing this
@@ -118,106 +123,257 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val,
   SetNullBit(row_idx, is_null, nulls);
 }
 
+// Specialised per-type implementations of ExprValuesToHS2TColumn.
+
+// Reserve space in hs2Vals->values/nulls for 'num_rows' values appended after the
+// first 'output_row_idx' rows. 'start_idx' indexes the input batch, not the output.
+template <typename T>
+void ReserveSpace(int start_idx, int num_rows, uint32_t output_row_idx, T* hs2Vals) {
+  DCHECK_GE(start_idx, 0);
+  int64_t num_output_rows = static_cast<int64_t>(output_row_idx) + num_rows;
+  int64_t num_null_bytes = BitUtil::RoundUpNumBytes(num_output_rows);
+  // Round up to power-of-two to avoid quadratic behaviour from repeated small reserves.
+  hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(num_output_rows));
+  hs2Vals->nulls.reserve(BitUtil::RoundUpToPowerOfTwo(num_null_bytes));
+}
+
+// BOOLEAN and NULL_TYPE: append each row's val.val and null bit to column->boolVal.
+static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->boolVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    BooleanVal val = expr_eval->GetBooleanVal(it.Get());
+    column->boolVal.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->boolVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// TINYINT: append each row's val.val and null bit to column->byteVal.
+static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->byteVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    TinyIntVal val = expr_eval->GetTinyIntVal(it.Get());
+    column->byteVal.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->byteVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// SMALLINT: append each row's val.val and null bit to column->i16Val.
+static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->i16Val);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    SmallIntVal val = expr_eval->GetSmallIntVal(it.Get());
+    column->i16Val.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->i16Val.nulls);
+    ++output_row_idx;
+  }
+}
+
+// INT: append each row's val.val and null bit to column->i32Val.
+static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->i32Val);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    IntVal val = expr_eval->GetIntVal(it.Get());
+    column->i32Val.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->i32Val.nulls);
+    ++output_row_idx;
+  }
+}
+
+// BIGINT: append each row's val.val and null bit to column->i64Val.
+static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->i64Val);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    BigIntVal val = expr_eval->GetBigIntVal(it.Get());
+    column->i64Val.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->i64Val.nulls);
+    ++output_row_idx;
+  }
+}
+
+// FLOAT: append each row's val.val and null bit to column->doubleVal.
+static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    FloatVal val = expr_eval->GetFloatVal(it.Get());
+    column->doubleVal.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// DOUBLE: append each row's val.val and null bit to column->doubleVal.
+static void DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    DoubleVal val = expr_eval->GetDoubleVal(it.Get());
+    column->doubleVal.values.push_back(val.val);
+    SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// TIMESTAMP: append each row printed as a string (empty for NULL) to column->stringVal.
+static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    TimestampVal val = expr_eval->GetTimestampVal(it.Get());
+    column->stringVal.values.emplace_back();
+    if (!val.is_null) {
+      TimestampValue value = TimestampValue::FromTimestampVal(val);
+      RawValue::PrintValue(
+          &value, TYPE_TIMESTAMP, -1, &(column->stringVal.values.back()));
+    }
+    SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// STRING and VARCHAR: copy each row's val.len bytes (empty for NULL) to stringVal.
+static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    StringVal val = expr_eval->GetStringVal(it.Get());
+    if (val.is_null) {
+      column->stringVal.values.emplace_back();
+    } else {
+      column->stringVal.values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len);
+    }
+    SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+// CHAR: copy the type's declared length (not val.len) per non-NULL row to stringVal.
+static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+    uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+  ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    StringVal val = expr_eval->GetStringVal(it.Get());
+    if (val.is_null) {
+      column->stringVal.values.emplace_back();
+    } else {
+      column->stringVal.values.emplace_back(
+          reinterpret_cast<const char*>(val.ptr), char_type.len);
+    }
+    SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+    ++output_row_idx;
+  }
+}
+
+static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+    uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal);
+  // HiveServer2 requires decimals to be presented as strings. The ColumnType is
+  // loop-invariant, so it is converted from Thrift once rather than once per row.
+  const ColumnType decimalType = ColumnType::FromThrift(type);
+  auto& values = column->stringVal.values;
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    DecimalVal val = expr_eval->GetDecimalVal(it.Get());
+    if (val.is_null) {
+      values.emplace_back();
+    } else {
+      switch (decimalType.GetByteSize()) {
+        case 4:
+          values.emplace_back(Decimal4Value(val.val4).ToString(decimalType));
+          break;
+        case 8:
+          values.emplace_back(Decimal8Value(val.val8).ToString(decimalType));
+          break;
+        case 16:
+          values.emplace_back(Decimal16Value(val.val16).ToString(decimalType));
+          break;
+        default:
+          DCHECK(false) << "bad type: " << decimalType;
+      }
+    }
+    SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+    ++output_row_idx;
+  }
+}
+
 // For V6 and above
-void impala::ExprValueToHS2TColumn(const void* value, const TColumnType& type,
-    uint32_t row_idx, thrift::TColumn* column) {
-  string* nulls;
+void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+    uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+  // Dispatch to a per-type helper that contains the loop over rows. This avoids
+  // branching on the type for every row.
+  // TODO: instead of relying on stamped out implementations, we could codegen this loop
+  // to inline the expression evaluation into the loop body.
   switch (type.types[0].scalar_type.type) {
     case TPrimitiveType::NULL_TYPE:
     case TPrimitiveType::BOOLEAN:
-      column->boolVal.values.push_back(
-          value == NULL ? false : *reinterpret_cast<const bool*>(value));
-      nulls = &column->boolVal.nulls;
-      break;
+      BoolExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::TINYINT:
-      column->byteVal.values.push_back(
-          value == NULL ? 0 : *reinterpret_cast<const int8_t*>(value));
-      nulls = &column->byteVal.nulls;
-      break;
+      TinyIntExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::SMALLINT:
-      column->i16Val.values.push_back(
-          value == NULL ? 0 : *reinterpret_cast<const int16_t*>(value));
-      nulls = &column->i16Val.nulls;
-      break;
+      SmallIntExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::INT:
-      column->i32Val.values.push_back(
-          value == NULL ? 0 : *reinterpret_cast<const int32_t*>(value));
-      nulls = &column->i32Val.nulls;
-      break;
+      IntExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::BIGINT:
-      column->i64Val.values.push_back(
-          value == NULL ? 0 : *reinterpret_cast<const int64_t*>(value));
-      nulls = &column->i64Val.nulls;
-      break;
+      BigIntExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::FLOAT:
-      column->doubleVal.values.push_back(
-          value == NULL ? 0.f : *reinterpret_cast<const float*>(value));
-      nulls = &column->doubleVal.nulls;
-      break;
+      FloatExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::DOUBLE:
-      column->doubleVal.values.push_back(
-          value == NULL ? 0.0 : *reinterpret_cast<const double*>(value));
-      nulls = &column->doubleVal.nulls;
-      break;
+      DoubleExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::TIMESTAMP:
-      column->stringVal.values.push_back("");
-      if (value != NULL) {
-        RawValue::PrintValue(value, TYPE_TIMESTAMP, -1,
-            &(column->stringVal.values.back()));
-      }
-      nulls = &column->stringVal.nulls;
-      break;
+      TimestampExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::STRING:
     case TPrimitiveType::VARCHAR:
-      column->stringVal.values.push_back("");
-      if (value != NULL) {
-        const StringValue* str_val = reinterpret_cast<const StringValue*>(value);
-        column->stringVal.values.back().assign(
-            static_cast<char*>(str_val->ptr), str_val->len);
-      }
-      nulls = &column->stringVal.nulls;
-      break;
+      StringExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::CHAR:
-      column->stringVal.values.push_back("");
-      if (value != NULL) {
-        ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len);
-        column->stringVal.values.back().assign(
-            reinterpret_cast<const char*>(value), char_type.len);
-      }
-      nulls = &column->stringVal.nulls;
-      break;
+      CharExprValuesToHS2TColumn(
+          expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::DECIMAL: {
-      // HiveServer2 requires decimal to be presented as string.
-      column->stringVal.values.push_back("");
-      const ColumnType& decimalType = ColumnType::FromThrift(type);
-      if (value != NULL) {
-        switch (decimalType.GetByteSize()) {
-          case 4:
-            column->stringVal.values.back() =
-                reinterpret_cast<const Decimal4Value*>(value)->ToString(decimalType);
-            break;
-          case 8:
-            column->stringVal.values.back() =
-                reinterpret_cast<const Decimal8Value*>(value)->ToString(decimalType);
-            break;
-          case 16:
-            column->stringVal.values.back() =
-                reinterpret_cast<const Decimal16Value*>(value)->ToString(decimalType);
-            break;
-          default:
-            DCHECK(false) << "bad type: " << decimalType;
-        }
-      }
-      nulls = &column->stringVal.nulls;
-      break;
+      DecimalExprValuesToHS2TColumn(
+          expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     }
     default:
       DCHECK(false) << "Unhandled type: "
                     << TypeToString(ThriftToType(type.types[0].scalar_type.type));
-      return;
   }
-
-  SetNullBit(row_idx, (value == NULL), nulls);
 }
 
 // For V1 -> V5

http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.h
----------------------------------------------------------------------
diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h
index 44ceba6..4f0f973 100644
--- a/be/src/service/hs2-util.h
+++ b/be/src/service/hs2-util.h
@@ -20,16 +20,23 @@
 
 namespace impala {
 
-/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) to
-/// Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->).
+class RowBatch;
+class ScalarExprEvaluator;
+
+/// Utility methods for converting from Impala (either an Expr result or a TColumnValue)
+/// to Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->)).
 
 /// For V6->
 void TColumnValueToHS2TColumn(const TColumnValue& col_val, const TColumnType& type,
     uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column);
 
+/// Evaluate 'expr_eval' over the rows [start_idx, start_idx + num_rows) of 'batch' and
+/// append the results to 'column' of type 'type', which must hold 'output_row_idx' rows.
+/// The caller must call RuntimeState::GetQueryStatus() to check for evaluation errors.
 /// For V6->
-void ExprValueToHS2TColumn(const void* value, const TColumnType& type,
-    uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column);
+void ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type,
+    RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column);
 
 /// For V1->V5
 void TColumnValueToHS2TColumnValue(const TColumnValue& col_val, const TColumnType& type,

http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index aacd849..f254176 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -20,10 +20,13 @@
 #include <sstream>
 #include <boost/scoped_ptr.hpp>
 
+#include "exprs/scalar-expr-evaluator.h"
 #include "rpc/thrift-util.h"
 #include "runtime/raw-value.h"
+#include "runtime/row-batch.h"
 #include "runtime/types.h"
 #include "service/hs2-util.h"
+#include "util/bit-util.h"
 
 #include "common/names.h"
 
@@ -51,18 +54,19 @@ class AsciiQueryResultSet : public QueryResultSet {
 
   virtual ~AsciiQueryResultSet() {}
 
-  /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column
+  /// Evaluate 'expr_evals' over rows in 'batch', convert to ASCII using "\t" as column
   /// delimiter and store it in this result set.
   /// TODO: Handle complex types.
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+  virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+      int start_idx, int num_rows) override;
 
   /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
   /// result set.
-  virtual Status AddOneRow(const TResultRow& row);
+  virtual Status AddOneRow(const TResultRow& row) override;
 
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
-  virtual int64_t ByteSize(int start_idx, int num_rows);
-  virtual size_t size() { return result_set_->size(); }
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
+  virtual int64_t ByteSize(int start_idx, int num_rows) override;
+  virtual size_t size() override { return result_set_->size(); }
 
  private:
   /// Metadata of the result set
@@ -80,18 +84,20 @@ class HS2ColumnarResultSet : public QueryResultSet {
 
   virtual ~HS2ColumnarResultSet() {}
 
-  /// Add a row of expr values
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+  /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 columnar
+  /// representation.
+  virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+      int start_idx, int num_rows) override;
 
   /// Add a row from a TResultRow
-  virtual Status AddOneRow(const TResultRow& row);
+  virtual Status AddOneRow(const TResultRow& row) override;
 
   /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
   /// from 'other' into this result set
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
 
-  virtual int64_t ByteSize(int start_idx, int num_rows);
-  virtual size_t size() { return num_rows_; }
+  virtual int64_t ByteSize(int start_idx, int num_rows) override;
+  virtual size_t size() override { return num_rows_; }
 
  private:
   /// Metadata of the result set
@@ -119,15 +125,17 @@ class HS2RowOrientedResultSet : public QueryResultSet {
 
   virtual ~HS2RowOrientedResultSet() {}
 
-  /// Convert expr values to HS2 TRow and store it in a TRowSet.
-  virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales);
+  /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 row-oriented
+  /// representation of TRows stored in a TRowSet.
+  virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
+      int start_idx, int num_rows) override;
 
   /// Convert TResultRow to HS2 TRow and store it in a TRowSet
-  virtual Status AddOneRow(const TResultRow& row);
+  virtual Status AddOneRow(const TResultRow& row) override;
 
-  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows);
-  virtual int64_t ByteSize(int start_idx, int num_rows);
-  virtual size_t size() { return result_set_->rows.size(); }
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
+  virtual int64_t ByteSize(int start_idx, int num_rows) override;
+  virtual size_t size() override { return result_set_->rows.size(); }
 
  private:
   /// Metadata of the result set
@@ -158,20 +166,34 @@ QueryResultSet* QueryResultSet::CreateHS2ResultSet(
 
 //////////////////////////////////////////////////////////////////////////////////////////
 
-Status AsciiQueryResultSet::AddOneRow(
-    const vector<void*>& col_values, const vector<int>& scales) {
-  int num_col = col_values.size();
+Status AsciiQueryResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+    RowBatch* batch, int start_idx, int num_rows) {
+  DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+  int num_col = expr_evals.size();
   DCHECK_EQ(num_col, metadata_.columns.size());
+  // Per-column scales and types are loop-invariant: compute them once per call.
+  vector<int> scales;
+  vector<ColumnType> types;
+  for (int i = 0; i < num_col; ++i) {
+    DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
+    scales.push_back(expr_evals[i]->output_scale());
+    types.push_back(ColumnType::FromThrift(metadata_.columns[i].columnType));
+  }
+  // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
+  // small increases in size. Exactly 'num_rows' rows are appended to the result set.
+  result_set_->reserve(BitUtil::RoundUpToPowerOfTwo(result_set_->size() + num_rows));
   stringstream out_stream;
   out_stream.precision(ASCII_PRECISION);
-  for (int i = 0; i < num_col; ++i) {
-    // ODBC-187 - ODBC can only take "\t" as the delimiter
-    out_stream << (i > 0 ? "\t" : "");
-    DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
-    RawValue::PrintValue(col_values[i],
-        ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    for (int i = 0; i < num_col; ++i) {
+      // ODBC-187 - ODBC can only take "\t" as the delimiter
+      out_stream << (i > 0 ? "\t" : "");
+      RawValue::PrintValue(
+          expr_evals[i]->GetValue(it.Get()), types[i], scales[i], &out_stream);
+    }
+    result_set_->push_back(out_stream.str());
+    out_stream.str("");
   }
-  result_set_->push_back(out_stream.str());
   return Status::OK();
 }
 
@@ -263,16 +285,18 @@ HS2ColumnarResultSet::HS2ColumnarResultSet(
   InitColumns();
 }
 
-// Add a row of expr values
-Status HS2ColumnarResultSet::AddOneRow(
-    const vector<void*>& col_values, const vector<int>& scales) {
-  int num_col = col_values.size();
+Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+    RowBatch* batch, int start_idx, int num_rows) {
+  DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+  const int num_col = expr_evals.size();
   DCHECK_EQ(num_col, metadata_.columns.size());
   for (int i = 0; i < num_col; ++i) {
-    ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
+    // Convert the whole column at once, appending after the 'num_rows_' existing rows.
+    const TColumnType& type = metadata_.columns[i].columnType;
+    ExprValuesToHS2TColumn(expr_evals[i], type, batch, start_idx, num_rows, num_rows_,
         &(result_set_->columns[i]));
   }
-  ++num_rows_;
+  num_rows_ += num_rows;
   return Status::OK();
 }
 
@@ -427,16 +451,21 @@ HS2RowOrientedResultSet::HS2RowOrientedResultSet(
   }
 }
 
-Status HS2RowOrientedResultSet::AddOneRow(
-    const vector<void*>& col_values, const vector<int>& scales) {
-  int num_col = col_values.size();
+Status HS2RowOrientedResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
+    RowBatch* batch, int start_idx, int num_rows) {
+  DCHECK_GE(batch->num_rows(), start_idx + num_rows);
+  int num_col = expr_evals.size();
   DCHECK_EQ(num_col, metadata_.columns.size());
-  result_set_->rows.push_back(TRow());
-  TRow& trow = result_set_->rows.back();
-  trow.colVals.resize(num_col);
-  for (int i = 0; i < num_col; ++i) {
-    ExprValueToHS2TColumnValue(
-        col_values[i], metadata_.columns[i].columnType, &(trow.colVals[i]));
+  // Reserve for the 'num_rows' new rows; power-of-two rounding avoids O(n^2) regrowth.
+  result_set_->rows.reserve(
+      BitUtil::RoundUpToPowerOfTwo(result_set_->rows.size() + num_rows));
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    TRow& trow = *result_set_->rows.emplace(result_set_->rows.end());
+    trow.colVals.resize(num_col);
+    for (int i = 0; i < num_col; ++i) {
+      ExprValueToHS2TColumnValue(expr_evals[i]->GetValue(it.Get()),
+          metadata_.columns[i].columnType, &(trow.colVals[i]));
+    }
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
index e0c88d7..fa39d73 100644
--- a/be/src/service/query-result-set.h
+++ b/be/src/service/query-result-set.h
@@ -27,6 +27,9 @@
 
 namespace impala {
 
+class RowBatch;
+class ScalarExprEvaluator;
+
 /// Wraps a client-API specific result representation, and implements the logic required
 /// to translate into that format from Impala's row format.
 ///
@@ -36,12 +39,11 @@ class QueryResultSet {
   QueryResultSet() {}
   virtual ~QueryResultSet() {}
 
-  /// Add a single row to this result set. The row is a vector of pointers to values,
-  /// whose memory belongs to the caller. 'scales' contains the scales for decimal values
-  /// (# of digits after decimal), with -1 indicating no scale specified or the
-  /// corresponding value is not a decimal.
-  virtual Status AddOneRow(
-      const std::vector<void*>& row, const std::vector<int>& scales) = 0;
+  /// Add 'num_rows' rows to the result set, obtained by evaluating 'expr_evals' over
+  /// the rows in 'batch' starting at start_idx. Batch must contain at least
+  /// ('start_idx' + 'num_rows') rows.
+  virtual Status AddRows(const std::vector<ScalarExprEvaluator*>& expr_evals,
+      RowBatch* batch, int start_idx, int num_rows) = 0;
 
   /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
   /// operation, the row in the form of TResultRow.