You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/12 19:12:51 UTC

incubator-quickstep git commit: Fixed empty table when p_key or o_key specified

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation 897ffc6db -> f4eb4a7cc


Fixed empty table when p_key or o_key specified


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f4eb4a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f4eb4a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f4eb4a7c

Branch: refs/heads/SQL-window-aggregation
Commit: f4eb4a7ccff40d278a4f4e32737edc5f93fb7419
Parents: 897ffc6
Author: shixuan-fan <sh...@apache.org>
Authored: Tue Jul 12 19:12:36 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Tue Jul 12 19:12:36 2016 +0000

----------------------------------------------------------------------
 .../WindowAggregateFunction.hpp                  |  1 -
 .../WindowAggregateFunctionAvg.cpp               |  2 --
 .../WindowAggregateFunctionAvg.hpp               |  1 -
 .../WindowAggregationHandle.hpp                  | 14 +++++++-------
 .../WindowAggregationHandleAvg.cpp               | 19 +++++++++++--------
 .../WindowAggregationHandleAvg.hpp               |  8 +++++---
 query_optimizer/ExecutionGenerator.cpp           |  5 -----
 .../WindowAggregationOperator.cpp                | 11 ++++++++---
 .../WindowAggregationOperator.hpp                |  8 ++++----
 storage/WindowAggregationOperationState.cpp      | 17 +++++------------
 storage/WindowAggregationOperationState.hpp      |  4 ++--
 storage/WindowAggregationOperationState.proto    |  1 -
 12 files changed, 42 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 84d97fc..863ac57 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -131,7 +131,6 @@ class WindowAggregateFunction {
    **/
   virtual WindowAggregationHandle* createHandle(
       const CatalogRelationSchema &relation,
-      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index 06ff1d9..ae065d0 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,7 +73,6 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
     const CatalogRelationSchema &relation,
-    const std::vector<block_id> block_ids,
     std::vector<const Type*> &&argument_types,
     std::vector<const Type*> &&partition_key_types) const {
   DCHECK(canApplyToTypes(argument_types))
@@ -81,7 +80,6 @@ WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
       << " that AVG can not be applied to.";
 
   return new WindowAggregationHandleAvg(relation,
-                                        block_ids,
                                         *argument_types.front(),
                                         std::move(partition_key_types));
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 91acf7e..07d6b4f 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,6 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const CatalogRelationSchema &relation,
-      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 4307007..04475dc 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -98,14 +98,17 @@ class WindowAggregationHandle {
    *                          NULL if all arguments are attributes.
    * @param output_destination The destination for output.
    **/
-  virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+  virtual void calculate(const std::vector<block_id> &block_ids,
+                         const std::vector<std::unique_ptr<const Scalar>> &arguments,
                          const std::vector<attribute_id> &partition_by_ids,
                          const bool is_row,
                          const std::int64_t num_preceding,
                          const std::int64_t num_following,
                          StorageManager *storage_manager) = 0;
 
-  virtual std::vector<ValueAccessor*> finalize(StorageManager *storage_manager) = 0;
+  virtual std::vector<ValueAccessor*> finalize(
+      const std::vector<block_id> &block_ids,
+      StorageManager *storage_manager) = 0;
 
  protected:
   /**
@@ -118,13 +121,10 @@ class WindowAggregationHandle {
    * @param num_following The number of rows/range that follows the current row.
    * @param storage_manager A pointer to the storage manager.
    **/
-  WindowAggregationHandle(const CatalogRelationSchema &relation,
-                          const std::vector<block_id> block_ids)
-      : block_ids_(block_ids),
-        relation_(relation) {}
+  WindowAggregationHandle(const CatalogRelationSchema &relation)
+      : relation_(relation) {}
 
   std::vector<ColumnVector*> window_aggregates_;
-  const std::vector<block_id> block_ids_;
   const CatalogRelationSchema &relation_;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 49523fb..4bc1cc1 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -53,10 +53,9 @@ class StorageManager;
 
 WindowAggregationHandleAvg::WindowAggregationHandleAvg(
     const CatalogRelationSchema &relation,
-    const std::vector<block_id> &block_ids,
     const Type &type,
     std::vector<const Type*> &&partition_key_types)
-    : WindowAggregationHandle(relation, block_ids),
+    : WindowAggregationHandle(relation),
       argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
@@ -101,7 +100,8 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
   }
 }
 
-void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+void WindowAggregationHandleAvg::calculate(const std::vector<block_id> &block_ids,
+                                           const std::vector<std::unique_ptr<const Scalar>> &arguments,
                                            const std::vector<attribute_id> &partition_by_ids,
                                            const bool is_row,
                                            const std::int64_t num_preceding,
@@ -113,7 +113,7 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
   // Index of each value accessor indicates the block it belongs to.
   std::vector<ValueAccessor*> tuple_accessors;
   std::vector<ColumnVectorsValueAccessor*> argument_accessors;
-  for (block_id bid : block_ids_) {
+  for (block_id bid : block_ids) {
     // Get tuple accessor.
     BlockReference block = storage_manager->getBlock(bid, relation_);
     const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
@@ -132,7 +132,7 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
 
   // Create a window for each tuple and calculate the window aggregate.
   for (std::uint32_t current_block_index = 0;
-       current_block_index < block_ids_.size();
+       current_block_index < block_ids.size();
        ++current_block_index) {
     ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
     ColumnVectorsValueAccessor* argument_accessor =
@@ -147,7 +147,8 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
       argument_accessor->beginIteration();
       
       while (tuple_accessor->next() && argument_accessor->next()) {
-        const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+        const TypedValue window_aggregate = this->calculateOneWindow(block_ids,
+                                                                     tuple_accessors,
                                                                      argument_accessors,
                                                                      partition_by_ids,
                                                                      current_block_index,
@@ -163,14 +164,15 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
 }
 
 std::vector<ValueAccessor*> WindowAggregationHandleAvg::finalize(
+    const std::vector<block_id> &block_ids,
     StorageManager *storage_manager) {
   std::vector<ValueAccessor*> accessors;
   
   // Create a ValueAccessor for each block, including the new window aggregate
   // attribute.
-  for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+  for (std::size_t block_idx = 0; block_idx < block_ids.size(); ++block_idx) {
     // Get the block information.
-    BlockReference block = storage_manager->getBlock(block_ids_[block_idx],
+    BlockReference block = storage_manager->getBlock(block_ids[block_idx],
                                                      relation_);
     const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
     ValueAccessor *block_accessor = tuple_block.createValueAccessor();
@@ -197,6 +199,7 @@ std::vector<ValueAccessor*> WindowAggregationHandleAvg::finalize(
 }
 
 TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    const std::vector<block_id> &block_ids,
     std::vector<ValueAccessor*> &tuple_accessors,
     std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
     const std::vector<attribute_id> &partition_by_ids,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index adb33e0..02373b9 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -55,14 +55,16 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
  public:
   ~WindowAggregationHandleAvg() override {}
 
-  void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+  void calculate(const std::vector<block_id> &block_ids,
+                 const std::vector<std::unique_ptr<const Scalar>> &arguments,
                  const std::vector<attribute_id> &partition_by_ids,
                  const bool is_row,
                  const std::int64_t num_preceding,
                  const std::int64_t num_following,
                  StorageManager *storage_manager);
 
-  std::vector<ValueAccessor*> finalize(StorageManager *storage_manager);
+  std::vector<ValueAccessor*> finalize(const std::vector<block_id> &block_ids,
+                                       StorageManager *storage_manager) override;
 
  private:
   friend class WindowAggregateFunctionAvg;
@@ -79,11 +81,11 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
    * @param type Type of the avg value.
    **/
   explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
-                                      const std::vector<block_id> &block_ids,
                                       const Type &type,
                                       std::vector<const Type*> &&partition_key_types);
 
   TypedValue calculateOneWindow(
+      const std::vector<block_id> &block_ids,
       std::vector<ValueAccessor*> &tuple_accessors,
       std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
       const std::vector<attribute_id> &partition_by_ids,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 06d47d2..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,11 +1656,6 @@ void ExecutionGenerator::convertWindowAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
-  // Get relation blocks.
-  for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
-    window_aggr_state_proto->add_block_ids(bid);
-  }
-
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
       physical_plan->window_aggregate_expression();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 4c2e2b5..2b1870a 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -42,11 +42,13 @@ bool WindowAggregationOperator::getAllWorkOrders(
   DCHECK(query_context != nullptr);
 
   if (blocking_dependencies_met_ && !generated_) {
+    std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+    
     container->addNormalWorkOrder(
         new WindowAggregationWorkOrder(
             query_id_,
             query_context->releaseWindowAggregationState(window_aggregation_state_index_),
-            block_ids_,
+            std::move(relation_blocks),
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     generated_ = true;
@@ -70,7 +72,9 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
                       window_aggregation_state_index_);
-  for (block_id bid : block_ids_) {
+                      
+  std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+  for (block_id bid : relation_blocks) {
     proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
   }
   proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
@@ -81,7 +85,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  state_->windowAggregateBlocks(output_destination_);
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index 4084ffc..02482a0 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -70,7 +70,7 @@ class WindowAggregationOperator : public RelationalOperator {
                             const QueryContext::window_aggregation_state_id window_aggregation_state_index,
                             const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
-        block_ids_(input_relation.getBlocksSnapshot()),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         window_aggregation_state_index_(window_aggregation_state_index),
         output_destination_index_(output_destination_index),
@@ -102,7 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
    **/
   serialization::WorkOrder* createWorkOrderProto();
 
-  const std::vector<block_id> block_ids_;
+  const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
   const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
   const QueryContext::insert_destination_id output_destination_index_;
@@ -125,11 +125,11 @@ class WindowAggregationWorkOrder : public WorkOrder {
    **/
   WindowAggregationWorkOrder(const std::size_t query_id,
                              WindowAggregationOperationState *state,
-                             const std::vector<block_id> &block_ids,
+                             std::vector<block_id> &&block_ids,
                              InsertDestination *output_destination)
       : WorkOrder(query_id),
         state_(state),
-        block_ids_(block_ids),
+        block_ids_(std::move(block_ids)),
         output_destination_(output_destination)  {}
 
   ~WindowAggregationWorkOrder() override {}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index b3de423..06e846f 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,7 +47,6 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    std::vector<block_id> &&block_ids,
     const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -56,7 +55,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
-      block_ids_(std::move(block_ids)),
       arguments_(std::move(arguments)),
       is_row_(is_row),
       num_preceding_(num_preceding),
@@ -85,7 +83,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
       window_aggregate_function->createHandle(input_relation_,
-                                              block_ids_,
                                               std::move(argument_types),
                                               std::move(partition_by_types)));
 
@@ -113,11 +110,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
     StorageManager *storage_manager) {
   DCHECK(ProtoIsValid(proto, database));
 
-  std::vector<block_id> block_ids;
-  for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
-    block_ids.push_back(proto.block_ids(block_idx));
-  }
-
   // Rebuild contructor arguments from their representation in 'proto'.
   const WindowAggregateFunction *window_aggregate_function
       = &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -144,7 +136,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_following = proto.num_following();
 
   return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
-                                             std::move(block_ids),
                                              window_aggregate_function,
                                              std::move(arguments),
                                              std::move(partition_by_attributes),
@@ -195,8 +186,10 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
 }
 
 void WindowAggregationOperationState::windowAggregateBlocks(
-    InsertDestination *output_destination) {
-  window_aggregation_handle_->calculate(arguments_,
+    InsertDestination *output_destination,
+    const std::vector<block_id> &block_ids) {
+  window_aggregation_handle_->calculate(block_ids,
+                                        arguments_,
                                         partition_by_ids_,
                                         is_row_,
                                         num_preceding_,
@@ -204,7 +197,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
                                         storage_manager_);
 
   std::vector<ValueAccessor*> output_accessors(
-      window_aggregation_handle_->finalize(storage_manager_));
+      window_aggregation_handle_->finalize(block_ids, storage_manager_));
 
   for (ValueAccessor* output_accessor : output_accessors) {
     output_destination->bulkInsertTuples(output_accessor);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 8116410..3225b66 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,7 +67,6 @@ class WindowAggregationOperationState {
    *        tables.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
-                                  std::vector<block_id> &&block_ids,
                                   const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -113,7 +112,8 @@ class WindowAggregationOperationState {
    * @param output_destination The output destination for the computed window
    *                           aggregate.
    **/
-  void windowAggregateBlocks(InsertDestination *output_destination);
+  void windowAggregateBlocks(InsertDestination *output_destination,
+                             const std::vector<block_id> &block_ids);
 
  private:
   const CatalogRelationSchema &input_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4eb4a7c/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c3b672c..4dc0a6a 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,7 +24,6 @@ import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
   required int32 input_relation_id = 1;
-  repeated fixed64 block_ids = 2;
   required WindowAggregateFunction function = 3;
   repeated Scalar arguments = 4;
   repeated Scalar partition_by_attributes = 5;