You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ra...@apache.org on 2019/03/04 09:00:30 UTC
[arrow] branch master updated: ARROW-3511: [Gandiva] Link filter and project operations
This is an automated email from the ASF dual-hosted git repository.
ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a1a8222 ARROW-3511: [Gandiva] Link filter and project operations
a1a8222 is described below
commit a1a8222f081e4be475dcde96f8c4eeb438d8f094
Author: praveenkrishna <pr...@tutanota.com>
AuthorDate: Mon Mar 4 14:30:00 2019 +0530
ARROW-3511: [Gandiva] Link filter and project operations
This patch enables Gandiva to link a filter node to a project operator. The selection vector populated by the filter is passed to the project node, so that the project operation is performed only on the filtered positions.
Author: praveenkrishna <pr...@tutanota.com>
Closes #2789 from Praveen2112/ARROW-3511 and squashes the following commits:
ee347c63 <praveenkrishna> ARROW-3511: Link filter and project operations
---
cpp/src/gandiva/compiled_expr.h | 37 +--
cpp/src/gandiva/jni/jni_common.cc | 38 ++-
cpp/src/gandiva/llvm_generator.cc | 118 ++++++---
cpp/src/gandiva/llvm_generator.h | 25 +-
cpp/src/gandiva/llvm_generator_test.cc | 5 +-
cpp/src/gandiva/projector.cc | 46 ++--
cpp/src/gandiva/projector.h | 23 ++
cpp/src/gandiva/proto/Types.proto | 5 +-
cpp/src/gandiva/selection_vector.cc | 26 +-
cpp/src/gandiva/selection_vector.h | 35 +++
cpp/src/gandiva/selection_vector_impl.h | 32 ++-
cpp/src/gandiva/tests/filter_project_test.cc | 272 +++++++++++++++++++++
.../apache/arrow/gandiva/evaluator/JniWrapper.java | 2 +
.../apache/arrow/gandiva/evaluator/Projector.java | 60 ++++-
.../arrow/gandiva/evaluator/FilterProjectTest.java | 106 ++++++++
15 files changed, 738 insertions(+), 92 deletions(-)
diff --git a/cpp/src/gandiva/compiled_expr.h b/cpp/src/gandiva/compiled_expr.h
index b7799f1..c131a2c 100644
--- a/cpp/src/gandiva/compiled_expr.h
+++ b/cpp/src/gandiva/compiled_expr.h
@@ -18,33 +18,42 @@
#ifndef GANDIVA_COMPILED_EXPR_H
#define GANDIVA_COMPILED_EXPR_H
+#include <vector>
#include "gandiva/llvm_includes.h"
+#include "gandiva/selection_vector.h"
#include "gandiva/value_validity_pair.h"
namespace gandiva {
using EvalFunc = int (*)(uint8_t** buffers, uint8_t** local_bitmaps,
- int64_t execution_ctx_ptr, int64_t record_count);
+ const uint8_t* selection_buffer, int64_t execution_ctx_ptr,
+ int64_t record_count);
/// \brief Tracks the compiled state for one expression.
class CompiledExpr {
public:
- CompiledExpr(ValueValidityPairPtr value_validity, FieldDescriptorPtr output,
- llvm::Function* ir_function)
- : value_validity_(value_validity),
- output_(output),
- ir_function_(ir_function),
- jit_function_(NULL) {}
+ CompiledExpr(ValueValidityPairPtr value_validity, FieldDescriptorPtr output)
+ : value_validity_(value_validity), output_(output) {}
ValueValidityPairPtr value_validity() const { return value_validity_; }
FieldDescriptorPtr output() const { return output_; }
- llvm::Function* ir_function() const { return ir_function_; }
+ void SetIRFunction(SelectionVector::Mode mode, llvm::Function* ir_function) {
+ ir_functions_[static_cast<int>(mode)] = ir_function;
+ }
- EvalFunc jit_function() const { return jit_function_; }
+ llvm::Function* GetIRFunction(SelectionVector::Mode mode) const {
+ return ir_functions_[static_cast<int>(mode)];
+ }
- void set_jit_function(EvalFunc jit_function) { jit_function_ = jit_function; }
+ void SetJITFunction(SelectionVector::Mode mode, EvalFunc jit_function) {
+ jit_functions_[static_cast<int>(mode)] = jit_function;
+ }
+
+ EvalFunc GetJITFunction(SelectionVector::Mode mode) const {
+ return jit_functions_[static_cast<int>(mode)];
+ }
private:
// value & validities for the expression tree (root)
@@ -53,11 +62,11 @@ class CompiledExpr {
// output field
FieldDescriptorPtr output_;
- // IR function in the generated code
- llvm::Function* ir_function_;
+ // IR functions for various modes in the generated code
+ std::array<llvm::Function*, SelectionVector::kNumModes> ir_functions_;
- // JIT function in the generated code (set after the module is optimised and finalized)
- EvalFunc jit_function_;
+ // JIT functions in the generated code (set after the module is optimised and finalized)
+ std::array<EvalFunc, SelectionVector::kNumModes> jit_functions_;
};
} // namespace gandiva
diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc
index 1ba3aad..72061c0 100644
--- a/cpp/src/gandiva/jni/jni_common.cc
+++ b/cpp/src/gandiva/jni/jni_common.cc
@@ -37,6 +37,7 @@
#include "gandiva/jni/id_to_module_map.h"
#include "gandiva/jni/module_holder.h"
#include "gandiva/projector.h"
+#include "gandiva/selection_vector.h"
#include "gandiva/tree_expr_builder.h"
#include "jni/org_apache_arrow_gandiva_evaluator_JniWrapper.h"
@@ -585,7 +586,8 @@ err_out:
JNIEXPORT void JNICALL
Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
JNIEnv* env, jobject cls, jlong module_id, jint num_rows, jlongArray buf_addrs,
- jlongArray buf_sizes, jlongArray out_buf_addrs, jlongArray out_buf_sizes) {
+ jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, jlong sel_vec_addr,
+ jlong sel_vec_size, jlongArray out_buf_addrs, jlongArray out_buf_sizes) {
Status status;
std::shared_ptr<ProjectorHolder> holder = projector_modules_.Lookup(module_id);
if (holder == nullptr) {
@@ -622,6 +624,32 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
break;
}
+ std::shared_ptr<gandiva::SelectionVector> selection_vector;
+ auto selection_buffer = std::make_shared<arrow::Buffer>(
+ reinterpret_cast<uint8_t*>(sel_vec_addr), sel_vec_size);
+ int output_row_count = 0;
+ switch (sel_vec_type) {
+ case types::SV_NONE: {
+ output_row_count = num_rows;
+ break;
+ }
+ case types::SV_INT16: {
+ status = gandiva::SelectionVector::MakeImmutableInt16(
+ sel_vec_rows, selection_buffer, &selection_vector);
+ output_row_count = sel_vec_rows;
+ break;
+ }
+ case types::SV_INT32: {
+ status = gandiva::SelectionVector::MakeImmutableInt32(
+ sel_vec_rows, selection_buffer, &selection_vector);
+ output_row_count = sel_vec_rows;
+ break;
+ }
+ }
+ if (!status.ok()) {
+ break;
+ }
+
auto ret_types = holder->rettypes();
ArrayDataVector output;
int buf_idx = 0;
@@ -640,14 +668,10 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
std::make_shared<arrow::MutableBuffer>(value_buf, data_sz);
auto array_data =
- arrow::ArrayData::Make(field->type(), num_rows, {bitmap_buf, data_buf});
+ arrow::ArrayData::Make(field->type(), output_row_count, {bitmap_buf, data_buf});
output.push_back(array_data);
}
- if (!status.ok()) {
- break;
- }
-
- status = holder->projector()->Evaluate(*in_batch, output);
+ status = holder->projector()->Evaluate(*in_batch, selection_vector.get(), output);
} while (0);
env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT);
diff --git a/cpp/src/gandiva/llvm_generator.cc b/cpp/src/gandiva/llvm_generator.cc
index c6844cf..10f160d 100644
--- a/cpp/src/gandiva/llvm_generator.cc
+++ b/cpp/src/gandiva/llvm_generator.cc
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation (ASF) under one
+// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -53,21 +53,20 @@ Status LLVMGenerator::Make(std::shared_ptr<Configuration> config,
Status LLVMGenerator::Add(const ExpressionPtr expr, const FieldDescriptorPtr output) {
int idx = static_cast<int>(compiled_exprs_.size());
-
// decompose the expression to separate out value and validities.
ExprDecomposer decomposer(function_registry_, annotator_);
ValueValidityPairPtr value_validity;
ARROW_RETURN_NOT_OK(decomposer.Decompose(*expr->root(), &value_validity));
-
// Generate the IR function for the decomposed expression.
- llvm::Function* ir_function = nullptr;
- ARROW_RETURN_NOT_OK(
- CodeGenExprValue(value_validity->value_expr(), output, idx, &ir_function));
+ std::unique_ptr<CompiledExpr> compiled_expr(new CompiledExpr(value_validity, output));
+ for (auto mode : SelectionVector::kAllModes) {
+ llvm::Function* ir_function = nullptr;
+ ARROW_RETURN_NOT_OK(
+ CodeGenExprValue(value_validity->value_expr(), output, idx, &ir_function, mode));
+ compiled_expr->SetIRFunction(mode, ir_function);
+ }
- std::unique_ptr<CompiledExpr> compiled_expr(
- new CompiledExpr(value_validity, output, ir_function));
compiled_exprs_.push_back(std::move(compiled_expr));
-
return Status::OK();
}
@@ -77,23 +76,31 @@ Status LLVMGenerator::Build(const ExpressionVector& exprs) {
auto output = annotator_.AddOutputFieldDescriptor(expr->result());
ARROW_RETURN_NOT_OK(Add(expr, output));
}
-
- // Optimize, compile and finalize the module
+ // optimise, compile and finalize the module
ARROW_RETURN_NOT_OK(engine_->FinalizeModule(optimise_ir_, dump_ir_));
-
// setup the jit functions for each expression.
for (auto& compiled_expr : compiled_exprs_) {
- llvm::Function* ir_func = compiled_expr->ir_function();
- EvalFunc fn = reinterpret_cast<EvalFunc>(engine_->CompiledFunction(ir_func));
- compiled_expr->set_jit_function(fn);
+ for (auto mode : SelectionVector::kAllModes) {
+ auto ir_function = compiled_expr->GetIRFunction(mode);
+ auto jit_function =
+ reinterpret_cast<EvalFunc>(engine_->CompiledFunction(ir_function));
+ compiled_expr->SetJITFunction(mode, jit_function);
+ }
}
-
return Status::OK();
}
/// Execute the compiled module against the provided vectors.
Status LLVMGenerator::Execute(const arrow::RecordBatch& record_batch,
const ArrayDataVector& output_vector) {
+ return Execute(record_batch, nullptr, output_vector);
+}
+
+/// Execute the compiled module against the provided vectors based on the type of
+/// selection vector.
+Status LLVMGenerator::Execute(const arrow::RecordBatch& record_batch,
+ const SelectionVector* selection_vector,
+ const ArrayDataVector& output_vector) {
DCHECK_GT(record_batch.num_rows(), 0);
auto eval_batch = annotator_.PrepareEvalBatch(record_batch, output_vector);
@@ -101,16 +108,27 @@ Status LLVMGenerator::Execute(const arrow::RecordBatch& record_batch,
for (auto& compiled_expr : compiled_exprs_) {
// generate data/offset vectors.
- EvalFunc jit_function = compiled_expr->jit_function();
+ const uint8_t* selection_buffer = nullptr;
+ auto num_output_rows = record_batch.num_rows();
+ auto mode = SelectionVector::MODE_NONE;
+ if (selection_vector != nullptr) {
+ selection_buffer = selection_vector->GetBuffer().data();
+ num_output_rows = selection_vector->GetNumSlots();
+ mode = selection_vector->GetMode();
+ }
+
+ EvalFunc jit_function = compiled_expr->GetJITFunction(mode);
jit_function(eval_batch->GetBufferArray(), eval_batch->GetLocalBitMapArray(),
- (int64_t)eval_batch->GetExecutionContext(), record_batch.num_rows());
+ selection_buffer, (int64_t)eval_batch->GetExecutionContext(),
+ num_output_rows);
+ // check for execution errors
ARROW_RETURN_IF(
eval_batch->GetExecutionContext()->has_error(),
Status::ExecutionError(eval_batch->GetExecutionContext()->get_error()));
// generate validity vectors.
- ComputeBitMapsForExpr(*compiled_expr, *eval_batch);
+ ComputeBitMapsForExpr(*compiled_expr, *eval_batch, selection_vector);
}
return Status::OK();
@@ -212,23 +230,34 @@ llvm::Value* LLVMGenerator::GetLocalBitMapReference(llvm::Value* arg_bitmaps, in
// exit: ; preds = %loop
// ret i32 0
// }
-
Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr output,
- int suffix_idx, llvm::Function** fn) {
+ int suffix_idx, llvm::Function** fn,
+ SelectionVector::Mode selection_vector_mode) {
llvm::IRBuilder<>* builder = ir_builder();
-
// Create fn prototype :
// int expr_1 (long **addrs, long **bitmaps, long *context_ptr, long nrec)
std::vector<llvm::Type*> arguments;
arguments.push_back(types()->i64_ptr_type());
arguments.push_back(types()->i64_ptr_type());
- arguments.push_back(types()->i64_type());
- arguments.push_back(types()->i64_type());
+ switch (selection_vector_mode) {
+ case SelectionVector::MODE_NONE:
+ case SelectionVector::MODE_UINT16:
+ arguments.push_back(types()->ptr_type(types()->i16_type()));
+ break;
+ case SelectionVector::MODE_UINT32:
+ arguments.push_back(types()->i32_ptr_type());
+ break;
+ case SelectionVector::MODE_UINT64:
+ arguments.push_back(types()->i64_ptr_type());
+ }
+ arguments.push_back(types()->i64_type()); // ctxt_ptr
+ arguments.push_back(types()->i64_type()); // nrec
llvm::FunctionType* prototype =
llvm::FunctionType::get(types()->i32_type(), arguments, false /*isVarArg*/);
// Create fn
- std::string func_name = "expr_" + std::to_string(suffix_idx);
+ std::string func_name = "expr_" + std::to_string(suffix_idx) + "_" +
+ std::to_string(static_cast<int>(selection_vector_mode));
engine_->AddFunctionToCompile(func_name);
*fn = llvm::Function::Create(prototype, llvm::GlobalValue::ExternalLinkage, func_name,
module());
@@ -242,6 +271,9 @@ Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr out
llvm::Value* arg_local_bitmaps = &*args;
arg_local_bitmaps->setName("local_bitmaps");
++args;
+ llvm::Value* arg_selection_vector = &*args;
+ arg_selection_vector->setName("selection_vector");
+ ++args;
llvm::Value* arg_context_ptr = &*args;
arg_context_ptr->setName("context_ptr");
++args;
@@ -263,9 +295,17 @@ Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr out
// define loop_var : start with 0, +1 after each iter
llvm::PHINode* loop_var = builder->CreatePHI(types()->i64_type(), 2, "loop_var");
+ llvm::Value* position_var = loop_var;
+ if (selection_vector_mode != SelectionVector::MODE_NONE) {
+ position_var = builder->CreateIntCast(
+ builder->CreateLoad(builder->CreateGEP(arg_selection_vector, loop_var),
+ "uncasted_position_var"),
+ types()->i64_type(), true, "position_var");
+ }
+
// The visitor can add code to both the entry/loop blocks.
Visitor visitor(this, *fn, loop_entry, arg_addrs, arg_local_bitmaps, arg_context_ptr,
- loop_var);
+ position_var);
value_expr->Accept(visitor);
LValuePtr output_value = visitor.result();
@@ -356,7 +396,8 @@ void LLVMGenerator::ClearPackedBitValueIfFalse(llvm::Value* bitmap, llvm::Value*
/// Extract the bitmap addresses, and do an intersection.
void LLVMGenerator::ComputeBitMapsForExpr(const CompiledExpr& compiled_expr,
- const EvalBatch& eval_batch) {
+ const EvalBatch& eval_batch,
+ const SelectionVector* selection_vector) {
auto validities = compiled_expr.value_validity()->validity_exprs();
// Extract all the source bitmap addresses.
@@ -368,9 +409,28 @@ void LLVMGenerator::ComputeBitMapsForExpr(const CompiledExpr& compiled_expr,
// Extract the destination bitmap address.
int out_idx = compiled_expr.output()->validity_idx();
uint8_t* dst_bitmap = eval_batch.GetBuffer(out_idx);
-
// Compute the destination bitmap.
- accumulator.ComputeResult(dst_bitmap);
+ if (selection_vector == nullptr) {
+ accumulator.ComputeResult(dst_bitmap);
+ } else {
+ /// The output bitmap is an intersection of some input/local bitmaps. However, with a
+ /// selection vector, only the bits corresponding to the indices in the selection
+ /// vector need to be set in the output bitmap. This is done in two steps:
+ ///
+ /// 1. Do the intersection of input/local bitmaps to generate a temporary bitmap.
+ /// 2. copy just the relevant bits from the temporary bitmap to the output bitmap.
+ LocalBitMapsHolder bit_map_holder(eval_batch.num_records(), 1);
+ uint8_t* temp_bitmap = bit_map_holder.GetLocalBitMap(0);
+ accumulator.ComputeResult(temp_bitmap);
+
+ auto num_out_records = selection_vector->GetNumSlots();
+ // the memset isn't required, doing it just for valgrind.
+ memset(dst_bitmap, 0, arrow::BitUtil::BytesForBits(num_out_records));
+ for (auto i = 0; i < num_out_records; ++i) {
+ auto bit = arrow::BitUtil::GetBit(temp_bitmap, selection_vector->GetIndex(i));
+ arrow::BitUtil::SetBitTo(dst_bitmap, i, bit);
+ }
+ }
}
llvm::Value* LLVMGenerator::AddFunctionCall(const std::string& full_name,
diff --git a/cpp/src/gandiva/llvm_generator.h b/cpp/src/gandiva/llvm_generator.h
index 2c1d5c1..61038fc 100644
--- a/cpp/src/gandiva/llvm_generator.h
+++ b/cpp/src/gandiva/llvm_generator.h
@@ -35,6 +35,7 @@
#include "gandiva/gandiva_aliases.h"
#include "gandiva/llvm_types.h"
#include "gandiva/lvalue.h"
+#include "gandiva/selection_vector.h"
#include "gandiva/value_validity_pair.h"
#include "gandiva/visibility.h"
@@ -49,14 +50,21 @@ class GANDIVA_EXPORT LLVMGenerator {
static Status Make(std::shared_ptr<Configuration> config,
std::unique_ptr<LLVMGenerator>* llvm_generator);
- /// \brief Build the code for the expression trees. Each element in the vector
- /// represents an expression tree
+ /// \brief Build the code for the expression trees for default mode. Each
+ /// element in the vector represents an expression tree
Status Build(const ExpressionVector& exprs);
- /// \brief Execute the built expression against the provided arguments.
+ /// \brief Execute the built expression against the provided arguments for
+ /// default mode.
Status Execute(const arrow::RecordBatch& record_batch,
const ArrayDataVector& output_vector);
+ /// \brief Execute the built expression against the provided arguments for
+ /// all modes. Only works on the records specified in the selection_vector.
+ Status Execute(const arrow::RecordBatch& record_batch,
+ const SelectionVector* selection_vector,
+ const ArrayDataVector& output_vector);
+
LLVMTypes* types() { return engine_->types(); }
llvm::Module* module() { return engine_->module(); }
@@ -148,8 +156,8 @@ class GANDIVA_EXPORT LLVMGenerator {
bool has_arena_allocs_;
};
- // Generate the code for one expression, with the output of the expression going to
- // 'output'.
+ // Generate the code for one expression for default mode, with the output of
+ // the expression going to 'output'.
Status Add(const ExpressionPtr expr, const FieldDescriptorPtr output);
/// Generate code to load the vector at specified index in the 'arg_addrs' array.
@@ -167,7 +175,8 @@ class GANDIVA_EXPORT LLVMGenerator {
/// Generate code for the value array of one expression.
Status CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr output, int suffix_idx,
- llvm::Function** fn);
+ llvm::Function** fn,
+ SelectionVector::Mode selection_vector_mode);
/// Generate code to load the local bitmap specified index and cast it as bitmap.
llvm::Value* GetLocalBitMapReference(llvm::Value* arg_bitmaps, int idx);
@@ -200,8 +209,10 @@ class GANDIVA_EXPORT LLVMGenerator {
/// \param[in] compiled_expr the compiled expression (includes the bitmap indices to be
/// used for computing the validity bitmap of the result).
/// \param[in] eval_batch (includes input/output buffer addresses)
+ /// \param[in] selection_vector the list of selected positions
void ComputeBitMapsForExpr(const CompiledExpr& compiled_expr,
- const EvalBatch& eval_batch);
+ const EvalBatch& eval_batch,
+ const SelectionVector* selection_vector);
/// Replace the %T in the trace msg with the correct type corresponding to 'type'
/// eg. %d for int32, %ld for int64, ..
diff --git a/cpp/src/gandiva/llvm_generator_test.cc b/cpp/src/gandiva/llvm_generator_test.cc
index fed6339..a486cc1 100644
--- a/cpp/src/gandiva/llvm_generator_test.cc
+++ b/cpp/src/gandiva/llvm_generator_test.cc
@@ -86,7 +86,8 @@ TEST_F(TestLLVMGenerator, TestAdd) {
llvm::Function* ir_func = nullptr;
- status = generator->CodeGenExprValue(func_dex, desc_sum, 0, &ir_func);
+ status = generator->CodeGenExprValue(func_dex, desc_sum, 0, &ir_func,
+ SelectionVector::MODE_NONE);
EXPECT_TRUE(status.ok()) << status.message();
status = generator->engine_->FinalizeModule(true, false);
@@ -107,7 +108,7 @@ TEST_F(TestLLVMGenerator, TestAdd) {
reinterpret_cast<uint8_t*>(a1), reinterpret_cast<uint8_t*>(&in_bitmap),
reinterpret_cast<uint8_t*>(out), reinterpret_cast<uint8_t*>(&out_bitmap),
};
- eval_func(addrs, nullptr, 0 /* dummy context ptr */, num_records);
+ eval_func(addrs, nullptr, nullptr, 0 /* dummy context ptr */, num_records);
uint32_t expected[] = {6, 8, 10, 12};
for (int i = 0; i < num_records; i++) {
diff --git a/cpp/src/gandiva/projector.cc b/cpp/src/gandiva/projector.cc
index 7950fc7..e7b2954 100644
--- a/cpp/src/gandiva/projector.cc
+++ b/cpp/src/gandiva/projector.cc
@@ -92,53 +92,71 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
Status Projector::Evaluate(const arrow::RecordBatch& batch,
const ArrayDataVector& output_data_vecs) {
+ return Evaluate(batch, nullptr, output_data_vecs);
+}
+
+Status Projector::Evaluate(const arrow::RecordBatch& batch,
+ const SelectionVector* selection_vector,
+ const ArrayDataVector& output_data_vecs) {
ARROW_RETURN_NOT_OK(ValidateEvaluateArgsCommon(batch));
- ARROW_RETURN_IF(
- output_data_vecs.size() != output_fields_.size(),
- Status::Invalid("Number of output buffers must match number of fields"));
+
+ if (output_data_vecs.size() != output_fields_.size()) {
+ std::stringstream ss;
+ ss << "number of buffers for output_data_vecs is " << output_data_vecs.size()
+ << ", expected " << output_fields_.size();
+ return Status::Invalid(ss.str());
+ }
int idx = 0;
for (auto& array_data : output_data_vecs) {
- const auto output_field = output_fields_[idx];
if (array_data == nullptr) {
- return Status::Invalid("Output array for field ", output_field->name(),
- " should not be null");
+ std::stringstream ss;
+ ss << "array for output field " << output_fields_[idx]->name() << "is null.";
+ return Status::Invalid(ss.str());
}
+ auto num_rows =
+ selection_vector == nullptr ? batch.num_rows() : selection_vector->GetNumSlots();
+
ARROW_RETURN_NOT_OK(
- ValidateArrayDataCapacity(*array_data, *output_field, batch.num_rows()));
+ ValidateArrayDataCapacity(*array_data, *(output_fields_[idx]), num_rows));
++idx;
}
-
- return llvm_generator_->Execute(batch, output_data_vecs);
+ return llvm_generator_->Execute(batch, selection_vector, output_data_vecs);
}
Status Projector::Evaluate(const arrow::RecordBatch& batch, arrow::MemoryPool* pool,
arrow::ArrayVector* output) {
+ return Evaluate(batch, nullptr, pool, output);
+}
+
+Status Projector::Evaluate(const arrow::RecordBatch& batch,
+ const SelectionVector* selection_vector,
+ arrow::MemoryPool* pool, arrow::ArrayVector* output) {
ARROW_RETURN_NOT_OK(ValidateEvaluateArgsCommon(batch));
ARROW_RETURN_IF(output == nullptr, Status::Invalid("Output must be non-null."));
ARROW_RETURN_IF(pool == nullptr, Status::Invalid("Memory pool must be non-null."));
+ auto num_rows =
+ selection_vector == nullptr ? batch.num_rows() : selection_vector->GetNumSlots();
// Allocate the output data vecs.
ArrayDataVector output_data_vecs;
- output_data_vecs.reserve(output_fields_.size());
for (auto& field : output_fields_) {
ArrayDataPtr output_data;
- ARROW_RETURN_NOT_OK(
- AllocArrayData(field->type(), batch.num_rows(), pool, &output_data));
+ ARROW_RETURN_NOT_OK(AllocArrayData(field->type(), num_rows, pool, &output_data));
output_data_vecs.push_back(output_data);
}
// Execute the expression(s).
- ARROW_RETURN_NOT_OK(llvm_generator_->Execute(batch, output_data_vecs));
+ ARROW_RETURN_NOT_OK(
+ llvm_generator_->Execute(batch, selection_vector, output_data_vecs));
// Create and return array arrays.
output->clear();
for (auto& array_data : output_data_vecs) {
output->push_back(arrow::MakeArray(array_data));
}
-
return Status::OK();
}
diff --git a/cpp/src/gandiva/projector.h b/cpp/src/gandiva/projector.h
index 58bac78..2249854 100644
--- a/cpp/src/gandiva/projector.h
+++ b/cpp/src/gandiva/projector.h
@@ -27,6 +27,7 @@
#include "gandiva/arrow.h"
#include "gandiva/configuration.h"
#include "gandiva/expression.h"
+#include "gandiva/selection_vector.h"
#include "gandiva/visibility.h"
namespace gandiva {
@@ -81,6 +82,28 @@ class GANDIVA_EXPORT Projector {
/// populated by Evaluate.
Status Evaluate(const arrow::RecordBatch& batch, const ArrayDataVector& output);
+ /// Evaluate the specified record batch, and return the allocated and populated output
+ /// arrays. The output arrays will be allocated from the memory pool 'pool', and added
+ /// to the vector 'output'.
+ ///
+ /// \param[in] batch the record batch. schema should be the same as the one in 'Make'
+ /// \param[in] selection_vector selection vector which has the filtered row positions.
+ /// \param[in] pool memory pool used to allocate output arrays (if required).
+ /// \param[out] output the vector of allocated/populated arrays.
+ Status Evaluate(const arrow::RecordBatch& batch,
+ const SelectionVector* selection_vector, arrow::MemoryPool* pool,
+ arrow::ArrayVector* output);
+
+ /// Evaluate the specified record batch, and populate the output arrays at the filtered
+ /// positions. The output arrays of sufficient capacity must be allocated by the caller.
+ ///
+ /// \param[in] batch the record batch. schema should be the same as the one in 'Make'
+ /// \param[in] selection_vector selection vector which has the filtered row positions
+ /// \param[in,out] output vector of arrays, the arrays are allocated by the caller and
+ /// populated by Evaluate.
+ Status Evaluate(const arrow::RecordBatch& batch,
+ const SelectionVector* selection_vector, const ArrayDataVector& output);
+
private:
Projector(std::unique_ptr<LLVMGenerator> llvm_generator, SchemaPtr schema,
const FieldVector& output_fields, std::shared_ptr<Configuration>);
diff --git a/cpp/src/gandiva/proto/Types.proto b/cpp/src/gandiva/proto/Types.proto
index 7474065..9efa80f 100644
--- a/cpp/src/gandiva/proto/Types.proto
+++ b/cpp/src/gandiva/proto/Types.proto
@@ -66,8 +66,9 @@ enum TimeUnit {
}
enum SelectionVectorType {
- SV_INT16 = 0;
- SV_INT32 = 1;
+ SV_NONE = 0;
+ SV_INT16 = 1;
+ SV_INT32 = 2;
}
message ExtGandivaType {
diff --git a/cpp/src/gandiva/selection_vector.cc b/cpp/src/gandiva/selection_vector.cc
index e643cec..85c690e 100644
--- a/cpp/src/gandiva/selection_vector.cc
+++ b/cpp/src/gandiva/selection_vector.cc
@@ -28,6 +28,8 @@
namespace gandiva {
+constexpr SelectionVector::Mode SelectionVector::kAllModes[kNumModes];
+
Status SelectionVector::PopulateFromBitMap(const uint8_t* bitmap, int64_t bitmap_size,
int64_t max_bitmap_index) {
const uint64_t max_idx = static_cast<uint64_t>(max_bitmap_index);
@@ -88,7 +90,6 @@ Status SelectionVector::MakeInt16(int64_t max_slots,
std::shared_ptr<SelectionVector>* selection_vector) {
ARROW_RETURN_NOT_OK(SelectionVectorInt16::ValidateBuffer(max_slots, buffer));
*selection_vector = std::make_shared<SelectionVectorInt16>(max_slots, buffer);
-
return Status::OK();
}
@@ -97,7 +98,14 @@ Status SelectionVector::MakeInt16(int64_t max_slots, arrow::MemoryPool* pool,
std::shared_ptr<arrow::Buffer> buffer;
ARROW_RETURN_NOT_OK(SelectionVectorInt16::AllocateBuffer(max_slots, pool, &buffer));
*selection_vector = std::make_shared<SelectionVectorInt16>(max_slots, buffer);
+ return Status::OK();
+}
+Status SelectionVector::MakeImmutableInt16(
+ int64_t num_slots, std::shared_ptr<arrow::Buffer> buffer,
+ std::shared_ptr<SelectionVector>* selection_vector) {
+ *selection_vector =
+ std::make_shared<SelectionVectorInt16>(num_slots, num_slots, buffer);
return Status::OK();
}
@@ -119,6 +127,14 @@ Status SelectionVector::MakeInt32(int64_t max_slots, arrow::MemoryPool* pool,
return Status::OK();
}
+Status SelectionVector::MakeImmutableInt32(
+ int64_t num_slots, std::shared_ptr<arrow::Buffer> buffer,
+ std::shared_ptr<SelectionVector>* selection_vector) {
+ *selection_vector =
+ std::make_shared<SelectionVectorInt32>(num_slots, num_slots, buffer);
+ return Status::OK();
+}
+
Status SelectionVector::MakeInt64(int64_t max_slots,
std::shared_ptr<arrow::Buffer> buffer,
std::shared_ptr<SelectionVector>* selection_vector) {
@@ -137,8 +153,8 @@ Status SelectionVector::MakeInt64(int64_t max_slots, arrow::MemoryPool* pool,
return Status::OK();
}
-template <typename C_TYPE, typename A_TYPE>
-Status SelectionVectorImpl<C_TYPE, A_TYPE>::AllocateBuffer(
+template <typename C_TYPE, typename A_TYPE, SelectionVector::Mode mode>
+Status SelectionVectorImpl<C_TYPE, A_TYPE, mode>::AllocateBuffer(
int64_t max_slots, arrow::MemoryPool* pool, std::shared_ptr<arrow::Buffer>* buffer) {
auto buffer_len = max_slots * sizeof(C_TYPE);
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(pool, buffer_len, buffer));
@@ -146,8 +162,8 @@ Status SelectionVectorImpl<C_TYPE, A_TYPE>::AllocateBuffer(
return Status::OK();
}
-template <typename C_TYPE, typename A_TYPE>
-Status SelectionVectorImpl<C_TYPE, A_TYPE>::ValidateBuffer(
+template <typename C_TYPE, typename A_TYPE, SelectionVector::Mode mode>
+Status SelectionVectorImpl<C_TYPE, A_TYPE, mode>::ValidateBuffer(
int64_t max_slots, std::shared_ptr<arrow::Buffer> buffer) {
ARROW_RETURN_IF(!buffer->is_mutable(),
Status::Invalid("buffer for selection vector must be mutable"));
diff --git a/cpp/src/gandiva/selection_vector.h b/cpp/src/gandiva/selection_vector.h
index 2e99417..d30a388 100644
--- a/cpp/src/gandiva/selection_vector.h
+++ b/cpp/src/gandiva/selection_vector.h
@@ -34,6 +34,17 @@ class GANDIVA_EXPORT SelectionVector {
public:
virtual ~SelectionVector() = default;
+ enum Mode : int {
+ MODE_NONE,
+ MODE_UINT16,
+ MODE_UINT32,
+ MODE_UINT64,
+ MODE_MAX = MODE_UINT64, // dummy
+ };
+ static constexpr int kNumModes = static_cast<int>(MODE_MAX) + 1;
+ static constexpr Mode kAllModes[kNumModes] = {MODE_NONE, MODE_UINT16, MODE_UINT32,
+ MODE_UINT64};
+
/// Get the value at a given index.
virtual uint64_t GetIndex(int64_t index) const = 0;
@@ -55,6 +66,12 @@ class GANDIVA_EXPORT SelectionVector {
/// Convert to arrow-array.
virtual ArrayPtr ToArray() const = 0;
+ /// Get the underlying arrow buffer.
+ virtual arrow::Buffer& GetBuffer() const = 0;
+
+ /// Mode of SelectionVector
+ virtual Mode GetMode() const = 0;
+
/// \brief populate selection vector for all the set bits in the bitmap.
///
/// \param[in] bitmap the bitmap
@@ -79,6 +96,15 @@ class GANDIVA_EXPORT SelectionVector {
static Status MakeInt16(int64_t max_slots, arrow::MemoryPool* pool,
std::shared_ptr<SelectionVector>* selection_vector);
+ /// \brief creates a selection vector with a pre-populated buffer.
+ ///
+ /// \param[in] num_slots size of the selection vector
+ /// \param[in] buffer pre-populated buffer
+ /// \param[out] selection_vector selection vector backed by 'buffer'
+ static Status MakeImmutableInt16(int64_t num_slots,
+ std::shared_ptr<arrow::Buffer> buffer,
+ std::shared_ptr<SelectionVector>* selection_vector);
+
/// \brief make selection vector with int32 type records.
///
/// \param[in] max_slots max number of slots
@@ -96,6 +122,15 @@ class GANDIVA_EXPORT SelectionVector {
static Status MakeInt32(int64_t max_slots, arrow::MemoryPool* pool,
std::shared_ptr<SelectionVector>* selection_vector);
+ /// \brief creates a selection vector with a pre-populated buffer.
+ ///
+ /// \param[in] num_slots size of the selection vector
+ /// \param[in] buffer pre-populated buffer
+ /// \param[out] selection_vector selection vector backed by 'buffer'
+ static Status MakeImmutableInt32(int64_t num_slots,
+ std::shared_ptr<arrow::Buffer> buffer,
+ std::shared_ptr<SelectionVector>* selection_vector);
+
/// \brief make selection vector with int64 type records.
///
/// \param[in] max_slots max number of slots
diff --git a/cpp/src/gandiva/selection_vector_impl.h b/cpp/src/gandiva/selection_vector_impl.h
index 3d6a7df..86912b5 100644
--- a/cpp/src/gandiva/selection_vector_impl.h
+++ b/cpp/src/gandiva/selection_vector_impl.h
@@ -32,14 +32,22 @@ namespace gandiva {
/// \brief template implementation of selection vector with a specific ctype and arrow
/// type.
-template <typename C_TYPE, typename A_TYPE>
+template <typename C_TYPE, typename A_TYPE, SelectionVector::Mode mode>
class SelectionVectorImpl : public SelectionVector {
public:
SelectionVectorImpl(int64_t max_slots, std::shared_ptr<arrow::Buffer> buffer)
- : max_slots_(max_slots), num_slots_(0), buffer_(buffer) {
+ : max_slots_(max_slots), num_slots_(0), buffer_(buffer), mode_(mode) {
raw_data_ = reinterpret_cast<C_TYPE*>(buffer->mutable_data());
}
+ /// Wraps a pre-populated 'buffer'; raw_data_ is null (not uninitialized) without one.
+ SelectionVectorImpl(int64_t max_slots, int64_t num_slots,
+ std::shared_ptr<arrow::Buffer> buffer)
+ : max_slots_(max_slots), num_slots_(num_slots), buffer_(buffer), mode_(mode) {
+ raw_data_ = buffer ? const_cast<C_TYPE*>(reinterpret_cast<const C_TYPE*>(buffer->data()))
+ : NULLPTR;
+ }
+
uint64_t GetIndex(int64_t index) const override { return raw_data_[index]; }
void SetIndex(int64_t index, uint64_t value) override {
@@ -61,6 +69,10 @@ class SelectionVectorImpl : public SelectionVector {
return std::numeric_limits<C_TYPE>::max();
}
+ Mode GetMode() const override { return mode_; }
+
+ arrow::Buffer& GetBuffer() const override { return *buffer_; }
+
static Status AllocateBuffer(int64_t max_slots, arrow::MemoryPool* pool,
std::shared_ptr<arrow::Buffer>* buffer);
@@ -75,18 +87,24 @@ class SelectionVectorImpl : public SelectionVector {
std::shared_ptr<arrow::Buffer> buffer_;
C_TYPE* raw_data_;
+
+ /// SelectionVector mode
+ Mode mode_;
};
-template <typename C_TYPE, typename A_TYPE>
-ArrayPtr SelectionVectorImpl<C_TYPE, A_TYPE>::ToArray() const {
+template <typename C_TYPE, typename A_TYPE, SelectionVector::Mode mode>
+ArrayPtr SelectionVectorImpl<C_TYPE, A_TYPE, mode>::ToArray() const {
auto data_type = arrow::TypeTraits<A_TYPE>::type_singleton();
auto array_data = arrow::ArrayData::Make(data_type, num_slots_, {NULLPTR, buffer_});
return arrow::MakeArray(array_data);
}
-using SelectionVectorInt16 = SelectionVectorImpl<uint16_t, arrow::UInt16Type>;
-using SelectionVectorInt32 = SelectionVectorImpl<uint32_t, arrow::UInt32Type>;
-using SelectionVectorInt64 = SelectionVectorImpl<uint64_t, arrow::UInt64Type>;
+using SelectionVectorInt16 =
+ SelectionVectorImpl<uint16_t, arrow::UInt16Type, SelectionVector::MODE_UINT16>;
+using SelectionVectorInt32 =
+ SelectionVectorImpl<uint32_t, arrow::UInt32Type, SelectionVector::MODE_UINT32>;
+using SelectionVectorInt64 =
+ SelectionVectorImpl<uint64_t, arrow::UInt64Type, SelectionVector::MODE_UINT64>;
} // namespace gandiva
diff --git a/cpp/src/gandiva/tests/filter_project_test.cc b/cpp/src/gandiva/tests/filter_project_test.cc
new file mode 100644
index 0000000..e290029
--- /dev/null
+++ b/cpp/src/gandiva/tests/filter_project_test.cc
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "arrow/memory_pool.h"
+#include "gandiva/filter.h"
+#include "gandiva/projector.h"
+#include "gandiva/selection_vector.h"
+#include "gandiva/tests/test_util.h"
+#include "gandiva/tree_expr_builder.h"
+
+namespace gandiva {
+
+using arrow::boolean;
+using arrow::float32;
+using arrow::int32;
+
+class TestFilterProject : public ::testing::Test {
+ public:
+ void SetUp() { pool_ = arrow::default_memory_pool(); }
+
+ protected:
+ arrow::MemoryPool* pool_;
+};
+
+TEST_F(TestFilterProject, TestSimple16) {
+ // schema for input fields; "result" is the field produced by the projection
+ auto field0 = field("f0", int32());
+ auto field1 = field("f1", int32());
+ auto field2 = field("f2", int32());
+ auto resultField = field("result", int32());
+ auto schema = arrow::schema({field0, field1, field2});
+
+ // Build filter condition f0 < f1 and projection add(f1, f2)
+ auto node_f0 = TreeExprBuilder::MakeField(field0);
+ auto node_f1 = TreeExprBuilder::MakeField(field1);
+ auto node_f2 = TreeExprBuilder::MakeField(field2);
+ auto less_than_function =
+ TreeExprBuilder::MakeFunction("less_than", {node_f0, node_f1}, arrow::boolean());
+ auto condition = TreeExprBuilder::MakeCondition(less_than_function);
+ auto sum_expr = TreeExprBuilder::MakeExpression("add", {field1, field2}, resultField);
+
+ auto configuration = TestConfiguration();
+
+ std::shared_ptr<Filter> filter;
+ std::shared_ptr<Projector> projector;
+
+ auto status = Filter::Make(schema, condition, configuration, &filter);
+ EXPECT_TRUE(status.ok());
+
+ status = Projector::Make(schema, {sum_expr}, configuration, &projector);
+ EXPECT_TRUE(status.ok());
+
+ // Create a row-batch with some sample data (the last value of f2 is null)
+ int num_records = 5;
+ auto array0 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, true});
+ auto array1 = MakeArrowArrayInt32({5, 9, 3, 17, 6}, {true, true, true, true, true});
+ auto array2 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, false});
+ // expected output: rows 0, 1 and 4 satisfy f0 < f1; row 4 is null because f2 is null
+ auto result = MakeArrowArrayInt32({6, 11, 0}, {true, true, false});
+ // prepare input record batch
+ auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1, array2});
+
+ std::shared_ptr<SelectionVector> selection_vector;
+ status = SelectionVector::MakeInt16(num_records, pool_, &selection_vector);
+ EXPECT_TRUE(status.ok());
+ // Evaluate the filter to populate the selection vector
+ status = filter->Evaluate(*in_batch, selection_vector);
+ EXPECT_TRUE(status.ok());
+
+ // Evaluate the projection on the positions held in the selection vector
+ arrow::ArrayVector outputs;
+
+ status = projector->Evaluate(*in_batch, selection_vector.get(), pool_, &outputs);
+ EXPECT_TRUE(status.ok());
+
+ // Validate results
+ EXPECT_ARROW_ARRAY_EQUALS(result, outputs.at(0));
+}
+
+TEST_F(TestFilterProject, TestSimple32) {
+ // schema for input fields; "result" is the field produced by the projection
+ auto field0 = field("f0", int32());
+ auto field1 = field("f1", int32());
+ auto field2 = field("f2", int32());
+ auto resultField = field("result", int32());
+ auto schema = arrow::schema({field0, field1, field2});
+
+ // Build filter condition f0 < f1 and projection add(f1, f2)
+ auto node_f0 = TreeExprBuilder::MakeField(field0);
+ auto node_f1 = TreeExprBuilder::MakeField(field1);
+ auto node_f2 = TreeExprBuilder::MakeField(field2);
+ auto less_than_function =
+ TreeExprBuilder::MakeFunction("less_than", {node_f0, node_f1}, arrow::boolean());
+ auto condition = TreeExprBuilder::MakeCondition(less_than_function);
+ auto sum_expr = TreeExprBuilder::MakeExpression("add", {field1, field2}, resultField);
+
+ auto configuration = TestConfiguration();
+
+ std::shared_ptr<Filter> filter;
+ std::shared_ptr<Projector> projector;
+
+ auto status = Filter::Make(schema, condition, configuration, &filter);
+ EXPECT_TRUE(status.ok());
+
+ status = Projector::Make(schema, {sum_expr}, configuration, &projector);
+ EXPECT_TRUE(status.ok());
+
+ // Create a row-batch with some sample data (the last value of f2 is null)
+ int num_records = 5;
+ auto array0 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, true});
+ auto array1 = MakeArrowArrayInt32({5, 9, 3, 17, 6}, {true, true, true, true, true});
+ auto array2 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, false});
+ // expected output: rows 0, 1 and 4 satisfy f0 < f1; row 4 is null because f2 is null
+ auto result = MakeArrowArrayInt32({6, 11, 0}, {true, true, false});
+ // prepare input record batch
+ auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1, array2});
+
+ std::shared_ptr<SelectionVector> selection_vector;
+ status = SelectionVector::MakeInt32(num_records, pool_, &selection_vector);
+ EXPECT_TRUE(status.ok());
+ // Evaluate the filter to populate the selection vector
+ status = filter->Evaluate(*in_batch, selection_vector);
+ EXPECT_TRUE(status.ok());
+
+ // Evaluate the projection on the positions held in the selection vector
+ arrow::ArrayVector outputs;
+
+ status = projector->Evaluate(*in_batch, selection_vector.get(), pool_, &outputs);
+ EXPECT_TRUE(status.ok());
+
+ // Validate results
+ EXPECT_ARROW_ARRAY_EQUALS(result, outputs.at(0));
+}
+
+TEST_F(TestFilterProject, TestSimple64) {
+ // schema for input fields; "result" is the field produced by the projection
+ auto field0 = field("f0", int32());
+ auto field1 = field("f1", int32());
+ auto field2 = field("f2", int32());
+ auto resultField = field("result", int32());
+ auto schema = arrow::schema({field0, field1, field2});
+
+ // Build filter condition f0 < f1 and projection add(f1, f2)
+ auto node_f0 = TreeExprBuilder::MakeField(field0);
+ auto node_f1 = TreeExprBuilder::MakeField(field1);
+ auto node_f2 = TreeExprBuilder::MakeField(field2);
+ auto less_than_function =
+ TreeExprBuilder::MakeFunction("less_than", {node_f0, node_f1}, arrow::boolean());
+ auto condition = TreeExprBuilder::MakeCondition(less_than_function);
+ auto sum_expr = TreeExprBuilder::MakeExpression("add", {field1, field2}, resultField);
+
+ auto configuration = TestConfiguration();
+
+ std::shared_ptr<Filter> filter;
+ std::shared_ptr<Projector> projector;
+
+ auto status = Filter::Make(schema, condition, configuration, &filter);
+ EXPECT_TRUE(status.ok());
+
+ status = Projector::Make(schema, {sum_expr}, configuration, &projector);
+ EXPECT_TRUE(status.ok());
+
+ // Create a row-batch with some sample data (the last value of f2 is null)
+ int num_records = 5;
+ auto array0 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, true});
+ auto array1 = MakeArrowArrayInt32({5, 9, 3, 17, 6}, {true, true, true, true, true});
+ auto array2 = MakeArrowArrayInt32({1, 2, 6, 40, 3}, {true, true, true, true, false});
+ // expected output: rows 0, 1 and 4 satisfy f0 < f1; row 4 is null because f2 is null
+ auto result = MakeArrowArrayInt32({6, 11, 0}, {true, true, false});
+ // prepare input record batch
+ auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1, array2});
+
+ std::shared_ptr<SelectionVector> selection_vector;
+ status = SelectionVector::MakeInt64(num_records, pool_, &selection_vector);
+ EXPECT_TRUE(status.ok());
+ // Evaluate the filter to populate the selection vector
+ status = filter->Evaluate(*in_batch, selection_vector);
+ EXPECT_TRUE(status.ok());
+
+ // Evaluate the projection on the positions held in the selection vector
+ arrow::ArrayVector outputs;
+
+ status = projector->Evaluate(*in_batch, selection_vector.get(), pool_, &outputs);
+ EXPECT_TRUE(status.ok());
+
+ // Validate results
+ EXPECT_ARROW_ARRAY_EQUALS(result, outputs.at(0));
+}
+
+TEST_F(TestFilterProject, TestSimpleIf) {
+ // schema for input fields
+ auto fielda = field("a", int32());
+ auto fieldb = field("b", int32());
+ auto fieldc = field("c", int32());
+ auto schema = arrow::schema({fielda, fieldb, fieldc});
+
+ // output field
+ auto field_result = field("res", int32());
+
+ auto node_a = TreeExprBuilder::MakeField(fielda);
+ auto node_b = TreeExprBuilder::MakeField(fieldb);
+ auto node_c = TreeExprBuilder::MakeField(fieldc);
+
+ auto greater_than_function =
+ TreeExprBuilder::MakeFunction("greater_than", {node_a, node_b}, boolean());
+ auto filter_condition = TreeExprBuilder::MakeCondition(greater_than_function);
+
+ auto project_condition =
+ TreeExprBuilder::MakeFunction("less_than", {node_b, node_c}, boolean());
+ auto if_node = TreeExprBuilder::MakeIf(project_condition, node_b, node_c, int32());
+
+ auto expr = TreeExprBuilder::MakeExpression(if_node, field_result);
+ auto configuration = TestConfiguration();
+
+ // Build a filter for the condition a > b.
+ std::shared_ptr<Filter> filter;
+ auto status = Filter::Make(schema, filter_condition, configuration, &filter);
+ EXPECT_TRUE(status.ok());
+
+ // Build a projector for the expression: if (b < c) then b else c.
+ std::shared_ptr<Projector> projector;
+ status = Projector::Make(schema, {expr}, configuration, &projector);
+ EXPECT_TRUE(status.ok());
+
+ // Create a row-batch with some sample data (the last value of c is null)
+ int num_records = 6;
+ auto array0 =
+ MakeArrowArrayInt32({10, 12, -20, 5, 21, 29}, {true, true, true, true, true, true});
+ auto array1 =
+ MakeArrowArrayInt32({5, 15, 15, 17, 12, 3}, {true, true, true, true, true, true});
+ auto array2 = MakeArrowArrayInt32({1, 25, 11, 30, -21, 30},
+ {true, true, true, true, true, false});
+
+ // Create a selection vector with room for every input row
+ std::shared_ptr<SelectionVector> selection_vector;
+ status = SelectionVector::MakeInt32(num_records, pool_, &selection_vector);
+ EXPECT_TRUE(status.ok());
+
+ // expected output: rows 0, 4 and 5 satisfy a > b; row 5 is null because c is null
+ auto exp = MakeArrowArrayInt32({1, -21, 0}, {true, true, false});
+
+ // prepare input record batch
+ auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1, array2});
+
+ // Evaluate filter to populate the selection vector
+ status = filter->Evaluate(*in_batch, selection_vector);
+ EXPECT_TRUE(status.ok());
+
+ // Evaluate project on the positions held in the selection vector
+ arrow::ArrayVector outputs;
+ status = projector->Evaluate(*in_batch, selection_vector.get(), pool_, &outputs);
+ EXPECT_TRUE(status.ok());
+
+ // Validate results
+ EXPECT_ARROW_ARRAY_EQUALS(exp, outputs.at(0));
+}
+} // namespace gandiva
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
index f00b0fb..adbcdf3 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
@@ -61,6 +61,8 @@ public class JniWrapper {
*/
native void evaluateProjector(long moduleId, int numRows,
long[] bufAddrs, long[] bufSizes,
+ int selectionVectorType, int selectionVectorSize,
+ long selectionVectorBufferAddr, long selectionVectorBufferSize,
long[] outAddrs, long[] outSizes) throws GandivaException;
/**
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
index d13195c..fb7c6e4 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
@@ -26,6 +26,7 @@ import org.apache.arrow.gandiva.exceptions.UnsupportedTypeException;
import org.apache.arrow.gandiva.expression.ArrowTypeHelper;
import org.apache.arrow.gandiva.expression.ExpressionTree;
import org.apache.arrow.gandiva.ipc.GandivaTypes;
+import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType;
import org.apache.arrow.vector.FixedWidthVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
@@ -113,8 +114,10 @@ public class Projector {
*/
public void evaluate(ArrowRecordBatch recordBatch, List<ValueVector> outColumns)
throws GandivaException {
- evaluate(recordBatch.getLength(), recordBatch.getBuffers(), recordBatch.getBuffersLayout(),
- outColumns);
+ evaluate(recordBatch.getLength(), recordBatch.getBuffers(),
+ recordBatch.getBuffersLayout(),
+ SelectionVectorType.SV_NONE.getNumber(), recordBatch.getLength(),
+ 0, 0, outColumns);
}
/**
@@ -134,10 +137,54 @@ public class Projector {
buffersLayout.add(new ArrowBuffer(offset, size));
offset += size;
}
- evaluate(numRows, buffers, buffersLayout, outColumns);
+ evaluate(numRows, buffers, buffersLayout,
+ SelectionVectorType.SV_NONE.getNumber(),
+ numRows, 0, 0, outColumns);
+ }
+
+ public void evaluate(ArrowRecordBatch recordBatch,
+ SelectionVector selectionVector, List<ValueVector> outColumns)
+ throws GandivaException {
+ evaluate(recordBatch.getLength(), recordBatch.getBuffers(),
+ recordBatch.getBuffersLayout(),
+ selectionVector.getType().getNumber(),
+ selectionVector.getRecordCount(),
+ selectionVector.getBuffer().memoryAddress(),
+ selectionVector.getBuffer().capacity(),
+ outColumns);
+ }
+
+ /**
+ * Invoke this function to evaluate a set of expressions against a set of arrow buffers
+ * on the selected positions.
+ * (this is an optimised version that skips taking references).
+ *
+ * @param numRows number of rows.
+ * @param buffers List of input arrow buffers
+ * @param selectionVector Selection vector which stores the selected rows.
+ * @param outColumns Result of applying the project on the data
+ */
+ public void evaluate(int numRows, List<ArrowBuf> buffers,
+ SelectionVector selectionVector,
+ List<ValueVector> outColumns) throws GandivaException {
+ List<ArrowBuffer> buffersLayout = new ArrayList<>();
+ long offset = 0;
+ for (ArrowBuf arrowBuf : buffers) {
+ long size = arrowBuf.readableBytes();
+ buffersLayout.add(new ArrowBuffer(offset, size));
+ offset += size;
+ }
+ evaluate(numRows, buffers, buffersLayout,
+ selectionVector.getType().getNumber(),
+ selectionVector.getRecordCount(),
+ selectionVector.getBuffer().memoryAddress(),
+ selectionVector.getBuffer().capacity(),
+ outColumns);
}
private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buffersLayout,
+ int selectionVectorType, int selectionVectorRecordCount,
+ long selectionVectorAddr, long selectionVectorSize,
List<ValueVector> outColumns) throws GandivaException {
if (this.closed) {
throw new EvaluatorClosedException();
@@ -174,10 +221,13 @@ public class Projector {
outAddrs[idx] = valueVector.getDataBuffer().memoryAddress();
outSizes[idx++] = valueVector.getDataBuffer().capacity();
- valueVector.setValueCount(numRows);
+ valueVector.setValueCount(selectionVectorRecordCount);
}
- wrapper.evaluateProjector(this.moduleId, numRows, bufAddrs, bufSizes, outAddrs, outSizes);
+ wrapper.evaluateProjector(this.moduleId, numRows, bufAddrs, bufSizes,
+ selectionVectorType, selectionVectorRecordCount,
+ selectionVectorAddr, selectionVectorSize,
+ outAddrs, outSizes);
}
/**
diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/FilterProjectTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/FilterProjectTest.java
new file mode 100644
index 0000000..33489cc
--- /dev/null
+++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/FilterProjectTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.gandiva.evaluator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.gandiva.expression.Condition;
+import org.apache.arrow.gandiva.expression.ExpressionTree;
+import org.apache.arrow.gandiva.expression.TreeBuilder;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ArrowBuf;
+
+public class FilterProjectTest extends BaseEvaluatorTest {
+
+ @Test
+ public void testSimpleSV16() throws GandivaException, Exception {
+ Field a = Field.nullable("a", int32);
+ Field b = Field.nullable("b", int32);
+ Field c = Field.nullable("c", int32);
+ List<Field> args = Lists.newArrayList(a, b);
+
+ Condition condition = TreeBuilder.makeCondition("less_than", args);
+
+ Schema schema = new Schema(args);
+ Filter filter = Filter.make(schema, condition);
+
+ ExpressionTree expression = TreeBuilder.makeExpression("add", Lists.newArrayList(a, b), c);
+ Projector projector = Projector.make(schema, Lists.newArrayList(expression));
+
+ int numRows = 16;
+ byte[] validity = new byte[]{(byte) 255, 0};
+ // validity bitmap above: rows 0-7 are valid, the second half (rows 8-15) is "undefined"
+ int[] aValues = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+ int[] bValues = new int[]{2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 14, 15};
+ int[] expected = {3, 7, 11, 15};
+
+ verifyTestCaseFor16(filter, projector, numRows, validity, aValues, bValues, expected);
+ }
+
+ private void verifyTestCaseFor16(Filter filter, Projector projector, int numRows, byte[] validity,
+ int[] aValues, int[] bValues, int[] expected) throws GandivaException {
+ ArrowBuf validitya = buf(validity);
+ ArrowBuf valuesa = intBuf(aValues);
+ ArrowBuf validityb = buf(validity);
+ ArrowBuf valuesb = intBuf(bValues);
+ ArrowRecordBatch batch = new ArrowRecordBatch(
+ numRows,
+ Lists.newArrayList(new ArrowFieldNode(numRows, 0), new ArrowFieldNode(numRows, 0)),
+ Lists.newArrayList(validitya, valuesa, validityb, valuesb));
+
+ ArrowBuf selectionBuffer = buf(numRows * 2);
+ SelectionVectorInt16 selectionVector = new SelectionVectorInt16(selectionBuffer);
+
+ filter.evaluate(batch, selectionVector);
+
+ IntVector intVector = new IntVector(EMPTY_SCHEMA_PATH, allocator);
+ intVector.allocateNew(selectionVector.getRecordCount());
+
+ List<ValueVector> output = new ArrayList<ValueVector>();
+ output.add(intVector);
+ projector.evaluate(batch, selectionVector, output);
+ for (int i = 0; i < selectionVector.getRecordCount(); i++) {
+ assertFalse(intVector.isNull(i));
+ assertEquals(expected[i], intVector.get(i));
+ }
+ // free buffers, then close the filter and the projector
+ releaseRecordBatch(batch);
+ releaseValueVectors(output);
+ selectionBuffer.close();
+ filter.close();
+ projector.close();
+ }
+}