You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/08/21 16:38:44 UTC

[3/3] incubator-quickstep git commit: DAG support for destroy aggregation state op.

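Add DAG support for the destroy aggregation state operator: the execution plan for an aggregate now includes a DestroyAggregationStateOperator that depends on the finalize aggregation operator.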


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b7bc9f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b7bc9f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b7bc9f4a

Branch: refs/heads/partitioned-aggregation
Commit: b7bc9f4a28c0c3c5155dda9b534fe39da43a4d52
Parents: f543784
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Aug 21 11:38:11 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Aug 21 11:38:11 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |  1 +
 query_optimizer/ExecutionGenerator.cpp          | 10 ++++++++
 relational_operators/CMakeLists.txt             |  1 +
 .../tests/AggregationOperator_unittest.cpp      | 25 ++++++++++++++++++++
 4 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7bc9f4a/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a56b714..bd922d0 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -118,6 +118,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator
                       quickstep_relationaloperators_DeleteOperator
+                      quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7bc9f4a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 88103df..6abef2d 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -92,6 +92,7 @@
 #include "relational_operators/CreateIndexOperator.hpp"
 #include "relational_operators/CreateTableOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/DestroyHashOperator.hpp"
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
@@ -1478,6 +1479,15 @@ void ExecutionGenerator::convertAggregate(
       std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
   temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
                                             output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true);
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7bc9f4a/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 0cbb5c7..4b3ac84 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -545,6 +545,7 @@ target_link_libraries(AggregationOperator_unittest
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_storage_AggregationOperationState_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7bc9f4a/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index fd4692a..0d5d8f5 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -42,6 +42,7 @@
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/AggregationOperationState.pb.h"
@@ -290,6 +291,9 @@ class AggregationOperatorTest : public ::testing::Test {
                                         *result_table_,
                                         insert_destination_index));
 
+    destroy_aggr_state_op_.reset(
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
                                           *db_,
@@ -302,6 +306,7 @@ class AggregationOperatorTest : public ::testing::Test {
     // class' checks about operator index are successful.
     op_->setOperatorIndex(kOpIndex);
     finalize_op_->setOperatorIndex(kOpIndex);
+    destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
   }
 
   void setupTestGroupBy(const std::string &stem,
@@ -377,6 +382,9 @@ class AggregationOperatorTest : public ::testing::Test {
                                         *result_table_,
                                         insert_destination_index));
 
+    destroy_aggr_state_op_.reset(
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
                                           *db_,
@@ -389,6 +397,7 @@ class AggregationOperatorTest : public ::testing::Test {
     // class' checks about operator index are successful.
     op_->setOperatorIndex(kOpIndex);
     finalize_op_->setOperatorIndex(kOpIndex);
+    destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
   }
 
   void execute() {
@@ -421,6 +430,21 @@ class AggregationOperatorTest : public ::testing::Test {
       work_order->execute();
       delete work_order;
     }
+
+    destroy_aggr_state_op_->informAllBlockingDependenciesMet();
+
+    WorkOrdersContainer destroy_aggr_state_op_container(1, 0);
+    const std::size_t destroy_aggr_state_op_index = 0;
+    destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container,
+                                             query_context_.get(),
+                                             storage_manager_.get(),
+                                             foreman_client_id_,
+                                             &bus_);
+    while (destroy_aggr_state_op_container.hasNormalWorkOrder(destroy_aggr_state_op_index)) {
+      WorkOrder *work_order = destroy_aggr_state_op_container.getNormalWorkOrder(destroy_aggr_state_op_index);
+      work_order->execute();
+      delete work_order;
+    }
   }
 
   template <class T>
@@ -526,6 +550,7 @@ class AggregationOperatorTest : public ::testing::Test {
 
   std::unique_ptr<AggregationOperator> op_;
   std::unique_ptr<FinalizeAggregationOperator> finalize_op_;
+  std::unique_ptr<DestroyAggregationStateOperator> destroy_aggr_state_op_;
 };
 
 const char AggregationOperatorTest::kDatabaseName[] = "database";