You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/12 19:12:51 UTC
incubator-quickstep git commit: Fixed empty table when p_key or o_key
specified
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation 897ffc6db -> f4eb4a7cc
Fixed empty table when p_key or o_key specified
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f4eb4a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f4eb4a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f4eb4a7c
Branch: refs/heads/SQL-window-aggregation
Commit: f4eb4a7ccff40d278a4f4e32737edc5f93fb7419
Parents: 897ffc6
Author: shixuan-fan <sh...@apache.org>
Authored: Tue Jul 12 19:12:36 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Tue Jul 12 19:12:36 2016 +0000
----------------------------------------------------------------------
.../WindowAggregateFunction.hpp | 1 -
.../WindowAggregateFunctionAvg.cpp | 2 --
.../WindowAggregateFunctionAvg.hpp | 1 -
.../WindowAggregationHandle.hpp | 14 +++++++-------
.../WindowAggregationHandleAvg.cpp | 19 +++++++++++--------
.../WindowAggregationHandleAvg.hpp | 8 +++++---
query_optimizer/ExecutionGenerator.cpp | 5 -----
.../WindowAggregationOperator.cpp | 11 ++++++++---
.../WindowAggregationOperator.hpp | 8 ++++----
storage/WindowAggregationOperationState.cpp | 17 +++++------------
storage/WindowAggregationOperationState.hpp | 4 ++--
storage/WindowAggregationOperationState.proto | 1 -
12 files changed, 42 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 84d97fc..863ac57 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -131,7 +131,6 @@ class WindowAggregateFunction {
**/
virtual WindowAggregationHandle* createHandle(
const CatalogRelationSchema &relation,
- const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const = 0;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index 06ff1d9..ae065d0 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,7 +73,6 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
const CatalogRelationSchema &relation,
- const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const {
DCHECK(canApplyToTypes(argument_types))
@@ -81,7 +80,6 @@ WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
<< " that AVG can not be applied to.";
return new WindowAggregationHandleAvg(relation,
- block_ids,
*argument_types.front(),
std::move(partition_key_types));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 91acf7e..07d6b4f 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,6 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const CatalogRelationSchema &relation,
- const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 4307007..04475dc 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -98,14 +98,17 @@ class WindowAggregationHandle {
* NULL if all arguments are attributes.
* @param output_destination The destination for output.
**/
- virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ virtual void calculate(const std::vector<block_id> &block_ids,
+ const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> &partition_by_ids,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager) = 0;
- virtual std::vector<ValueAccessor*> finalize(StorageManager *storage_manager) = 0;
+ virtual std::vector<ValueAccessor*> finalize(
+ const std::vector<block_id> &block_ids,
+ StorageManager *storage_manager) = 0;
protected:
/**
@@ -118,13 +121,10 @@ class WindowAggregationHandle {
* @param num_following The number of rows/range that follows the current row.
* @param storage_manager A pointer to the storage manager.
**/
- WindowAggregationHandle(const CatalogRelationSchema &relation,
- const std::vector<block_id> block_ids)
- : block_ids_(block_ids),
- relation_(relation) {}
+ WindowAggregationHandle(const CatalogRelationSchema &relation)
+ : relation_(relation) {}
std::vector<ColumnVector*> window_aggregates_;
- const std::vector<block_id> block_ids_;
const CatalogRelationSchema &relation_;
private:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 49523fb..4bc1cc1 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -53,10 +53,9 @@ class StorageManager;
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
const CatalogRelationSchema &relation,
- const std::vector<block_id> &block_ids,
const Type &type,
std::vector<const Type*> &&partition_key_types)
- : WindowAggregationHandle(relation, block_ids),
+ : WindowAggregationHandle(relation),
argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
@@ -101,7 +100,8 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
}
}
-void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+void WindowAggregationHandleAvg::calculate(const std::vector<block_id> &block_ids,
+ const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> &partition_by_ids,
const bool is_row,
const std::int64_t num_preceding,
@@ -113,7 +113,7 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
// Index of each value accessor indicates the block it belongs to.
std::vector<ValueAccessor*> tuple_accessors;
std::vector<ColumnVectorsValueAccessor*> argument_accessors;
- for (block_id bid : block_ids_) {
+ for (block_id bid : block_ids) {
// Get tuple accessor.
BlockReference block = storage_manager->getBlock(bid, relation_);
const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
@@ -132,7 +132,7 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
// Create a window for each tuple and calculate the window aggregate.
for (std::uint32_t current_block_index = 0;
- current_block_index < block_ids_.size();
+ current_block_index < block_ids.size();
++current_block_index) {
ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
ColumnVectorsValueAccessor* argument_accessor =
@@ -147,7 +147,8 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
argument_accessor->beginIteration();
while (tuple_accessor->next() && argument_accessor->next()) {
- const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+ const TypedValue window_aggregate = this->calculateOneWindow(block_ids,
+ tuple_accessors,
argument_accessors,
partition_by_ids,
current_block_index,
@@ -163,14 +164,15 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
}
std::vector<ValueAccessor*> WindowAggregationHandleAvg::finalize(
+ const std::vector<block_id> &block_ids,
StorageManager *storage_manager) {
std::vector<ValueAccessor*> accessors;
// Create a ValueAccessor for each block, including the new window aggregate
// attribute.
- for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+ for (std::size_t block_idx = 0; block_idx < block_ids.size(); ++block_idx) {
// Get the block information.
- BlockReference block = storage_manager->getBlock(block_ids_[block_idx],
+ BlockReference block = storage_manager->getBlock(block_ids[block_idx],
relation_);
const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
ValueAccessor *block_accessor = tuple_block.createValueAccessor();
@@ -197,6 +199,7 @@ std::vector<ValueAccessor*> WindowAggregationHandleAvg::finalize(
}
TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+ const std::vector<block_id> &block_ids,
std::vector<ValueAccessor*> &tuple_accessors,
std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
const std::vector<attribute_id> &partition_by_ids,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index adb33e0..02373b9 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -55,14 +55,16 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
public:
~WindowAggregationHandleAvg() override {}
- void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ void calculate(const std::vector<block_id> &block_ids,
+ const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> &partition_by_ids,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager);
- std::vector<ValueAccessor*> finalize(StorageManager *storage_manager);
+ std::vector<ValueAccessor*> finalize(const std::vector<block_id> &block_ids,
+ StorageManager *storage_manager) override;
private:
friend class WindowAggregateFunctionAvg;
@@ -79,11 +81,11 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
* @param type Type of the avg value.
**/
explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
- const std::vector<block_id> &block_ids,
const Type &type,
std::vector<const Type*> &&partition_key_types);
TypedValue calculateOneWindow(
+ const std::vector<block_id> &block_ids,
std::vector<ValueAccessor*> &tuple_accessors,
std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
const std::vector<attribute_id> &partition_by_ids,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 06d47d2..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,11 +1656,6 @@ void ExecutionGenerator::convertWindowAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
- // Get relation blocks.
- for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
- window_aggr_state_proto->add_block_ids(bid);
- }
-
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
physical_plan->window_aggregate_expression();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 4c2e2b5..2b1870a 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -42,11 +42,13 @@ bool WindowAggregationOperator::getAllWorkOrders(
DCHECK(query_context != nullptr);
if (blocking_dependencies_met_ && !generated_) {
+ std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+
container->addNormalWorkOrder(
new WindowAggregationWorkOrder(
query_id_,
query_context->releaseWindowAggregationState(window_aggregation_state_index_),
- block_ids_,
+ std::move(relation_blocks),
query_context->getInsertDestination(output_destination_index_)),
op_index_);
generated_ = true;
@@ -70,7 +72,9 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
proto->set_query_id(query_id_);
proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
window_aggregation_state_index_);
- for (block_id bid : block_ids_) {
+
+ std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+ for (block_id bid : relation_blocks) {
proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
}
proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
@@ -81,7 +85,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
void WindowAggregationWorkOrder::execute() {
- state_->windowAggregateBlocks(output_destination_);
+ state_->windowAggregateBlocks(output_destination_,
+ block_ids_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index 4084ffc..02482a0 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -70,7 +70,7 @@ class WindowAggregationOperator : public RelationalOperator {
const QueryContext::window_aggregation_state_id window_aggregation_state_index,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
- block_ids_(input_relation.getBlocksSnapshot()),
+ input_relation_(input_relation),
output_relation_(output_relation),
window_aggregation_state_index_(window_aggregation_state_index),
output_destination_index_(output_destination_index),
@@ -102,7 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
**/
serialization::WorkOrder* createWorkOrderProto();
- const std::vector<block_id> block_ids_;
+ const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
const QueryContext::insert_destination_id output_destination_index_;
@@ -125,11 +125,11 @@ class WindowAggregationWorkOrder : public WorkOrder {
**/
WindowAggregationWorkOrder(const std::size_t query_id,
WindowAggregationOperationState *state,
- const std::vector<block_id> &block_ids,
+ std::vector<block_id> &&block_ids,
InsertDestination *output_destination)
: WorkOrder(query_id),
state_(state),
- block_ids_(block_ids),
+ block_ids_(std::move(block_ids)),
output_destination_(output_destination) {}
~WindowAggregationWorkOrder() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index b3de423..06e846f 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,7 +47,6 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
- std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -56,7 +55,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
- block_ids_(std::move(block_ids)),
arguments_(std::move(arguments)),
is_row_(is_row),
num_preceding_(num_preceding),
@@ -85,7 +83,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Create the handle and initial state.
window_aggregation_handle_.reset(
window_aggregate_function->createHandle(input_relation_,
- block_ids_,
std::move(argument_types),
std::move(partition_by_types)));
@@ -113,11 +110,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
- std::vector<block_id> block_ids;
- for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
- block_ids.push_back(proto.block_ids(block_idx));
- }
-
// Rebuild contructor arguments from their representation in 'proto'.
const WindowAggregateFunction *window_aggregate_function
= &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -144,7 +136,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_following = proto.num_following();
return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
- std::move(block_ids),
window_aggregate_function,
std::move(arguments),
std::move(partition_by_attributes),
@@ -195,8 +186,10 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
}
void WindowAggregationOperationState::windowAggregateBlocks(
- InsertDestination *output_destination) {
- window_aggregation_handle_->calculate(arguments_,
+ InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids) {
+ window_aggregation_handle_->calculate(block_ids,
+ arguments_,
partition_by_ids_,
is_row_,
num_preceding_,
@@ -204,7 +197,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
storage_manager_);
std::vector<ValueAccessor*> output_accessors(
- window_aggregation_handle_->finalize(storage_manager_));
+ window_aggregation_handle_->finalize(block_ids, storage_manager_));
for (ValueAccessor* output_accessor : output_accessors) {
output_destination->bulkInsertTuples(output_accessor);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 8116410..3225b66 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,7 +67,6 @@ class WindowAggregationOperationState {
* tables.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
- std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -113,7 +112,8 @@ class WindowAggregationOperationState {
* @param output_destination The output destination for the computed window
* aggregate.
**/
- void windowAggregateBlocks(InsertDestination *output_destination);
+ void windowAggregateBlocks(InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids);
private:
const CatalogRelationSchema &input_relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c3b672c..4dc0a6a 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,7 +24,6 @@ import "expressions/Expressions.proto";
message WindowAggregationOperationState {
required int32 input_relation_id = 1;
- repeated fixed64 block_ids = 2;
required WindowAggregateFunction function = 3;
repeated Scalar arguments = 4;
repeated Scalar partition_by_attributes = 5;