You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ch...@apache.org on 2016/07/28 02:57:27 UTC

[10/15] incubator-quickstep git commit: Introduced PolicyEnforcerBase and its single-node implementation.

Introduced PolicyEnforcerBase and its single-node implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/076d6991
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/076d6991
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/076d6991

Branch: refs/heads/types-readme
Commit: 076d699104e2d3e12d1e94d0b1e2bc3ba75cfff5
Parents: 2249b34
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Jul 18 12:18:04 2016 -0700
Committer: Craig Chasseur <sp...@gmail.com>
Committed: Wed Jul 27 19:56:58 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt               |  23 +-
 query_execution/ForemanSingleNode.cpp        |   6 +-
 query_execution/ForemanSingleNode.hpp        |   7 +-
 query_execution/PolicyEnforcer.cpp           | 254 ----------------------
 query_execution/PolicyEnforcer.hpp           | 223 -------------------
 query_execution/PolicyEnforcerBase.cpp       | 177 +++++++++++++++
 query_execution/PolicyEnforcerBase.hpp       | 196 +++++++++++++++++
 query_execution/PolicyEnforcerSingleNode.cpp | 110 ++++++++++
 query_execution/PolicyEnforcerSingleNode.hpp | 104 +++++++++
 9 files changed, 612 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8c12a5d..f582ba5 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -34,7 +34,8 @@ if (ENABLE_DISTRIBUTED)
 endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
