You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/29 23:44:21 UTC
[18/25] incubator-quickstep git commit: Introduced PolicyEnforcerBase and its single-node implementation.
Introduced PolicyEnforcerBase and its single-node implementation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/95c451a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/95c451a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/95c451a5
Branch: refs/heads/dist-exe-test-new
Commit: 95c451a5241c5f929a76b7412c38b9ef1f3bb74f
Parents: 3bb5ca8
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Jul 18 12:18:04 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Jul 29 16:43:23 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 23 +-
query_execution/ForemanSingleNode.cpp | 6 +-
query_execution/ForemanSingleNode.hpp | 7 +-
query_execution/PolicyEnforcer.cpp | 254 ----------------------
query_execution/PolicyEnforcer.hpp | 223 -------------------
query_execution/PolicyEnforcerBase.cpp | 177 +++++++++++++++
query_execution/PolicyEnforcerBase.hpp | 196 +++++++++++++++++
query_execution/PolicyEnforcerSingleNode.cpp | 110 ++++++++++
query_execution/PolicyEnforcerSingleNode.hpp | 104 +++++++++
9 files changed, 612 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8c12a5d..f582ba5 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -34,7 +34,8 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
-add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
${queryexecution_QueryContext_proto_srcs}
@@ -85,7 +86,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
- quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_WorkerDirectory
@@ -95,22 +96,29 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
-target_link_libraries(quickstep_queryexecution_PolicyEnforcer
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
glog
quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
- quickstep_catalog_CatalogTypedefs
quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManagerBase
+ quickstep_relationaloperators_WorkOrder
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryManagerBase
quickstep_queryexecution_QueryManagerSingleNode
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
- quickstep_relationaloperators_WorkOrder
- quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
@@ -244,7 +252,8 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_ForemanSingleNode
- quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 3aa1f0b..cda02a7 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkerDirectory.hpp"
@@ -36,6 +37,7 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
@@ -46,6 +48,8 @@ using std::vector;
namespace quickstep {
+class QueryHandle;
+
DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
"of pending work orders for the worker. This information is used "
"by the Foreman to assign work orders to worker threads");
@@ -88,7 +92,7 @@ ForemanSingleNode::ForemanSingleNode(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_.reset(new PolicyEnforcer(
+ policy_enforcer_.reset(new PolicyEnforcerSingleNode(
foreman_client_id_,
num_numa_nodes,
catalog_database_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 7506d35..caef5e0 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -24,11 +24,12 @@
#include <vector>
#include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; }
namespace quickstep {
@@ -128,7 +129,7 @@ class ForemanSingleNode final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
- std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+ std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
deleted file mode 100644
index 4cba8c5..0000000
--- a/query_execution/PolicyEnforcer.cpp
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin—Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/PolicyEnforcer.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerSingleNode.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
- " can be allocated in a single round of dispatch of messages to"
- " the workers.");
-
-bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- if (admitted_queries_.size() < kMaxConcurrentQueries) {
- // Ok to admit the query.
- const std::size_t query_id = query_handle->query_id();
- if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
- // Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
- return true;
- } else {
- LOG(ERROR) << "Query with the same ID " << query_id << " exists";
- return false;
- }
- } else {
- // This query will have to wait.
- waiting_queries_.push(query_handle);
- return false;
- }
-}
-
-void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
- std::size_t query_id;
- QueryManagerBase::dag_node_index op_index;
-
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
- // Note: This proto message contains the time it took to execute the
- // WorkOrder. It can be accessed in this scope.
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- worker_directory_->decrementNumQueuedWorkOrders(
- proto.worker_thread_index());
- if (profile_individual_workorders_) {
- recordTimeForWorkOrder(proto);
- }
-
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
- // Note: This proto message contains the time it took to execute the
- // rebuild WorkOrder. It can be accessed in this scope.
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- worker_directory_->decrementNumQueuedWorkOrders(
- proto.worker_thread_index());
-
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
- break;
- }
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(
- proto.partition_id(), block);
- }
- return;
- }
- case kDataPipelineMessage: {
- serialization::DataPipelineMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processDataPipelineMessage(
- op_index, proto.block_id(), proto.relation_id());
- break;
- }
- case kWorkOrdersAvailableMessage: {
- serialization::WorkOrdersAvailableMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
-
- // Check if new work orders are available.
- admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
- break;
- }
- case kWorkOrderFeedbackMessage: {
- WorkOrder::FeedbackMessage msg(
- const_cast<void *>(tagged_message.message()),
- tagged_message.message_bytes());
- query_id = msg.header().query_id;
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = msg.header().rel_op_index;
- admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
- break;
- }
- default:
- LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
- }
- if (admitted_queries_[query_id]->queryStatus(op_index) ==
- QueryManagerBase::QueryStatusCode::kQueryExecuted) {
- removeQuery(query_id);
- if (!waiting_queries_.empty()) {
- // Admit the earliest waiting query.
- QueryHandle *new_query = waiting_queries_.front();
- waiting_queries_.pop();
- admitQuery(new_query);
- }
- }
-}
-
-void PolicyEnforcer::getWorkerMessages(
- std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
- // Iterate over admitted queries until either there are no more
- // messages available, or the maximum number of messages have
- // been collected.
- DCHECK(worker_messages->empty());
- // TODO(harshad) - Make this function generic enough so that it
- // works well when multiple queries are getting executed.
- std::size_t per_query_share = 0;
- if (!admitted_queries_.empty()) {
- per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
- } else {
- LOG(WARNING) << "Requesting WorkerMessages when no query is running";
- return;
- }
- DCHECK_GT(per_query_share, 0u);
- std::vector<std::size_t> finished_queries_ids;
-
- for (const auto &admitted_query_info : admitted_queries_) {
- QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
- DCHECK(curr_query_manager != nullptr);
- std::size_t messages_collected_curr_query = 0;
- while (messages_collected_curr_query < per_query_share) {
- WorkerMessage *next_worker_message =
- static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
- if (next_worker_message != nullptr) {
- ++messages_collected_curr_query;
- worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
- } else {
- // No more work ordes from the current query at this time.
- // Check if the query's execution is over.
- if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- // If the query has been executed, remove it.
- finished_queries_ids.push_back(admitted_query_info.first);
- }
- break;
- }
- }
- }
- for (const std::size_t finished_qid : finished_queries_ids) {
- removeQuery(finished_qid);
- }
-}
-
-void PolicyEnforcer::removeQuery(const std::size_t query_id) {
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
- LOG(WARNING) << "Removing query with ID " << query_id
- << " that hasn't finished its execution";
- }
- admitted_queries_.erase(query_id);
-}
-
-bool PolicyEnforcer::admitQueries(
- const std::vector<QueryHandle*> &query_handles) {
- for (QueryHandle *curr_query : query_handles) {
- if (!admitQuery(curr_query)) {
- return false;
- }
- }
- return true;
-}
-
-void PolicyEnforcer::recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto) {
- const std::size_t query_id = proto.query_id();
- if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
- workorder_time_recorder_[query_id];
- }
- workorder_time_recorder_[query_id].emplace_back(
- proto.worker_thread_index(),
- proto.operator_index(),
- proto.execution_time_in_microseconds());
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
deleted file mode 100644
index 8bd6d92..0000000
--- a/query_execution/PolicyEnforcer.hpp
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin—Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <tuple>
-#include <unordered_map>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerDirectory;
-
-namespace serialization { class NormalWorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- * in sharing resources among concurrent queries.
- **/
-class PolicyEnforcer {
- public:
- /**
- * @brief Constructor.
- *
- * @param foreman_client_id The TMB client ID of the Foreman.
- * @param num_numa_nodes Number of NUMA nodes used by the system.
- * @param catalog_database The CatalogDatabase used.
- * @param storage_manager The StorageManager used.
- * @param bus The TMB.
- **/
- PolicyEnforcer(const tmb::client_id foreman_client_id,
- const std::size_t num_numa_nodes,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- const bool profile_individual_workorders = false)
- : foreman_client_id_(foreman_client_id),
- num_numa_nodes_(num_numa_nodes),
- catalog_database_(catalog_database),
- storage_manager_(storage_manager),
- worker_directory_(worker_directory),
- bus_(bus),
- profile_individual_workorders_(profile_individual_workorders) {}
-
- /**
- * @brief Destructor.
- **/
- ~PolicyEnforcer() {
- if (hasQueries()) {
- LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
- "waiting queries";
- }
- }
-
- /**
- * @brief Admit a query to the system.
- *
- * @param query_handle The QueryHandle for the new query.
- *
- * @return Whether the query was admitted to the system.
- **/
- bool admitQuery(QueryHandle *query_handle);
-
- /**
- * @brief Admit multiple queries in the system.
- *
- * @note In the current simple implementation, we only allow one active
- * query in the system. Other queries will have to wait.
- *
- * @param query_handles A vector of QueryHandles for the queries to be
- * admitted.
- *
- * @return True if all the queries were admitted, false if at least one query
- * was not admitted.
- **/
- bool admitQueries(const std::vector<QueryHandle*> &query_handles);
-
- /**
- * @brief Remove a given query that is under execution.
- *
- * @note This function is made public so that it is possible for a query
- * to be killed. Otherwise, it should only be used privately by the
- * class.
- *
- * TODO(harshad) - Extend this function to support removal of waiting queries.
- *
- * @param query_id The ID of the query to be removed.
- **/
- void removeQuery(const std::size_t query_id);
-
- /**
- * @brief Get worker messages to be dispatched. These worker messages come
- * from the active queries.
- *
- * @param worker_messages The worker messages to be dispatched.
- **/
- void getWorkerMessages(
- std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
-
- /**
- * @brief Process a message sent to the Foreman, which gets passed on to the
- * policy enforcer.
- *
- * @param message The message.
- **/
- void processMessage(const TaggedMessage &tagged_message);
-
- /**
- * @brief Check if there are any queries to be executed.
- *
- * @return True if there is at least one active or waiting query, false if
- * the policy enforcer doesn't have any query.
- **/
- inline bool hasQueries() const {
- return !(admitted_queries_.empty() && waiting_queries_.empty());
- }
-
- /**
- * @brief Get the profiling results for individual work order execution for a
- * given query.
- *
- * @note This function should only be called if profiling individual work
- * orders option is enabled.
- *
- * @param query_id The ID of the query for which the profiling results are
- * requested.
- *
- * @return A vector of tuples, each being a single profiling entry.
- **/
- inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
- getProfilingResults(const std::size_t query_id) const {
- DCHECK(profile_individual_workorders_);
- DCHECK(workorder_time_recorder_.find(query_id) !=
- workorder_time_recorder_.end());
- return workorder_time_recorder_.at(query_id);
- }
-
- private:
- static constexpr std::size_t kMaxConcurrentQueries = 1;
-
- /**
- * @brief Record the execution time for a finished WorkOrder.
- *
- * TODO(harshad) - Extend the functionality to rebuild work orders.
- *
- * @param proto The completion message proto sent after the WorkOrder
- * execution.
- **/
- void recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto);
-
- const tmb::client_id foreman_client_id_;
- const std::size_t num_numa_nodes_;
-
- CatalogDatabaseLite *catalog_database_;
- StorageManager *storage_manager_;
- WorkerDirectory *worker_directory_;
-
- tmb::MessageBus *bus_;
- const bool profile_individual_workorders_;
-
- // Key = query ID, value = QueryManagerBase* for the key query.
- std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
-
- // The queries which haven't been admitted yet.
- std::queue<QueryHandle*> waiting_queries_;
-
- // Key = Query ID.
- // Value = A tuple indicating a record of executing a work order.
- // Within a tuple ...
- // 1st element: Logical worker ID.
- // 2nd element: Operator ID.
- // 3rd element: Time in microseconds to execute the work order.
- std::unordered_map<
- std::size_t,
- std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
- workorder_time_recorder_;
-
- DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
new file mode 100644
index 0000000..d16a502
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
+ std::size_t query_id;
+ QueryManagerBase::dag_node_index op_index;
+
+ switch (tagged_message.message_type()) {
+ case kWorkOrderCompleteMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+ if (profile_individual_workorders_) {
+ recordTimeForWorkOrder(proto);
+ }
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
+ break;
+ }
+ case kRebuildWorkOrderCompleteMessage: {
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // rebuild WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: {
+ serialization::CatalogRelationNewBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+
+ const block_id block = proto.block_id();
+
+ CatalogRelation *relation =
+ static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+ relation->addBlock(block);
+
+ if (proto.has_partition_id()) {
+ relation->getPartitionSchemeMutable()->addBlockToPartition(
+ proto.partition_id(), block);
+ }
+ return;
+ }
+ case kDataPipelineMessage: {
+ serialization::DataPipelineMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processDataPipelineMessage(
+ op_index, proto.block_id(), proto.relation_id());
+ break;
+ }
+ case kWorkOrdersAvailableMessage: {
+ serialization::WorkOrdersAvailableMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+
+ // Check if new work orders are available.
+ admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
+ break;
+ }
+ case kWorkOrderFeedbackMessage: {
+ WorkOrder::FeedbackMessage msg(
+ const_cast<void *>(tagged_message.message()),
+ tagged_message.message_bytes());
+ query_id = msg.header().query_id;
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = msg.header().rel_op_index;
+ admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+ }
+ if (admitted_queries_[query_id]->queryStatus(op_index) ==
+ QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+ LOG(WARNING) << "Removing query with ID " << query_id
+ << " that hasn't finished its execution";
+ }
+ admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcerBase::admitQueries(
+ const std::vector<QueryHandle*> &query_handles) {
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void PolicyEnforcerBase::recordTimeForWorkOrder(
+ const serialization::NormalWorkOrderCompletionMessage &proto) {
+ const std::size_t query_id = proto.query_id();
+ if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
+ workorder_time_recorder_[query_id];
+ }
+ workorder_time_recorder_[query_id].emplace_back(
+ proto.worker_thread_index(),
+ proto.operator_index(),
+ proto.execution_time_in_microseconds());
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
new file mode 100644
index 0000000..0482ebc
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -0,0 +1,196 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *   University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief Abstract base of the policy enforcers. It keeps track of the queries
+ *        admitted for execution and of the queries still waiting, so that a
+ *        high level policy for sharing resources among concurrent queries
+ *        can be maintained.
+ **/
+class PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param catalog_database The CatalogDatabase used.
+   * @param profile_individual_workorders Whether each normal work order
+   *        should be profiled.
+   **/
+  PolicyEnforcerBase(CatalogDatabaseLite *catalog_database,
+                     const bool profile_individual_workorders)
+      : catalog_database_(catalog_database),
+        profile_individual_workorders_(profile_individual_workorders) {}
+
+  /**
+   * @brief Virtual destructor. A warning is logged if some queries are still
+   *        admitted or waiting at destruction time.
+   **/
+  virtual ~PolicyEnforcerBase() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PolicyEnforcer with some "
+                      "unfinished or waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note kMaxConcurrentQueries is currently 1, i.e. only one query is meant
+   *       to be active at a time. Other queries will have to wait.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one
+   *         query was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is public so that a query can be killed. Apart from
+   *       that use case, it should only be used privately by the class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one admitted or waiting query, false
+   *         if the policy enforcer holds no query at all.
+   **/
+  inline bool hasQueries() const {
+    return !admitted_queries_.empty() || !waiting_queries_.empty();
+  }
+
+  /**
+   * @brief Get the profiling results for individual work order execution for
+   *        a given query.
+   *
+   * @note This function should only be called if the option to profile
+   *       individual work orders is enabled.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry made of
+   *         the logical worker ID, the operator ID and the execution time in
+   *         microseconds.
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.count(query_id) != 0u);
+    return workorder_time_recorder_.at(query_id);
+  }
+
+ protected:
+  // Number of queries that may be admitted at the same time.
+  static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
+  CatalogDatabaseLite *catalog_database_;
+
+  const bool profile_individual_workorders_;
+
+  // Admitted queries. Key = query ID, value = the owned QueryManagerBase of
+  // that query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
+
+  // Queries that have not been admitted yet, in arrival order.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  // Per-query work order profiling records.
+  // Key = Query ID.
+  // Value = The records of the work orders executed for that query. Each
+  // record is a tuple made of ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
+ private:
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
+  /**
+   * @brief Decrement the number of queued workorders for the given worker
+   *        by 1.
+   *
+   * @param worker_index The logical ID of the given worker.
+   **/
+  virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
new file mode 100644
index 0000000..549e39f
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Per-round message budget that PolicyEnforcerSingleNode::getWorkerMessages()
+// divides among the admitted queries.
+DEFINE_uint64(max_msgs_per_dispatch_round,
+              20,
+              "Maximum number of messages that can be allocated in a single "
+              "round of dispatch of messages to the workers.");
+
+// Collects the WorkerMessages to dispatch in this round from the admitted
+// queries. The budget FLAGS_max_msgs_per_dispatch_round is divided evenly
+// among the admitted queries. Queries whose execution turns out to be
+// finished while collecting are removed before returning.
+//
+// worker_messages must be empty on entry and receives the collected messages.
+void PolicyEnforcerSingleNode::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  DCHECK(worker_messages->empty());
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  if (per_query_share == 0) {
+    // The flag is zero or smaller than the number of admitted queries. With
+    // a zero share no message would be collected and finished queries would
+    // never be detected, so give every query at least one message per round.
+    per_query_share = 1;
+  }
+
+  std::vector<std::size_t> finished_queries_ids;
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    QueryManagerSingleNode *single_node_manager =
+        static_cast<QueryManagerSingleNode*>(curr_query_manager);
+
+    // Collect messages until the share of this query is used up or the query
+    // has nothing more to offer at this time.
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      WorkerMessage *next_worker_message =
+          single_node_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+      if (next_worker_message == nullptr) {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+      ++messages_collected_curr_query;
+      worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+    }
+  }
+
+  // Removal is deferred until here because removeQuery() erases from
+  // admitted_queries_, which must not happen while iterating over it.
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+// Tries to admit the given query. Returns true if a QueryManagerSingleNode
+// was created for it. Returns false if the query had to be put in the
+// waiting queue, or if a query with the same ID is already admitted.
+bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // No room for another admitted query. This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  // Query with the same ID not present, ok to admit.
+  std::unique_ptr<QueryManagerBase> &query_manager_slot =
+      admitted_queries_[query_id];
+  query_manager_slot.reset(
+      new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+                                 catalog_database_, storage_manager_, bus_));
+  return true;
+}
+
+// Forwards the request to the WorkerDirectory given at construction time.
+void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(
+    const std::size_t worker_index) {
+  worker_directory_->decrementNumQueuedWorkOrders(worker_index);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95c451a5/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
new file mode 100644
index 0000000..671fd83
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The single-node implementation of PolicyEnforcerBase, a class that
+ *        ensures that a high level policy is maintained in sharing resources
+ *        among concurrent queries.
+ **/
+class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param worker_directory The WorkerDirectory whose counts of queued work
+   *        orders are decremented through this class.
+   * @param bus The TMB.
+   * @param profile_individual_workorders Whether each normal work order
+   *        should be profiled.
+   **/
+  PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id,
+                           const std::size_t num_numa_nodes,
+                           CatalogDatabaseLite *catalog_database,
+                           StorageManager *storage_manager,
+                           WorkerDirectory *worker_directory,
+                           tmb::MessageBus *bus,
+                           const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerSingleNode() override = default;
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return True if the query was admitted. False if it was put in the
+   *         waiting queue instead, or if a query with the same ID is already
+   *         admitted.
+   **/
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched. The vector
+   *        is expected to be empty when passed in.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ private:
+  void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+
+  // The TMB client ID of the Foreman and the number of NUMA nodes, both
+  // handed to the query manager created for every admitted query.
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_