You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/08/23 16:42:05 UTC
incubator-quickstep git commit: Introduced DestroyAggregationState operator
Repository: incubator-quickstep
Updated Branches:
refs/heads/destroy-agg-state-operator [created] 57db5d332
Introduced DestroyAggregationState operator
- Similar to the pattern with DestroyHash, this operator destroys the
AggregationState once the Finalize aggregation operator finishes its
execution.
- Optimizer support for DestroyAggregationState operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/57db5d33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/57db5d33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/57db5d33
Branch: refs/heads/destroy-agg-state-operator
Commit: 57db5d33267dcf1f258d83ac72cb49e00de79962
Parents: cdc1e05
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Aug 23 11:00:57 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Aug 23 11:41:28 2016 -0500
----------------------------------------------------------------------
query_execution/QueryContext.hpp | 41 ++++---
query_optimizer/CMakeLists.txt | 1 +
query_optimizer/ExecutionGenerator.cpp | 10 ++
relational_operators/CMakeLists.txt | 17 ++-
.../DestroyAggregationStateOperator.cpp | 64 ++++++++++
.../DestroyAggregationStateOperator.hpp | 116 +++++++++++++++++++
.../FinalizeAggregationOperator.cpp | 2 +-
.../FinalizeAggregationOperator.hpp | 3 +-
relational_operators/WorkOrder.proto | 35 +++---
relational_operators/WorkOrderFactory.cpp | 68 +++++------
.../tests/AggregationOperator_unittest.cpp | 51 +++++---
11 files changed, 321 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index c54c7ff..c179586 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -1,20 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_
@@ -184,7 +184,7 @@ class QueryContext {
/**
* @brief Release the given AggregationOperationState.
*
- * @param id The id of the AggregationOperationState to destroy.
+ * @param id The id of the AggregationOperationState to release.
*
* @return The AggregationOperationState, alreadly created in the constructor.
**/
@@ -195,6 +195,17 @@ class QueryContext {
}
/**
+ * @brief Destroy the given aggregation state.
+ *
+ * @param id The ID of the AggregationOperationState to destroy.
+ **/
+ inline void destroyAggregationState(const aggregation_state_id id) {
+ DCHECK_LT(id, aggregation_states_.size());
+ DCHECK(aggregation_states_[id]);
+ aggregation_states_[id].reset(nullptr);
+ }
+
+ /**
* @brief Whether the given BloomFilter id is valid.
*
* @param id The BloomFilter id.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 56ae52f..32f7885 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -118,6 +118,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_CreateIndexOperator
quickstep_relationaloperators_CreateTableOperator
quickstep_relationaloperators_DeleteOperator
+ quickstep_relationaloperators_DestroyAggregationStateOperator
quickstep_relationaloperators_DestroyHashOperator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e03e09..130134c 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -94,6 +94,7 @@
#include "relational_operators/CreateIndexOperator.hpp"
#include "relational_operators/CreateTableOperator.hpp"
#include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
#include "relational_operators/DestroyHashOperator.hpp"
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
@@ -1464,6 +1465,15 @@ void ExecutionGenerator::convertAggregate(
std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
output_relation);
+
+ const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+ execution_plan_->addRelationalOperator(
+ new DestroyAggregationStateOperator(query_handle_->query_id(),
+ aggr_state_index));
+
+ execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+ finalize_aggregation_operator_index,
+ true);
}
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 43a42f9..d507581 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ add_library(quickstep_relationaloperators_AggregationOperator AggregationOperato
add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp)
+add_library(quickstep_relationaloperators_DestroyAggregationStateOperator
+ DestroyAggregationStateOperator.cpp
+ DestroyAggregationStateOperator.hpp)
add_library(quickstep_relationaloperators_DeleteOperator DeleteOperator.cpp DeleteOperator.hpp)
add_library(quickstep_relationaloperators_DestroyHashOperator DestroyHashOperator.cpp DestroyHashOperator.hpp)
add_library(quickstep_relationaloperators_DropTableOperator DropTableOperator.cpp DropTableOperator.hpp)
@@ -136,6 +139,16 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator
quickstep_threading_ThreadIDBasedMap
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator
+ glog
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_DestroyHashOperator
glog
quickstep_queryexecution_QueryContext
@@ -451,6 +464,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_AggregationOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_DeleteOperator
+ quickstep_relationaloperators_DestroyAggregationStateOperator
quickstep_relationaloperators_DestroyHashOperator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
@@ -467,7 +481,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_UpdateOperator
- quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -484,6 +497,7 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_CreateIndexOperator
quickstep_relationaloperators_CreateTableOperator
quickstep_relationaloperators_DeleteOperator
+ quickstep_relationaloperators_DestroyAggregationStateOperator
quickstep_relationaloperators_DestroyHashOperator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
@@ -533,6 +547,7 @@ target_link_libraries(AggregationOperator_unittest
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_WorkOrdersContainer
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_DestroyAggregationStateOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_WorkOrder
quickstep_storage_AggregationOperationState_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
new file mode 100644
index 0000000..62ca9e7
--- /dev/null
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool DestroyAggregationStateOperator::getAllWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
+ if (blocking_dependencies_met_ && !work_generated_) {
+ work_generated_ = true;
+ container->addNormalWorkOrder(
+ new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, query_context),
+ op_index_);
+ }
+ return work_generated_;
+}
+
+bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (blocking_dependencies_met_ && !work_generated_) {
+ work_generated_ = true;
+
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+ return work_generated_;
+}
+
+void DestroyAggregationStateWorkOrder::execute() {
+ query_context_->destroyAggregationState(aggr_state_index_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/DestroyAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
new file mode 100644
index 0000000..a4300a4
--- /dev/null
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator which destroys a shared aggregation state.
+ **/
+class DestroyAggregationStateOperator : public RelationalOperator {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of the query to which this operator belongs.
+ * @param aggr_state_index The index of the AggregationState in QueryContext.
+ **/
+ DestroyAggregationStateOperator(const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index)
+ : RelationalOperator(query_id),
+ aggr_state_index_(aggr_state_index),
+ work_generated_(false) {}
+
+ ~DestroyAggregationStateOperator() override {}
+
+ std::string getName() const override {
+ return "DestroyAggregationStateOperator";
+ }
+
+ bool getAllWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) override;
+
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+ const QueryContext::aggregation_state_id aggr_state_index_;
+ bool work_generated_;
+
+ DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by DestroyAggregationStateOperator.
+ **/
+class DestroyAggregationStateWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of the query to which this WorkOrder belongs.
+ * @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param query_context The QueryContext to use.
+ **/
+ DestroyAggregationStateWorkOrder(const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index,
+ QueryContext *query_context)
+ : WorkOrder(query_id),
+ aggr_state_index_(aggr_state_index),
+ query_context_(DCHECK_NOTNULL(query_context)) {}
+
+ ~DestroyAggregationStateWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ const QueryContext::aggregation_state_id aggr_state_index_;
+ QueryContext *query_context_;
+
+ DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 65e62c4..7e337de 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,7 +44,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
container->addNormalWorkOrder(
new FinalizeAggregationWorkOrder(
query_id_,
- query_context->releaseAggregationState(aggr_state_index_),
+ query_context->getAggregationState(aggr_state_index_),
query_context->getInsertDestination(output_destination_index_)),
op_index_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 7ac6712..0aeac2a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -22,7 +22,6 @@
#include <cstddef>
#include <string>
-#include <memory>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
@@ -133,7 +132,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
void execute() override;
private:
- std::unique_ptr<AggregationOperationState> state_;
+ AggregationOperationState *state_;
InsertDestination *output_destination_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 02aa50e..6c6284b 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -1,19 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
+// Copyright 2011-2015 Quickstep Technologies LLC.
+// Copyright 2015-2016 Pivotal Software, Inc.
+// Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+// University of Wisconsin—Madison.
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
syntax = "proto2";
@@ -42,6 +42,7 @@ enum WorkOrderType {
TEXT_SCAN = 18;
UPDATE = 19;
WINDOW_AGGREGATION = 20;
+ DESTROY_AGGREGATION_STATE = 21;
}
message WorkOrder {
@@ -253,3 +254,9 @@ message WindowAggregationWorkOrder {
optional int32 insert_destination_index = 338;
}
}
+
+message DestroyAggregationStateWorkOrder {
+ extend WorkOrder {
+ optional uint32 aggr_state_index = 339;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 6970486..291cc33 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -1,20 +1,17 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2015-2016 Pivotal Software, Inc.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
#include "relational_operators/WorkOrderFactory.hpp"
@@ -30,6 +27,7 @@
#include "relational_operators/AggregationOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
#include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
#include "relational_operators/DestroyHashOperator.hpp"
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
@@ -46,7 +44,6 @@
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
#include "relational_operators/UpdateOperator.hpp"
-#include "relational_operators/WindowAggregationOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
@@ -116,6 +113,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
shiftboss_client_id,
bus);
}
+ case serialization::DESTROY_AGGREGATION_STATE: {
+ LOG(INFO) << "Creating DestroyAggregationStateWorkOrder";
+ return new DestroyAggregationStateWorkOrder(
+ proto.query_id(),
+ proto.GetExtension(
+ serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+ query_context);
+ }
case serialization::DESTROY_HASH: {
LOG(INFO) << "Creating DestroyHashWorkOrder";
return new DestroyHashWorkOrder(
@@ -420,22 +425,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
shiftboss_client_id,
bus);
}
- case serialization::WINDOW_AGGREGATION: {
- LOG(INFO) << "Creating WindowAggregationWorkOrder";
- vector<block_id> blocks;
- for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
- blocks.push_back(
- proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i));
- }
-
- return new WindowAggregationWorkOrder(
- proto.query_id(),
- query_context->getWindowAggregationState(
- proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
- move(blocks),
- query_context->getInsertDestination(
- proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
- }
default:
LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
}
@@ -489,6 +478,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
proto.HasExtension(serialization::DeleteWorkOrder::operator_index);
}
+ case serialization::DESTROY_AGGREGATION_STATE: {
+ return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
+ query_context.isValidAggregationStateId(
+ proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index));
+ }
case serialization::DESTROY_HASH: {
return proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) &&
query_context.isValidJoinHashTableId(
@@ -533,11 +527,13 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
return false;
}
+ const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
const attribute_id attr_id =
proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
- if (!probe_relation.hasAttributeWithId(attr_id)) {
+ if (!build_relation.hasAttributeWithId(attr_id) ||
+ !probe_relation.hasAttributeWithId(attr_id)) {
return false;
}
}
@@ -712,14 +708,6 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
proto.HasExtension(serialization::UpdateWorkOrder::block_id);
}
- case serialization::WINDOW_AGGREGATION: {
- return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&
- query_context.isValidWindowAggregationStateId(
- proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)) &&
- proto.HasExtension(serialization::WindowAggregationWorkOrder::insert_destination_index) &&
- query_context.isValidInsertDestinationId(
- proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index));
- }
default:
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57db5d33/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 7a5b461..0d5d8f5 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -1,20 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
#include <cstddef>
@@ -44,6 +42,7 @@
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/AggregationOperationState.pb.h"
@@ -292,6 +291,9 @@ class AggregationOperatorTest : public ::testing::Test {
*result_table_,
insert_destination_index));
+ destroy_aggr_state_op_.reset(
+ new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,
*db_,
@@ -304,6 +306,7 @@ class AggregationOperatorTest : public ::testing::Test {
// class' checks about operator index are successful.
op_->setOperatorIndex(kOpIndex);
finalize_op_->setOperatorIndex(kOpIndex);
+ destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
}
void setupTestGroupBy(const std::string &stem,
@@ -379,6 +382,9 @@ class AggregationOperatorTest : public ::testing::Test {
*result_table_,
insert_destination_index));
+ destroy_aggr_state_op_.reset(
+ new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,
*db_,
@@ -391,6 +397,7 @@ class AggregationOperatorTest : public ::testing::Test {
// class' checks about operator index are successful.
op_->setOperatorIndex(kOpIndex);
finalize_op_->setOperatorIndex(kOpIndex);
+ destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
}
void execute() {
@@ -423,6 +430,21 @@ class AggregationOperatorTest : public ::testing::Test {
work_order->execute();
delete work_order;
}
+
+ destroy_aggr_state_op_->informAllBlockingDependenciesMet();
+
+ WorkOrdersContainer destroy_aggr_state_op_container(1, 0);
+ const std::size_t destroy_aggr_state_op_index = 0;
+ destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container,
+ query_context_.get(),
+ storage_manager_.get(),
+ foreman_client_id_,
+ &bus_);
+ while (destroy_aggr_state_op_container.hasNormalWorkOrder(destroy_aggr_state_op_index)) {
+ WorkOrder *work_order = destroy_aggr_state_op_container.getNormalWorkOrder(destroy_aggr_state_op_index);
+ work_order->execute();
+ delete work_order;
+ }
}
template <class T>
@@ -528,6 +550,7 @@ class AggregationOperatorTest : public ::testing::Test {
std::unique_ptr<AggregationOperator> op_;
std::unique_ptr<FinalizeAggregationOperator> finalize_op_;
+ std::unique_ptr<DestroyAggregationStateOperator> destroy_aggr_state_op_;
};
const char AggregationOperatorTest::kDatabaseName[] = "database";