-add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -85,7 +86,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanBase
-                      quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_WorkerDirectory
@@ -95,22 +96,29 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
-target_link_libraries(quickstep_queryexecution_PolicyEnforcer
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       glog
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManagerBase
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_PolicyEnforcerBase
+                      quickstep_queryexecution_QueryExecutionState
+                      quickstep_queryexecution_QueryManagerBase
                       quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
@@ -244,7 +252,8 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanBase
                       quickstep_queryexecution_ForemanSingleNode
-                      quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PolicyEnforcerBase
+                      quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 3aa1f0b..cda02a7 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -36,6 +37,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
@@ -46,6 +48,8 @@ using std::vector;
 
 namespace quickstep {
 
+class QueryHandle;
+
 DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
@@ -88,7 +92,7 @@ ForemanSingleNode::ForemanSingleNode(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_.reset(new PolicyEnforcer(
+  policy_enforcer_.reset(new PolicyEnforcerSingleNode(
       foreman_client_id_,
       num_numa_nodes,
       catalog_database_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 7506d35..caef5e0 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -24,11 +24,12 @@
 #include <vector>
 
 #include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
@@ -128,7 +129,7 @@ class ForemanSingleNode final : public ForemanBase {
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
-  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+  std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
 
   DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
deleted file mode 100644
index 4cba8c5..0000000
--- a/query_execution/PolicyEnforcer.cpp
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include "query_execution/PolicyEnforcer.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerSingleNode.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
-              " can be allocated in a single round of dispatch of messages to"
-              " the workers.");
-
-bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < kMaxConcurrentQueries) {
-    // Ok to admit the query.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
-                                     catalog_database_, storage_manager_, bus_));
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;
-    }
-  } else {
-    // This query will have to wait.
-    waiting_queries_.push(query_handle);
-    return false;
-  }
-}
-
-void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
-  std::size_t query_id;
-  QueryManagerBase::dag_node_index op_index;
-
-  switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      serialization::NormalWorkOrderCompletionMessage proto;
-      // Note: This proto message contains the time it took to execute the
-      // WorkOrder. It can be accessed in this scope.
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      worker_directory_->decrementNumQueuedWorkOrders(
-          proto.worker_thread_index());
-      if (profile_individual_workorders_) {
-        recordTimeForWorkOrder(proto);
-      }
-
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
-      break;
-    }
-    case kRebuildWorkOrderCompleteMessage: {
-      serialization::RebuildWorkOrderCompletionMessage proto;
-      // Note: This proto message contains the time it took to execute the
-      // rebuild WorkOrder. It can be accessed in this scope.
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      worker_directory_->decrementNumQueuedWorkOrders(
-          proto.worker_thread_index());
-
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
-      break;
-    }
-    case kCatalogRelationNewBlockMessage: {
-      serialization::CatalogRelationNewBlockMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      const block_id block = proto.block_id();
-
-      CatalogRelation *relation =
-          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-      relation->addBlock(block);
-
-      if (proto.has_partition_id()) {
-        relation->getPartitionSchemeMutable()->addBlockToPartition(
-            proto.partition_id(), block);
-      }
-      return;
-    }
-    case kDataPipelineMessage: {
-      serialization::DataPipelineMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processDataPipelineMessage(
-          op_index, proto.block_id(), proto.relation_id());
-      break;
-    }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
-      break;
-    }
-    case kWorkOrderFeedbackMessage: {
-      WorkOrder::FeedbackMessage msg(
-          const_cast<void *>(tagged_message.message()),
-          tagged_message.message_bytes());
-      query_id = msg.header().query_id;
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = msg.header().rel_op_index;
-      admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
-  }
-  if (admitted_queries_[query_id]->queryStatus(op_index) ==
-          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
-    removeQuery(query_id);
-    if (!waiting_queries_.empty()) {
-      // Admit the earliest waiting query.
-      QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
-    }
-  }
-}
-
-void PolicyEnforcer::getWorkerMessages(
-    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
-  // Iterate over admitted queries until either there are no more
-  // messages available, or the maximum number of messages have
-  // been collected.
-  DCHECK(worker_messages->empty());
-  // TODO(harshad) - Make this function generic enough so that it
-  // works well when multiple queries are getting executed.
-  std::size_t per_query_share = 0;
-  if (!admitted_queries_.empty()) {
-    per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
-  } else {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
-    return;
-  }
-  DCHECK_GT(per_query_share, 0u);
-  std::vector<std::size_t> finished_queries_ids;
-
-  for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    std::size_t messages_collected_curr_query = 0;
-    while (messages_collected_curr_query < per_query_share) {
-      WorkerMessage *next_worker_message =
-          static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
-      if (next_worker_message != nullptr) {
-        ++messages_collected_curr_query;
-        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
-      } else {
-        // No more work ordes from the current query at this time.
-        // Check if the query's execution is over.
-        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          // If the query has been executed, remove it.
-          finished_queries_ids.push_back(admitted_query_info.first);
-        }
-        break;
-      }
-    }
-  }
-  for (const std::size_t finished_qid : finished_queries_ids) {
-    removeQuery(finished_qid);
-  }
-}
-
-void PolicyEnforcer::removeQuery(const std::size_t query_id) {
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
-    LOG(WARNING) << "Removing query with ID " << query_id
-                 << " that hasn't finished its execution";
-  }
-  admitted_queries_.erase(query_id);
-}
-
-bool PolicyEnforcer::admitQueries(
-    const std::vector<QueryHandle*> &query_handles) {
-  for (QueryHandle *curr_query : query_handles) {
-    if (!admitQuery(curr_query)) {
-      return false;
-    }
-  }
-  return true;
-}
-
-void PolicyEnforcer::recordTimeForWorkOrder(
-    const serialization::NormalWorkOrderCompletionMessage &proto) {
-  const std::size_t query_id = proto.query_id();
-  if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
-    workorder_time_recorder_[query_id];
-  }
-  workorder_time_recorder_[query_id].emplace_back(
-      proto.worker_thread_index(),
-      proto.operator_index(),
-      proto.execution_time_in_microseconds());
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
deleted file mode 100644
index 8bd6d92..0000000
--- a/query_execution/PolicyEnforcer.hpp
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <tuple>
-#include <unordered_map>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerDirectory;
-
-namespace serialization { class NormalWorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- *        in sharing resources among concurrent queries.
- **/
-class PolicyEnforcer {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param num_numa_nodes Number of NUMA nodes used by the system.
-   * @param catalog_database The CatalogDatabase used.
-   * @param storage_manager The StorageManager used.
-   * @param bus The TMB.
-   **/
-  PolicyEnforcer(const tmb::client_id foreman_client_id,
-                 const std::size_t num_numa_nodes,
-                 CatalogDatabaseLite *catalog_database,
-                 StorageManager *storage_manager,
-                 WorkerDirectory *worker_directory,
-                 tmb::MessageBus *bus,
-                 const bool profile_individual_workorders = false)
-      : foreman_client_id_(foreman_client_id),
-        num_numa_nodes_(num_numa_nodes),
-        catalog_database_(catalog_database),
-        storage_manager_(storage_manager),
-        worker_directory_(worker_directory),
-        bus_(bus),
-        profile_individual_workorders_(profile_individual_workorders) {}
-
-  /**
-   * @brief Destructor.
-   **/
-  ~PolicyEnforcer() {
-    if (hasQueries()) {
-      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
-                      "waiting queries";
-    }
-  }
-
-  /**
-   * @brief Admit a query to the system.
-   *
-   * @param query_handle The QueryHandle for the new query.
-   *
-   * @return Whether the query was admitted to the system.
-   **/
-  bool admitQuery(QueryHandle *query_handle);
-
-  /**
-   * @brief Admit multiple queries in the system.
-   *
-   * @note In the current simple implementation, we only allow one active
-   *       query in the system. Other queries will have to wait.
-   *
-   * @param query_handles A vector of QueryHandles for the queries to be
-   *        admitted.
-   *
-   * @return True if all the queries were admitted, false if at least one query
-   *         was not admitted.
-   **/
-  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
-
-  /**
-   * @brief Remove a given query that is under execution.
-   *
-   * @note This function is made public so that it is possible for a query
-   *       to be killed. Otherwise, it should only be used privately by the
-   *       class.
-   *
-   * TODO(harshad) - Extend this function to support removal of waiting queries.
-   *
-   * @param query_id The ID of the query to be removed.
-   **/
-  void removeQuery(const std::size_t query_id);
-
-  /**
-   * @brief Get worker messages to be dispatched. These worker messages come
-   *        from the active queries.
-   *
-   * @param worker_messages The worker messages to be dispatched.
-   **/
-  void getWorkerMessages(
-      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
-
-  /**
-   * @brief Process a message sent to the Foreman, which gets passed on to the
-   *        policy enforcer.
-   *
-   * @param message The message.
-   **/
-  void processMessage(const TaggedMessage &tagged_message);
-
-  /**
-   * @brief Check if there are any queries to be executed.
-   *
-   * @return True if there is at least one active or waiting query, false if
-   *         the policy enforcer doesn't have any query.
-   **/
-  inline bool hasQueries() const {
-    return !(admitted_queries_.empty() && waiting_queries_.empty());
-  }
-
-  /**
-   * @brief Get the profiling results for individual work order execution for a
-   *        given query.
-   *
-   * @note This function should only be called if profiling individual work
-   *       orders option is enabled.
-   *
-   * @param query_id The ID of the query for which the profiling results are
-   *        requested.
-   *
-   * @return A vector of tuples, each being a single profiling entry.
-   **/
-  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
-      getProfilingResults(const std::size_t query_id) const {
-    DCHECK(profile_individual_workorders_);
-    DCHECK(workorder_time_recorder_.find(query_id) !=
-           workorder_time_recorder_.end());
-    return workorder_time_recorder_.at(query_id);
-  }
-
- private:
-  static constexpr std::size_t kMaxConcurrentQueries = 1;
-
-  /**
-   * @brief Record the execution time for a finished WorkOrder.
-   *
-   * TODO(harshad) - Extend the functionality to rebuild work orders.
-   *
-   * @param proto The completion message proto sent after the WorkOrder
-   *        execution.
-   **/
-  void recordTimeForWorkOrder(
-      const serialization::NormalWorkOrderCompletionMessage &proto);
-
-  const tmb::client_id foreman_client_id_;
-  const std::size_t num_numa_nodes_;
-
-  CatalogDatabaseLite *catalog_database_;
-  StorageManager *storage_manager_;
-  WorkerDirectory *worker_directory_;
-
-  tmb::MessageBus *bus_;
-  const bool profile_individual_workorders_;
-
-  // Key = query ID, value = QueryManagerBase* for the key query.
-  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
-
-  // The queries which haven't been admitted yet.
-  std::queue<QueryHandle*> waiting_queries_;
-
-  // Key = Query ID.
-  // Value = A tuple indicating a record of executing a work order.
-  // Within a tuple ...
-  // 1st element: Logical worker ID.
-  // 2nd element: Operator ID.
-  // 3rd element: Time in microseconds to execute the work order.
-  std::unordered_map<
-      std::size_t,
-      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
-      workorder_time_recorder_;
-
-  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
new file mode 100644
index 0000000..d16a502
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
+  std::size_t query_id;
+  QueryManagerBase::dag_node_index op_index;
+
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+      if (profile_individual_workorders_) {
+        recordTimeForWorkOrder(proto);
+      }
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // rebuild WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      const block_id block = proto.block_id();
+
+      CatalogRelation *relation =
+          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+      relation->addBlock(block);
+
+      if (proto.has_partition_id()) {
+        relation->getPartitionSchemeMutable()->addBlockToPartition(
+            proto.partition_id(), block);
+      }
+      return;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processDataPipelineMessage(
+          op_index, proto.block_id(), proto.relation_id());
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+
+      // Check if new work orders are available.
+      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+      query_id = msg.header().query_id;
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = msg.header().rel_op_index;
+      admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+  }
+  if (admitted_queries_[query_id]->queryStatus(op_index) ==
+          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+    LOG(WARNING) << "Removing query with ID " << query_id
+                 << " that hasn't finished its execution";
+  }
+  admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcerBase::admitQueries(
+    const std::vector<QueryHandle*> &query_handles) {
+  for (QueryHandle *curr_query : query_handles) {
+    if (!admitQuery(curr_query)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void PolicyEnforcerBase::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  const std::size_t query_id = proto.query_id();
+  if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
+    workorder_time_recorder_[query_id];
+  }
+  workorder_time_recorder_[query_id].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
new file mode 100644
index 0000000..0482ebc
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -0,0 +1,196 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A base class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param catalog_database The CatalogDatabase used.
+   * @param profile_individual_workorders If true, profile each normal work order.
+   **/
+  PolicyEnforcerBase(CatalogDatabaseLite *catalog_database,
+                     const bool profile_individual_workorders)
+      : catalog_database_(catalog_database),
+        profile_individual_workorders_(profile_individual_workorders) {}
+
+  /**
+   * @brief Virtual Destructor.
+   *
+   * @note Logs a warning if there are still admitted or waiting queries.
+   **/
+  virtual ~PolicyEnforcerBase() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note In the current simple implementation, we only allow one active
+   *       query in the system. Other queries will have to wait.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled, and only for a query that has at least
+   *       one recorded entry (both conditions are DCHECKed).
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry of the
+   *         form (logical worker ID, operator ID, execution time in
+   *         microseconds).
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());
+    return workorder_time_recorder_.at(query_id);
+  }
+
+ protected:
+  // The maximum number of queries that may be admitted at the same time.
+  static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
+  // The catalog database passed to the constructor.
+  CatalogDatabaseLite *catalog_database_;
+
+  // Whether each normal work order is profiled individually.
+  const bool profile_individual_workorders_;
+
+  // Key = query ID, value = QueryManagerBase* for the key query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  // Key = Query ID.
+  // Value = A vector of tuples, each a record of executing a work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
+ private:
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @note Pure virtual: the admission logic is supplied by the subclass.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
+  /**
+   * @brief Decrement the number of queued workorders for the given worker by 1.
+   *
+   * @note Pure virtual: the bookkeeping is supplied by the subclass.
+   *
+   * @param worker_index The logical ID of the given worker.
+   **/
+  virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
new file mode 100644
index 0000000..549e39f
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -0,0 +1,110 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Command-line flag (default 20) that bounds the number of WorkerMessages
+// collected in one call to PolicyEnforcerSingleNode::getWorkerMessages(); the
+// budget is divided evenly among the admitted queries.
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+// Fills worker_messages (which must be empty on entry) with the next batch of
+// messages to dispatch. Each admitted query may contribute at most an equal
+// share of FLAGS_max_msgs_per_dispatch_round. Queries that have no message to
+// offer and have finished executing are removed once the scan is complete.
+void PolicyEnforcerSingleNode::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  DCHECK(worker_messages->empty());
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  const std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  // Removal is deferred until after the scan so that admitted_queries_ is
+  // not modified while it is being iterated.
+  std::vector<std::size_t> completed_query_ids;
+  for (const auto &query_entry : admitted_queries_) {
+    QueryManagerBase *query_manager = query_entry.second.get();
+    DCHECK(query_manager != nullptr);
+    QueryManagerSingleNode *single_node_manager =
+        static_cast<QueryManagerSingleNode*>(query_manager);
+
+    for (std::size_t num_collected = 0;
+         num_collected < per_query_share;
+         ++num_collected) {
+      std::unique_ptr<WorkerMessage> message(
+          single_node_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+      if (message == nullptr) {
+        // No more work orders from this query at this time; if its
+        // execution is over, schedule it for removal.
+        if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          completed_query_ids.push_back(query_entry.first);
+        }
+        break;
+      }
+      worker_messages->push_back(std::move(message));
+    }
+  }
+
+  for (const std::size_t completed_query_id : completed_query_ids) {
+    removeQuery(completed_query_id);
+  }
+}
+
+// Tries to admit the given query. Returns true if a QueryManagerSingleNode
+// was created for it. Returns false if the query was placed in the waiting
+// queue because the concurrency limit is reached, or if a query with the
+// same ID is already admitted (in which case an error is logged).
+bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // No free slot: this query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  // The ID is unused and there is room, so create the query's manager.
+  admitted_queries_[query_id].reset(
+      new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+                                 catalog_database_, storage_manager_, bus_));
+  return true;
+}
+
+// Forwards the decrement of the given worker's queued work order count to
+// the WorkerDirectory.
+void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(
+    const std::size_t worker_index) {
+  worker_directory_->decrementNumQueuedWorkOrders(worker_index);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/076d6991/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
new file mode 100644
index 0000000..671fd83
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -0,0 +1,104 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param worker_directory The WorkerDirectory used.
+   * @param bus The TMB.
+   * @param profile_individual_workorders If true, profile each normal work
+   *        order.
+   **/
+  PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id,
+                           const std::size_t num_numa_nodes,
+                           CatalogDatabaseLite *catalog_database,
+                           StorageManager *storage_manager,
+                           WorkerDirectory *worker_directory,
+                           tmb::MessageBus *bus,
+                           const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerSingleNode() override {}
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ private:
+  /**
+   * @brief Decrement the number of queued workorders for the given worker by 1.
+   *
+   * @param worker_index The logical ID of the given worker.
+   **/
+  void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+
+  // The TMB client ID of the Foreman.
+  const tmb::client_id foreman_client_id_;
+  // Number of NUMA nodes used by the system.
+  const std::size_t num_numa_nodes_;
+
+  // Not owned by this class as far as this header shows (raw pointers that
+  // are never deleted here) -- NOTE(review): confirm ownership with callers.
+  StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
+
+  // The TMB.
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_