You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:43 UTC

[20/51] [abbrv] [partial] incubator-quickstep git commit: remove c++ files

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
deleted file mode 100644
index 81684ba..0000000
--- a/query_execution/BlockLocator.cpp
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/BlockLocator.hpp"
-
-#include <cstdlib>
-#include <string>
-#include <utility>
-
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "threading/ThreadUtil.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-
-using tmb::TaggedMessage;
-using tmb::client_id;
-
-namespace quickstep {
-
-void BlockLocator::run() {  // Thread body: dispatch TMB messages until kPoisonMessage arrives.
-  if (cpu_id_ >= 0) {  // A negative cpu_id_ means "do not pin this thread".
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until the
-    // next message is received.
-    const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    const client_id sender = annotated_message.sender;
-    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
-              << "' message from TMB Client " << sender;
-    switch (tagged_message.message_type()) {
-      case kBlockDomainRegistrationMessage: {  // Sender asks for a block domain to be assigned.
-        serialization::BlockDomainRegistrationMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
-        break;
-      }
-      case kAddBlockLocationMessage: {  // Record the (block, domain) pair in both maps.
-        serialization::BlockLocationMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const block_id block = proto.block_id();
-        const block_id_domain domain = proto.block_domain();
-        // insert().second is true only if the element was not already present.
-        const auto result_block_locations = block_locations_[block].insert(domain);
-        const auto result_domain_blocks = domain_blocks_[domain].insert(block);
-        DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);  // Maps must agree.
-
-        if (result_domain_blocks.second) {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
-        } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
-        }
-        break;
-      }
-      case kDeleteBlockLocationMessage: {  // Remove the (block, domain) pair from both maps.
-        serialization::BlockLocationMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const block_id block = proto.block_id();
-        const block_id_domain domain = proto.block_domain();
-        // NOTE(review): operator[] default-inserts an empty set when 'block' is unknown.
-        const auto cit = block_locations_[block].find(domain);
-        if (cit != block_locations_[block].end()) {
-          block_locations_[block].erase(domain);
-          domain_blocks_[domain].erase(block);
-
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
-        } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
-        }
-        break;
-      }
-      case kLocateBlockMessage: {  // Reply with every domain that holds the block.
-        serialization::BlockMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processLocateBlockMessage(sender, proto.block_id());
-        break;
-      }
-      case kGetPeerDomainNetworkAddressesMessage: {  // Reply with the addresses of those domains.
-        serialization::BlockMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
-        break;
-      }
-      case kBlockDomainUnregistrationMessage: {  // Forget a domain and everything it held.
-        serialization::BlockDomainMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const block_id_domain domain = proto.block_domain();
-
-        domain_network_addresses_.erase(domain);
-        // Drop the domain from each block's location set; emptied sets stay in block_locations_.
-        for (const block_id block : domain_blocks_[domain]) {
-          block_locations_[block].erase(domain);
-        }
-        domain_blocks_.erase(domain);
-
-        LOG(INFO) << "Unregistered Domain " << domain;
-        break;
-      }
-      case kPoisonMessage: {
-        return;  // Leaves the loop and ends run().
-      }
-    }  // No default case: any other message type is only logged above.
-  }
-}
-
-void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,
-                                                         const std::string &network_address) {
-  DCHECK_LT(block_domain_, kMaxDomain);  // Debug-only guard before the counter is incremented.
-  // Assign the next domain ID to 'network_address' and start an empty block set for it.
-  domain_network_addresses_.emplace(++block_domain_, network_address);
-  domain_blocks_[block_domain_];  // operator[] is used only for its default-insert side effect.
-
-  serialization::BlockDomainMessage proto;
-  proto.set_block_domain(block_domain_);
-  // Serialize the reply into a temporary heap buffer.
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // NOTE(review): result is not null-checked.
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kBlockDomainRegistrationResponseMessage);
-  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the bytes -- confirm against TMB.
-  // The "sent" log line below is emitted before the send is attempted.
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent BlockDomainRegistrationResponseMessage (typed '"
-            << kBlockDomainRegistrationResponseMessage
-            << "') to Worker (id '" << receiver << "')";
-  CHECK(tmb::MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         locator_client_id_,
-                                         receiver,
-                                         move(message)));  // Any status other than kOK fails the CHECK.
-}
-
-void BlockLocator::processLocateBlockMessage(const client_id receiver,
-                                             const block_id block) {
-  serialization::LocateBlockResponseMessage proto;  // Reply listing every domain recorded for 'block'.
-  // NOTE(review): operator[] default-inserts an empty set when 'block' is unknown.
-  for (const block_id_domain domain : block_locations_[block]) {
-    proto.add_block_domains(domain);
-  }
-  // Serialize the reply into a temporary heap buffer.
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // NOTE(review): result is not null-checked.
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kLocateBlockResponseMessage);
-  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the bytes -- confirm against TMB.
-  // The "sent" log line below is emitted before the send is attempted.
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
-  CHECK(tmb::MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         locator_client_id_,
-                                         receiver,
-                                         move(message)));  // Any status other than kOK fails the CHECK.
-}
-
-void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
-                                                               const block_id block) {
-  serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
-  // Reply with the network address of each domain recorded for 'block'; both operator[] lookups insert on a miss.
-  for (const block_id_domain domain : block_locations_[block]) {
-    proto.add_domain_network_addresses(domain_network_addresses_[domain]);
-  }
-  // Serialize the reply into a temporary heap buffer.
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // NOTE(review): result is not null-checked.
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kGetPeerDomainNetworkAddressesResponseMessage);
-  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the bytes -- confirm against TMB.
-  // The "sent" log line below is emitted before the send is attempted.
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
-            << kGetPeerDomainNetworkAddressesResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
-  CHECK(tmb::MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         locator_client_id_,
-                                         receiver,
-                                         move(message)));  // Any status other than kOK fails the CHECK.
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
deleted file mode 100644
index a83a394..0000000
--- a/query_execution/BlockLocator.hpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
-
-#include <atomic>
-#include <string>
-#include <unordered_map>
-#include <unordered_set>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class for keeping track of blocks loaded in a Worker's buffer pool
- *        in the distributed version.
- **/
-class BlockLocator : public Thread {
- public:
-  /**
-   * @brief Constructor. Connects to the TMB and registers the message types it handles.
-   *
-   * @param bus A pointer to the TMB.
-   * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned.
-   *
-   * @note If cpu_id is not specified, BlockLocator thread can be possibly moved
-   *       around on different CPUs by the OS.
-  **/
-  BlockLocator(tmb::MessageBus *bus,
-               const int cpu_id = -1)
-      : bus_(DCHECK_NOTNULL(bus)),
-        cpu_id_(cpu_id),
-        block_domain_(0) {
-    locator_client_id_ = bus_->Connect();
-    // Domain registration: request in, response out.
-    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
-    bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
-    // Block location updates: receive-only, no response is registered.
-    bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
-    bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
-    // Block lookup: request in, response out.
-    bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
-    bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
-    // Peer address lookup: request in, response out.
-    bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
-    bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
-    // Domain unregistration and shutdown: receive-only.
-    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
-    bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
-  }
-
-  ~BlockLocator() override {}
-
-  /**
-   * @brief Get the TMB client ID of BlockLocator thread.
-   *
-   * @return TMB client ID of BlockLocator thread.
-   **/
-  tmb::client_id getBusClientID() const {
-    return locator_client_id_;
-  }
-
- protected:
-  void run() override;  // Message loop; defined in BlockLocator.cpp.
-
- private:  // Each handler below builds a response message and sends it to 'receiver'.
-  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
-  void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
-  void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
-  // The TMB pointer passed to the constructor.
-  tmb::MessageBus *bus_;
-
-  // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
-  const int cpu_id_;
-  // The most recently assigned block domain; the constructor starts it at 0.
-  alignas(kCacheLineBytes) std::atomic<block_id_domain> block_domain_;
-
-  // From a block domain to its network info in the ip:port format, i.e.,
-  // "0.0.0.0:0".
-  std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
-
-  // From a block to its domains.
-  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
-
-  // From a block domain to all blocks loaded in its buffer pool.
-  std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
-  // This thread's TMB client ID, obtained from bus_->Connect() in the constructor.
-  tmb::client_id locator_client_id_;
-
-  DISALLOW_COPY_AND_ASSIGN(BlockLocator);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocatorUtil.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp
deleted file mode 100644
index d2d1e96..0000000
--- a/query_execution/BlockLocatorUtil.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/BlockLocatorUtil.hpp"
-
-#include <cstdlib>
-#include <string>
-#include <utility>
-
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-using tmb::TaggedMessage;
-using tmb::MessageBus;
-using tmb::client_id;
-
-namespace quickstep {
-namespace block_locator {
-
-namespace S = ::quickstep::serialization;
-
-block_id_domain getBlockDomain(const std::string &network_address,
-                               const client_id cli_id,
-                               client_id *locator_client_id,
-                               MessageBus *bus) {  // Registers with the BlockLocator; returns the assigned domain.
-  tmb::Address address;
-  address.All(true);  // Broadcast address: the BlockLocator's client ID is only learned from its reply.
-  // NOTE(zuyu): The singleton BlockLocator would need only one copy of the message.
-  tmb::MessageStyle style;
-
-  S::BlockDomainRegistrationMessage proto;
-  proto.set_domain_network_address(network_address);
-  // Serialize the request into a temporary heap buffer.
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(std::malloc(proto_length));  // NOTE(review): result is not null-checked.
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kBlockDomainRegistrationMessage);
-  std::free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the bytes -- confirm against TMB.
-
-  DLOG(INFO) << "Client (id '" << cli_id
-             << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
-             << "') to BlockLocator.";
-
-  CHECK(MessageBus::SendStatus::kOK ==
-      bus->Send(cli_id, address, style, std::move(message)));
-  // Wait for the reply; the CHECK_EQ below fails if any other message type arrives first.
-  const tmb::AnnotatedMessage annotated_message(bus->Receive(cli_id, 0, true));
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
-
-  *locator_client_id = annotated_message.sender;  // Report which client answered.
-
-  DLOG(INFO) << "Client (id '" << cli_id
-             << "') received BlockDomainRegistrationResponseMessage (typed '"
-             << kBlockDomainRegistrationResponseMessage
-             << "') from BlockLocator (id '" << *locator_client_id << "').";
-
-  S::BlockDomainMessage response_proto;
-  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-  return static_cast<block_id_domain>(response_proto.block_domain());
-}
-
-}  // namespace block_locator
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocatorUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp
deleted file mode 100644
index 74f65e4..0000000
--- a/query_execution/BlockLocatorUtil.hpp
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
-
-#include <string>
-
-#include "storage/StorageBlockInfo.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-namespace block_locator {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief Broadcast a registration request to find the BlockLocator and obtain
- * a block domain for the StorageManager with the given network address.
- *
- * @param network_address The network address of the StorageManager.
- * @param cli_id The client ID of the block domain requester.
- * @param locator_client_id Output: set to the client ID of the BlockLocator that replied.
- * @param bus A pointer to the TMB.
- *
- * @return The block domain carried in the BlockLocator's reply (the call waits for that reply).
- **/
-block_id_domain getBlockDomain(const std::string &network_address,
-                               const tmb::client_id cli_id,
-                               tmb::client_id *locator_client_id,
-                               tmb::MessageBus *bus);
-
-/** @} */
-
-}  // namespace block_locator
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
deleted file mode 100644
index 719d9f3..0000000
--- a/query_execution/CMakeLists.txt
+++ /dev/null
@@ -1,453 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryContext_proto_srcs queryexecution_QueryContext_proto_hdrs
-                         QueryContext.proto)  # NOTE(review): macro is defined elsewhere; the *_srcs/*_hdrs variables feed add_library() further down.
-QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
-                         queryexecution_QueryExecutionMessages_proto_hdrs
-                         QueryExecutionMessages.proto)
-# Choose the gflags target name according to BUILD_SHARED_LIBS.
-if (BUILD_SHARED_LIBS)
-  set(GFLAGS_LIB_NAME gflags_nothreads-shared)
-else()
-  set(GFLAGS_LIB_NAME gflags_nothreads-static)
-endif()
-
-# Declare micro-libs (targets that list no .cpp of their own use ../empty_src.cpp as the source):
-add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
-if (ENABLE_DISTRIBUTED)  # Targets declared only when ENABLE_DISTRIBUTED is set.
-  add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-  add_library(quickstep_queryexecution_BlockLocatorUtil BlockLocatorUtil.cpp BlockLocatorUtil.hpp)
-endif(ENABLE_DISTRIBUTED)
-add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
-if (ENABLE_DISTRIBUTED)  # Targets declared only when ENABLE_DISTRIBUTED is set.
-  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
-endif(ENABLE_DISTRIBUTED)
-add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
-add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
-if (ENABLE_DISTRIBUTED)  # Targets declared only when ENABLE_DISTRIBUTED is set.
-  add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
-endif(ENABLE_DISTRIBUTED)
-add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
-add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
-add_library(quickstep_queryexecution_QueryContext_proto
-            ${queryexecution_QueryContext_proto_srcs}
-            ${queryexecution_QueryContext_proto_hdrs})
-add_library(quickstep_queryexecution_QueryExecutionMessages_proto
-            ${queryexecution_QueryExecutionMessages_proto_srcs}
-            ${queryexecution_QueryExecutionMessages_proto_hdrs})
-add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp)
-add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
-add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
-add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
-if (ENABLE_DISTRIBUTED)  # Targets declared only when ENABLE_DISTRIBUTED is set.
-  add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif(ENABLE_DISTRIBUTED)
-add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
-if (ENABLE_DISTRIBUTED)  # Targets declared only when ENABLE_DISTRIBUTED is set.
-  add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
-  add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif(ENABLE_DISTRIBUTED)
-add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
-add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
-add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
-add_library(quickstep_queryexecution_WorkerDirectory ../empty_src.cpp WorkerDirectory.hpp)
-add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessage.hpp)
-add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
-
-# Link dependencies (one target_link_libraries() call per micro-lib declared above):
-target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_utility_Macros)
-if (ENABLE_DISTRIBUTED)  # These targets exist only when ENABLE_DISTRIBUTED is set.
-  target_link_libraries(quickstep_queryexecution_BlockLocator
-                        glog
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_storage_StorageBlockInfo
-                        quickstep_storage_StorageConstants
-                        quickstep_threading_Thread
-                        quickstep_threading_ThreadUtil
-                        quickstep_utility_Macros
-                        tmb)
-  target_link_libraries(quickstep_queryexecution_BlockLocatorUtil
-                        glog
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_storage_StorageBlockInfo
-                        tmb)
-endif(ENABLE_DISTRIBUTED)
-target_link_libraries(quickstep_queryexecution_ForemanBase  # Link list for the Foreman base target.
-                      glog
-                      quickstep_queryexecution_PolicyEnforcerBase
-                      quickstep_threading_Thread
-                      quickstep_utility_Macros
-                      tmb)
-if (ENABLE_DISTRIBUTED)  # ForemanDistributed exists only when ENABLE_DISTRIBUTED is set.
-  target_link_libraries(quickstep_queryexecution_ForemanDistributed
-                        glog
-                        quickstep_catalog_CatalogDatabase
-                        quickstep_catalog_CatalogRelation
-                        quickstep_catalog_CatalogTypedefs
-                        quickstep_catalog_Catalog_proto
-                        quickstep_queryexecution_AdmitRequestMessage
-                        quickstep_queryexecution_ForemanBase
-                        quickstep_queryexecution_PolicyEnforcerBase
-                        quickstep_queryexecution_PolicyEnforcerDistributed
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_queryexecution_ShiftbossDirectory
-                        quickstep_threading_ThreadUtil
-                        quickstep_utility_EqualsAnyConstant
-                        quickstep_utility_Macros
-                        tmb
-                        ${GFLAGS_LIB_NAME})
-endif(ENABLE_DISTRIBUTED)
-target_link_libraries(quickstep_queryexecution_ForemanSingleNode
-                      glog
-                      quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_ForemanBase
-                      quickstep_queryexecution_PolicyEnforcerBase
-                      quickstep_queryexecution_PolicyEnforcerSingleNode
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_threading_ThreadUtil
-                      quickstep_utility_EqualsAnyConstant
-                      quickstep_utility_Macros
-                      tmb
-                      ${GFLAGS_LIB_NAME})  # gflags target chosen by the BUILD_SHARED_LIBS check earlier in this file.
-target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase  # Link list for the PolicyEnforcer base target.
-                      glog
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_PartitionScheme
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryManagerBase
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_utility_Macros
-                      tmb
-                      ${GFLAGS_LIB_NAME})
-if (ENABLE_DISTRIBUTED)  # PolicyEnforcerDistributed exists only when ENABLE_DISTRIBUTED is set.
-  target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
-                        glog
-                        quickstep_catalog_CatalogRelation
-                        quickstep_catalog_Catalog_proto
-                        quickstep_queryexecution_PolicyEnforcerBase
-                        quickstep_queryexecution_QueryContext_proto
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionState
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_queryexecution_QueryManagerBase
-                        quickstep_queryexecution_QueryManagerDistributed
-                        quickstep_queryexecution_ShiftbossDirectory
-                        quickstep_queryoptimizer_QueryHandle
-                        quickstep_storage_StorageBlockInfo
-                        quickstep_utility_Macros
-                        tmb
-                        ${GFLAGS_LIB_NAME})
-endif(ENABLE_DISTRIBUTED)
-target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
-                      glog
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_PolicyEnforcerBase
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryManagerBase
-                      quickstep_queryexecution_QueryManagerSingleNode
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_queryoptimizer_QueryHandle
-                      quickstep_utility_Macros
-                      tmb
-                      ${GFLAGS_LIB_NAME})
-target_link_libraries(quickstep_queryexecution_QueryContext  # Link list for the QueryContext target.
-                      glog
-                      quickstep_catalog_CatalogDatabaseLite
-                      quickstep_catalog_CatalogRelationSchema
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_ExpressionFactories
-                      quickstep_expressions_predicate_Predicate
-                      quickstep_expressions_scalar_Scalar
-                      quickstep_expressions_tablegenerator_GeneratorFunctionFactory
-                      quickstep_expressions_tablegenerator_GeneratorFunctionHandle
-                      quickstep_expressions_tablegenerator_GeneratorFunction_proto
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_storage_AggregationOperationState
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableFactory
-                      quickstep_storage_InsertDestination
-                      quickstep_storage_InsertDestination_proto
-                      quickstep_storage_WindowAggregationOperationState
-                      quickstep_types_TypedValue
-                      quickstep_types_containers_Tuple
-                      quickstep_utility_Macros
-                      quickstep_utility_SortConfiguration
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_lipfilter_LIPFilterDeployment
-                      quickstep_utility_lipfilter_LIPFilterFactory)
-target_link_libraries(quickstep_queryexecution_QueryContext_proto  # Link list for the generated QueryContext proto target.
-                      quickstep_expressions_Expressions_proto
-                      quickstep_expressions_tablegenerator_GeneratorFunction_proto
-                      quickstep_storage_AggregationOperationState_proto
-                      quickstep_storage_HashTable_proto
-                      quickstep_storage_InsertDestination_proto
-                      quickstep_storage_WindowAggregationOperationState_proto
-                      quickstep_types_containers_Tuple_proto
-                      quickstep_utility_SortConfiguration_proto
-                      quickstep_utility_lipfilter_LIPFilter_proto
-                      ${PROTOBUF_LIBRARY})  # NOTE(review): PROTOBUF_LIBRARY is set elsewhere; presumably the protobuf runtime -- confirm.
-target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_catalog_Catalog_proto
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_relationaloperators_WorkOrder_proto
-                      ${PROTOBUF_LIBRARY})
-target_link_libraries(quickstep_queryexecution_QueryExecutionState
-                      glog
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_threading_ThreadIDBasedMap
-                      tmb)
-target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_utility_Macros
-                      tmb)
-target_link_libraries(quickstep_queryexecution_QueryManagerBase
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryoptimizer_QueryHandle
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_utility_DAG
-                      quickstep_utility_Macros)
-if (ENABLE_DISTRIBUTED)
-  target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
-                        quickstep_queryexecution_QueryContext
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionState
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_queryexecution_QueryManagerBase
-                        quickstep_queryexecution_ShiftbossDirectory
-                        quickstep_queryexecution_WorkOrderProtosContainer
-                        quickstep_relationaloperators_RelationalOperator
-                        quickstep_relationaloperators_WorkOrder_proto
-                        quickstep_utility_DAG
-                        quickstep_utility_Macros
-                        tmb)
-endif(ENABLE_DISTRIBUTED)
-target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryManagerBase
-                      quickstep_queryexecution_WorkOrdersContainer
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_queryoptimizer_QueryHandle
-                      quickstep_relationaloperators_RebuildWorkOrder
-                      quickstep_relationaloperators_RelationalOperator
-                      quickstep_storage_InsertDestination
-                      quickstep_storage_StorageBlock
-                      quickstep_utility_DAG
-                      quickstep_utility_Macros
-                      tmb)
-if (ENABLE_DISTRIBUTED)
-  target_link_libraries(quickstep_queryexecution_Shiftboss
-                        glog
-                        quickstep_catalog_CatalogDatabaseCache
-                        quickstep_catalog_CatalogTypedefs
-                        quickstep_queryexecution_QueryContext
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_queryexecution_WorkerDirectory
-                        quickstep_queryexecution_WorkerMessage
-                        quickstep_relationaloperators_RebuildWorkOrder
-                        quickstep_relationaloperators_WorkOrderFactory
-                        quickstep_storage_InsertDestination
-                        quickstep_storage_StorageBlock
-                        quickstep_storage_StorageManager
-                        quickstep_threading_Thread
-                        quickstep_threading_ThreadUtil
-                        quickstep_utility_Macros
-                        tmb)
-  target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
-                        quickstep_utility_Macros
-                        tmb)
-endif(ENABLE_DISTRIBUTED)
-target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
-                      glog
-                      quickstep_relationaloperators_WorkOrder_proto
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_queryexecution_WorkOrdersContainer
-                      glog
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_utility_Macros
-                      quickstep_utility_PtrVector)
-target_link_libraries(quickstep_queryexecution_Worker
-                      glog
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_threading_Thread
-                      quickstep_threading_ThreadIDBasedMap
-                      quickstep_threading_ThreadUtil
-                      quickstep_utility_Macros
-                      tmb)
-target_link_libraries(quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_utility_Macros)
-
-# Module all-in-one library:
-add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
-target_link_libraries(quickstep_queryexecution
-                      quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_ForemanBase
-                      quickstep_queryexecution_ForemanSingleNode
-                      quickstep_queryexecution_PolicyEnforcerBase
-                      quickstep_queryexecution_PolicyEnforcerSingleNode
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_QueryManagerBase
-                      quickstep_queryexecution_QueryManagerSingleNode
-                      quickstep_queryexecution_WorkOrderProtosContainer
-                      quickstep_queryexecution_WorkOrdersContainer
-                      quickstep_queryexecution_Worker
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_queryexecution_WorkerSelectionPolicy)
-if (ENABLE_DISTRIBUTED)
-  target_link_libraries(quickstep_queryexecution
-                        quickstep_queryexecution_BlockLocator
-                        quickstep_queryexecution_BlockLocatorUtil
-                        quickstep_queryexecution_ForemanDistributed
-                        quickstep_queryexecution_PolicyEnforcerDistributed
-                        quickstep_queryexecution_QueryManagerDistributed
-                        quickstep_queryexecution_Shiftboss
-                        quickstep_queryexecution_ShiftbossDirectory)
-endif(ENABLE_DISTRIBUTED)
-
-# Tests:
-if (ENABLE_DISTRIBUTED)
-  add_executable(BlockLocator_unittest
-                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp")
-  target_link_libraries(BlockLocator_unittest
-                        ${GFLAGS_LIB_NAME}
-                        glog
-                        gtest
-                        quickstep_catalog_CatalogAttribute
-                        quickstep_catalog_CatalogRelation
-                        quickstep_queryexecution_BlockLocator
-                        quickstep_queryexecution_BlockLocatorUtil
-                        quickstep_queryexecution_QueryExecutionMessages_proto
-                        quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil
-                        quickstep_storage_StorageBlob
-                        quickstep_storage_StorageBlock
-                        quickstep_storage_StorageBlockInfo
-                        quickstep_storage_StorageConstants
-                        quickstep_storage_StorageManager
-                        quickstep_types_TypeFactory
-                        quickstep_types_TypeID
-                        tmb
-                        ${LIBS})
-  add_test(BlockLocator_unittest BlockLocator_unittest)
-endif(ENABLE_DISTRIBUTED)
-
-add_executable(QueryManagerSingleNode_unittest
-  "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
-target_link_libraries(QueryManagerSingleNode_unittest
-                      glog
-                      gtest
-                      gtest_main
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryManagerSingleNode
-                      quickstep_queryexecution_WorkOrdersContainer
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_queryoptimizer_QueryHandle
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_InsertDestination
-                      quickstep_storage_InsertDestination_proto
-                      quickstep_storage_StorageBlock
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageManager
-                      quickstep_utility_DAG
-                      quickstep_utility_Macros
-                      tmb)
-add_test(QueryManagerSingleNode_unittest QueryManagerSingleNode_unittest)
-
-add_executable(WorkOrdersContainer_unittest
-               "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkOrdersContainer_unittest.cpp")
-target_link_libraries(WorkOrdersContainer_unittest
-                      glog
-                      gtest
-                      gtest_main
-                      quickstep_queryexecution_WorkOrdersContainer
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_utility_Macros
-                      quickstep_utility_PtrVector)
-add_test(WorkOrdersContainer_unittest WorkOrdersContainer_unittest)
-
-add_executable(WorkerDirectory_unittest
-  "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkerDirectory_unittest.cpp")
-target_link_libraries(WorkerDirectory_unittest
-                      gtest
-                      gtest_main
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_WorkerDirectory)
-add_test(WorkerDirectory_unittest WorkerDirectory_unittest)
-
-add_executable(WorkerSelectionPolicy_unittest
-  "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkerSelectionPolicy_unittest.cpp")
-target_link_libraries(WorkerSelectionPolicy_unittest
-                      gtest
-                      gtest_main
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerSelectionPolicy)
-add_test(WorkerSelectionPolicy_unittest WorkerSelectionPolicy_unittest)
-
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/block_locator_test_data/)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
deleted file mode 100644
index ee6c7ce..0000000
--- a/query_execution/ForemanBase.hpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
-
-#include <cstdio>
-#include <memory>
-#include <vector>
-
-#include "query_execution/PolicyEnforcerBase.hpp"
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-struct WorkOrderTimeEntry;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief Abstract base class of the Foreman thread. Both the single-node and
- *        the distributed Foreman implementations derive from this class.
- **/
-class ForemanBase : public Thread {
- public:
-  /**
-   * @brief Constructor. Connects to the TMB and keeps the returned client ID
-   *        as the identity of this Foreman on the bus.
-   *
-   * @param bus A pointer to the TMB.
-   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
-   *
-   * @note This class only stores cpu_id. Whether the thread actually gets
-   *       pinned is left to the run() implementation of the derived class.
-  **/
-  ForemanBase(tmb::MessageBus *bus,
-              const int cpu_id)
-      : bus_(DCHECK_NOTNULL(bus)),
-        foreman_client_id_(bus_->Connect()),
-        cpu_id_(cpu_id) {}
-
-  ~ForemanBase() override = default;
-
-  /**
-   * @brief Write the profiling results of the individual work orders of a
-   *        given query to a file stream.
-   *
-   * TODO(harshad) - Add the name of the operator to the output.
-   *
-   * @param query_id The ID of the query whose results are printed.
-   * @param out The file stream to print to.
-   **/
-  virtual void printWorkOrderProfilingResults(const std::size_t query_id,
-                                              std::FILE *out) const = 0;
-
-  /**
-   * @brief Fetch the profiling results of the individual work orders of a
-   *        given query from the policy enforcer.
-   *
-   * @param query_id The ID of the query whose results are requested.
-   * @return A vector of records, each being a single profiling entry.
-   **/
-  const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults(
-      const std::size_t query_id) const {
-    return policy_enforcer_->getProfilingResults(query_id);
-  }
-
-  /**
-   * @brief Get the TMB client ID obtained when this Foreman connected to the
-   *        bus.
-   *
-   * @return TMB client ID of foreman thread.
-   **/
-  tmb::client_id getBusClientID() const {
-    return foreman_client_id_;
-  }
-
- protected:
-  void run() override = 0;
-
-  // The message bus this Foreman communicates over.
-  tmb::MessageBus *bus_;
-
-  // The client ID returned by bus_->Connect() in the constructor.
-  tmb::client_id foreman_client_id_;
-
-  // The ID of the CPU that the Foreman thread can optionally be pinned to.
-  const int cpu_id_;
-
-  // Never assigned by this class; the derived class is expected to install
-  // its policy enforcer here.
-  std::unique_ptr<PolicyEnforcerBase> policy_enforcer_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(ForemanBase);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
deleted file mode 100644
index d619657..0000000
--- a/query_execution/ForemanDistributed.cpp
+++ /dev/null
@@ -1,347 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/ForemanDistributed.hpp"
-
-#include <cstddef>
-#include <cstdio>
-#include <cstdlib>
-#include <memory>
-#include <unordered_map>
-#include <unordered_set>
-#include <utility>
-#include <vector>
-
-#include "catalog/Catalog.pb.h"
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/PolicyEnforcerBase.hpp"
-#include "query_execution/PolicyEnforcerDistributed.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
-#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::AnnotatedMessage;
-using tmb::MessageBus;
-using tmb::TaggedMessage;
-using tmb::client_id;
-
-namespace quickstep {
-
-namespace S = serialization;
-
-class QueryHandle;
-
-ForemanDistributed::ForemanDistributed(
-    MessageBus *bus,
-    CatalogDatabaseLite *catalog_database,
-    const int cpu_id)
-    : ForemanBase(bus, cpu_id),
-      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
-  // Message types this Foreman registers itself as a sender of.
-  const vector<QueryExecutionMessageType> outgoing_types{
-      kShiftbossRegistrationResponseMessage,
-      kQueryInitiateMessage,
-      kWorkOrderMessage,
-      kInitiateRebuildMessage,
-      kQueryTeardownMessage,
-      kSaveQueryResultMessage,
-      kQueryExecutionSuccessMessage,
-      kPoisonMessage};
-  for (const QueryExecutionMessageType outgoing_type : outgoing_types) {
-    bus_->RegisterClientAsSender(foreman_client_id_, outgoing_type);
-  }
-
-  // Message types this Foreman registers itself as a receiver of.
-  const vector<QueryExecutionMessageType> incoming_types{
-      kShiftbossRegistrationMessage,
-      kAdmitRequestMessage,
-      kQueryInitiateResponseMessage,
-      kCatalogRelationNewBlockMessage,
-      kDataPipelineMessage,
-      kInitiateRebuildResponseMessage,
-      kWorkOrderCompleteMessage,
-      kRebuildWorkOrderCompleteMessage,
-      kWorkOrderFeedbackMessage,
-      kSaveQueryResultResponseMessage,
-      kPoisonMessage};
-  for (const QueryExecutionMessageType incoming_type : incoming_types) {
-    bus_->RegisterClientAsReceiver(foreman_client_id_, incoming_type);
-  }
-
-  // Install the distributed policy enforcer into the slot owned by ForemanBase.
-  policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
-      foreman_client_id_, catalog_database_, &shiftboss_directory_, bus_);
-}
-
-// Thread body of the distributed Foreman. It optionally pins the thread to
-// cpu_id_, waits for a first Shiftboss registration if the directory is still
-// empty, and then handles one incoming TMB message per loop iteration. The
-// loop only ends on kPoisonMessage, which is forwarded to every registered
-// Shiftboss before returning.
-void ForemanDistributed::run() {
-  if (cpu_id_ >= 0) {
-    // We can pin the foreman thread to a CPU if specified.
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  // Ensure that at least one Shiftboss has registered before entering the
-  // event loop.
-  if (shiftboss_directory_.empty()) {
-    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    // NOTE(review): the message type is only verified in debug builds.
-    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
-    DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-
-    S::ShiftbossRegistrationMessage proto;
-    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-    processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
-    DCHECK_EQ(1u, shiftboss_directory_.size());
-  }
-
-  // Event loop
-  for (;;) {
-    // Receive() causes this thread to sleep until next message is received.
-    const AnnotatedMessage annotated_message =
-        bus_->Receive(foreman_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    const tmb::message_type_id message_type = tagged_message.message_type();
-    DLOG(INFO) << "ForemanDistributed received typed '" << message_type
-               << "' message from client " << annotated_message.sender;
-    switch (message_type) {
-      case kShiftbossRegistrationMessage: {
-        S::ShiftbossRegistrationMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
-        break;
-      }
-      case kAdmitRequestMessage: {
-        // Unlike the other cases, the payload is read as an
-        // AdmitRequestMessage object rather than parsed as a protobuf.
-        const AdmitRequestMessage *request_message =
-            static_cast<const AdmitRequestMessage*>(tagged_message.message());
-
-        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
-        DCHECK(!query_handles.empty());
-
-        bool all_queries_admitted = true;
-        if (query_handles.size() == 1u) {
-          all_queries_admitted =
-              policy_enforcer_->admitQuery(query_handles.front());
-        } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
-        }
-        if (!all_queries_admitted) {
-          LOG(WARNING) << "The scheduler could not admit all the queries";
-          // TODO(harshad) - Inform the main thread about the failure.
-        }
-        break;
-      }
-      case kQueryInitiateResponseMessage: {
-        S::QueryInitiateResponseMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-        CHECK(policy_enforcer_->existQuery(proto.query_id()));
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through
-      case kDataPipelineMessage:
-      case kRebuildWorkOrderCompleteMessage:
-      case kWorkOrderCompleteMessage:
-      case kWorkOrderFeedbackMessage: {
-        policy_enforcer_->processMessage(tagged_message);
-        break;
-      }
-      case kInitiateRebuildResponseMessage: {
-        // A unique case in the distributed version.
-        static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
-            processInitiateRebuildResponseMessage(tagged_message);
-        break;
-      }
-      case kSaveQueryResultResponseMessage: {
-        S::SaveQueryResultResponseMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        // Look the per-query set up once and reuse it, instead of hashing
-        // query_id again for every access.
-        auto &saved_shiftbosses = query_result_saved_shiftbosses_[query_id];
-        saved_shiftbosses.insert(proto.shiftboss_index());
-
-        // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-        if (saved_shiftbosses.size() == shiftboss_directory_.size()) {
-          processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
-          // This invalidates 'saved_shiftbosses', which is not used afterwards.
-          query_result_saved_shiftbosses_.erase(query_id);
-        }
-        break;
-      }
-      case kPoisonMessage: {
-        if (policy_enforcer_->hasQueries()) {
-          LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
-                          "under execution or waiting to be admitted";
-        }
-
-        // Shutdown all Shiftbosses.
-        tmb::Address shiftboss_addresses;
-        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
-          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
-        }
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        TaggedMessage poison_message(kPoisonMessage);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(foreman_client_id_,
-                       shiftboss_addresses,
-                       broadcast_style,
-                       move(poison_message));
-        DCHECK(send_status == MessageBus::SendStatus::kOK);
-        return;
-      }
-      default:
-        LOG(FATAL) << "Unknown message type to ForemanDistributed";
-    }
-
-    // After handling the message, hand out any work order messages the policy
-    // enforcer has ready, unless canCollectNewMessages() says otherwise.
-    if (canCollectNewMessages(message_type)) {
-      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
-      static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
-          getWorkOrderProtoMessages(&new_messages);
-      dispatchWorkOrderMessages(new_messages);
-    }
-  }
-}
-
-// Tells run() whether new work order messages should be collected after a
-// message of the given type has been handled.
-bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
-  // These two message types never lead to collecting new messages.
-  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
-                                    kCatalogRelationNewBlockMessage,
-                                    kWorkOrderFeedbackMessage)) {
-    return false;
-  }
-
-  // TODO(zuyu): Multiple Shiftbosses support.
-  const bool shiftboss_is_full = shiftboss_directory_.hasReachedCapacity(0);
-  return !shiftboss_is_full;
-}
-
-// Sends every given work order message to the Shiftboss at index 0 and bumps
-// that Shiftboss' queued work order count once per message.
-void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
-  // TODO(zuyu): Multiple Shiftbosses support.
-  const size_t shiftboss_index = 0;
-
-  for (size_t i = 0; i < messages.size(); ++i) {
-    const S::WorkOrderMessage *work_order_proto = messages[i].get();
-    DCHECK(work_order_proto != nullptr);
-    sendWorkOrderMessage(shiftboss_index, *work_order_proto);
-    shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
-  }
-}
-
-// Serializes the given WorkOrderMessage proto and sends it, typed as
-// kWorkOrderMessage, to the Shiftboss registered under shiftboss_index.
-// A failed serialization or send aborts via CHECK.
-void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
-                                              const S::WorkOrderMessage &proto) {
-  // Serialize into a scratch buffer, which is released as soon as the
-  // TaggedMessage has been constructed from it.
-  const size_t num_bytes = proto.ByteSize();
-  void *buffer = malloc(num_bytes);
-  CHECK(proto.SerializeToArray(buffer, num_bytes));
-
-  TaggedMessage work_order_message(buffer, num_bytes, kWorkOrderMessage);
-  free(buffer);
-
-  const client_id receiver_id = shiftboss_directory_.getClientId(shiftboss_index);
-  DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
-             << "') to Shiftboss with TMB client ID " << receiver_id;
-
-  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
-      bus_, foreman_client_id_, receiver_id, move(work_order_message));
-  CHECK(status == MessageBus::SendStatus::kOK);
-}
-
-// Prints the work order profiling results of the given query to 'out' as CSV:
-// a header line, then one line per recorded entry with the query ID, worker
-// ID, operator ID and the difference between the entry's end and start time.
-void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
-                                                        std::FILE *out) const {
-  const std::vector<WorkOrderTimeEntry> &recorded_times =
-      policy_enforcer_->getProfilingResults(query_id);
-  fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
-  for (const auto &workorder_entry : recorded_times) {
-    const std::size_t worker_id = workorder_entry.worker_id;
-    // "%zu" is the portable conversion for std::size_t; "%lu" is only correct
-    // on platforms where size_t happens to be unsigned long. The casts keep
-    // each argument in step with its conversion whatever the declared field
-    // types of WorkOrderTimeEntry are.
-    fprintf(out,
-            "%zu,%zu,%zu,%zu\n",
-            query_id,
-            worker_id,
-            static_cast<std::size_t>(workorder_entry.operator_id),  // Operator ID.
-            static_cast<std::size_t>(
-                workorder_entry.end_time - workorder_entry.start_time));  // Time.
-  }
-}
-
-// Registers a new Shiftboss in the directory and replies to it with a
-// ShiftbossRegistrationResponseMessage carrying the index it was assigned.
-// A failed serialization or send aborts via CHECK.
-void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id,
-                                                             const std::size_t work_order_capacity) {
-  // The index handed out is the directory size before the Shiftboss is added,
-  // so the response must be filled in ahead of addShiftboss().
-  S::ShiftbossRegistrationResponseMessage response_proto;
-  response_proto.set_shiftboss_index(shiftboss_directory_.size());
-
-  // Serialize into a scratch buffer, which is released as soon as the
-  // TaggedMessage has been constructed from it.
-  const size_t num_bytes = response_proto.ByteSize();
-  void *buffer = malloc(num_bytes);
-  CHECK(response_proto.SerializeToArray(buffer, num_bytes));
-
-  TaggedMessage response_message(buffer, num_bytes, kShiftbossRegistrationResponseMessage);
-  free(buffer);
-
-  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
-
-  DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
-             << kShiftbossRegistrationResponseMessage
-             << "') to Shiftboss with TMB client id " << shiftboss_client_id;
-
-  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
-      bus_, foreman_client_id_, shiftboss_client_id, move(response_message));
-  CHECK(status == MessageBus::SendStatus::kOK);
-}
-
-// Sends a QueryExecutionSuccessMessage to the CLI identified by cli_id. The
-// message carries the proto of the relation looked up by result_relation_id
-// in the catalog database. A failed serialization or send aborts via CHECK.
-void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
-                                                               const relation_id result_relation_id) {
-  CatalogDatabase *database = static_cast<CatalogDatabase*>(catalog_database_);
-
-  S::QueryExecutionSuccessMessage success_proto;
-  success_proto.mutable_result_relation()->MergeFrom(
-      database->getRelationById(result_relation_id)->getProto());
-
-  // Serialize into a scratch buffer, which is released as soon as the
-  // TaggedMessage has been constructed from it.
-  const size_t num_bytes = success_proto.ByteSize();
-  void *buffer = malloc(num_bytes);
-  CHECK(success_proto.SerializeToArray(buffer, num_bytes));
-
-  TaggedMessage success_message(buffer, num_bytes, kQueryExecutionSuccessMessage);
-  free(buffer);
-
-  // Notify the CLI regarding the query result.
-  DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
-             << kQueryExecutionSuccessMessage
-             << "') to CLI with TMB client id " << cli_id;
-
-  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
-      bus_, foreman_client_id_, cli_id, move(success_message));
-  CHECK(status == MessageBus::SendStatus::kOK);
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
deleted file mode 100644
index ccdd0ae..0000000
--- a/query_execution/ForemanDistributed.hpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <unordered_map>
-#include <unordered_set>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/ForemanBase.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-
-namespace serialization { class WorkOrderMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief The Foreman receives queries from the main thread, messages from the
- *        policy enforcer and dispatches the work to Shiftbosses. It also
- *        receives work completion messages from Shiftbosses.
- **/
-class ForemanDistributed final : public ForemanBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param bus A pointer to the TMB.
-   * @param catalog_database The catalog database where this query is executed.
-   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
-   *
-   * @note If cpu_id is not specified, Foreman thread can be possibly moved
-   *       around on different CPUs by the OS.
-  **/
-  ForemanDistributed(tmb::MessageBus *bus,
-                     CatalogDatabaseLite *catalog_database,
-                     const int cpu_id = -1);
-
-  ~ForemanDistributed() override {}
-
-  void printWorkOrderProfilingResults(const std::size_t query_id,
-                                      std::FILE *out) const override;
-
- protected:
-  void run() override;
-
- private:
-  /**
-   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
-   *        worker threads.
-   *
-   * @param messages The messages to be dispatched.
-   **/
-  void dispatchWorkOrderMessages(
-      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
-
-  /**
-   * @brief Send the given message to the specified worker.
-   *
-   * @param worker_index The logical index of the recipient worker in
-   *        ShiftbossDirectory.
-   * @param proto The WorkOrderMessage to be sent.
-   **/
-  void sendWorkOrderMessage(const std::size_t worker_index,
-                            const serialization::WorkOrderMessage &proto);
-
-  void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
-                                           const std::size_t work_order_capacity);
-
-  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
-                                             const relation_id result_relation_id);
-
-  /**
-   * @brief Check if we can collect new messages from the PolicyEnforcer.
-   *
-   * @param message_type The type of the last received message.
-   **/
-  bool canCollectNewMessages(const tmb::message_type_id message_type);
-
-  ShiftbossDirectory shiftboss_directory_;
-
-  CatalogDatabaseLite *catalog_database_;
-
-  // From a query id to a set of Shiftbosses that save query result.
-  std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
-
-  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
deleted file mode 100644
index 02799c7..0000000
--- a/query_execution/ForemanSingleNode.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/ForemanSingleNode.hpp"
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/PolicyEnforcerBase.hpp"
-#include "query_execution/PolicyEnforcerSingleNode.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
-#include "utility/Macros.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-namespace quickstep {
-
-class QueryHandle;
-
-DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
-              "of pending work orders for the worker. This information is used "
-              "by the Foreman to assign work orders to worker threads");
-
-ForemanSingleNode::ForemanSingleNode(
-    const tmb::client_id main_thread_client_id,
-    WorkerDirectory *worker_directory,
-    tmb::MessageBus *bus,
-    CatalogDatabaseLite *catalog_database,
-    StorageManager *storage_manager,
-    const int cpu_id,
-    const size_t num_numa_nodes)
-    : ForemanBase(bus, cpu_id),
-      main_thread_client_id_(main_thread_client_id),
-      worker_directory_(DCHECK_NOTNULL(worker_directory)),
-      catalog_database_(DCHECK_NOTNULL(catalog_database)),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
-  const std::vector<QueryExecutionMessageType> sender_message_types{
-      kPoisonMessage,
-      kRebuildWorkOrderMessage,
-      kWorkOrderMessage,
-      kWorkloadCompletionMessage};
-
-  for (const auto message_type : sender_message_types) {
-    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
-  }
-
-  const std::vector<QueryExecutionMessageType> receiver_message_types{
-      kAdmitRequestMessage,
-      kCatalogRelationNewBlockMessage,
-      kDataPipelineMessage,
-      kPoisonMessage,
-      kRebuildWorkOrderCompleteMessage,
-      kWorkOrderFeedbackMessage,
-      kWorkOrderCompleteMessage};
-
-  for (const auto message_type : receiver_message_types) {
-    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
-  }
-
-  policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>(
-      foreman_client_id_,
-      num_numa_nodes,
-      catalog_database_,
-      storage_manager_,
-      worker_directory_,
-      bus_);
-}
-
-void ForemanSingleNode::run() {
-  if (cpu_id_ >= 0) {
-    // We can pin the foreman thread to a CPU if specified.
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  // Event loop
-  for (;;) {
-    // Receive() causes this thread to sleep until next message is received.
-    const AnnotatedMessage annotated_msg =
-        bus_->Receive(foreman_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    const tmb::message_type_id message_type = tagged_message.message_type();
-    switch (message_type) {
-      case kCatalogRelationNewBlockMessage:  // Fall through
-      case kDataPipelineMessage:
-      case kRebuildWorkOrderCompleteMessage:
-      case kWorkOrderCompleteMessage:
-      case kWorkOrderFeedbackMessage: {
-        policy_enforcer_->processMessage(tagged_message);
-        break;
-      }
-
-      case kAdmitRequestMessage: {
-        const AdmitRequestMessage *msg =
-            static_cast<const AdmitRequestMessage *>(tagged_message.message());
-        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
-
-        DCHECK(!query_handles.empty());
-        bool all_queries_admitted = true;
-        if (query_handles.size() == 1u) {
-          all_queries_admitted =
-              policy_enforcer_->admitQuery(query_handles.front());
-        } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
-        }
-        if (!all_queries_admitted) {
-          LOG(WARNING) << "The scheduler could not admit all the queries";
-          // TODO(harshad) - Inform the main thread about the failure.
-        }
-        break;
-      }
-      case kPoisonMessage: {
-        if (policy_enforcer_->hasQueries()) {
-          LOG(WARNING) << "Foreman thread exiting while some queries are "
-                          "under execution or waiting to be admitted";
-        }
-        return;
-      }
-      default:
-        LOG(FATAL) << "Unknown message type to Foreman";
-    }
-
-    if (canCollectNewMessages(message_type)) {
-      vector<unique_ptr<WorkerMessage>> new_messages;
-      static_cast<PolicyEnforcerSingleNode*>(policy_enforcer_.get())->
-          getWorkerMessages(&new_messages);
-      dispatchWorkerMessages(new_messages);
-    }
-
-    // We check again, as some queries may produce zero work orders and finish
-    // their execution.
-    if (!policy_enforcer_->hasQueries()) {
-      // Signal the main thread that there are no queries to be executed.
-      // Currently the message doesn't have any real content.
-      TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
-      DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
-                 << "') to CLI with TMB client ID " << main_thread_client_id_;
-      const tmb::MessageBus::SendStatus send_status =
-          QueryExecutionUtil::SendTMBMessage(
-              bus_,
-              foreman_client_id_,
-              main_thread_client_id_,
-              move(completion_tagged_message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-    }
-  }
-}
-
-bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
-  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
-                                    kCatalogRelationNewBlockMessage,
-                                    kWorkOrderFeedbackMessage)) {
-    return false;
-  } else if (worker_directory_->getLeastLoadedWorker().second <=
-             FLAGS_min_load_per_worker) {
-    // If the least loaded worker has only one pending work order, we should
-    // collect new messages and dispatch them.
-    return true;
-  } else {
-    return false;
-  }
-}
-
-void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
-  for (const auto &message : messages) {
-    DCHECK(message != nullptr);
-    const int recipient_worker_thread_index = message->getRecipientHint();
-    if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
-      sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
-                        *message);
-      worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
-    } else {
-      const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
-      sendWorkerMessage(least_loaded_worker_thread_index, *message);
-      worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
-    }
-  }
-}
-
-void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
-                                          const WorkerMessage &message) {
-  tmb::message_type_id type;
-  if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
-    type = kRebuildWorkOrderMessage;
-  } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
-    type = kWorkOrderMessage;
-  } else {
-    FATAL_ERROR("Invalid WorkerMessageType");
-  }
-  TaggedMessage worker_tagged_message(&message, sizeof(message), type);
-
-  DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
-             << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         worker_directory_->getClientID(worker_thread_index),
-                                         move(worker_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-}
-
-void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
-                                                       std::FILE *out) const {
-  // TODO(harshad) - Add the CPU core ID of the operator to the output. This
-  // will require modifying the WorkerDirectory to remember worker affinities.
-  // Until then, the users can refer to the worker_affinities provided to the
-  // cli to infer the CPU core ID where a given worker is pinned.
-  const std::vector<WorkOrderTimeEntry> &recorded_times =
-      policy_enforcer_->getProfilingResults(query_id);
-  fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
-  for (auto workorder_entry : recorded_times) {
-    const std::size_t worker_id = workorder_entry.worker_id;
-    fprintf(out,
-            "%lu,%lu,%d,%lu,%lu\n",
-            query_id,
-            worker_id,
-            worker_directory_->getNUMANode(worker_id),
-            workorder_entry.operator_id,  // Operator ID.
-            workorder_entry.end_time - workorder_entry.start_time);  // Time.
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
deleted file mode 100644
index d2db51b..0000000
--- a/query_execution/ForemanSingleNode.hpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <vector>
-
-#include "query_execution/ForemanBase.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class StorageManager;
-class WorkerDirectory;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief The Foreman receives queries from the main thread, messages from the
- *        policy enforcer and dispatches the work to worker threads. It also
- *        receives work completion messages from workers.
- **/
-class ForemanSingleNode final : public ForemanBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param main_thread_client_id The TMB client ID of the main thread.
-   * @param worker_directory The worker directory.
-   * @param bus A pointer to the TMB.
-   * @param catalog_database The catalog database where this query is executed.
-   * @param storage_manager The StorageManager to use.
-   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
-   * @param num_numa_nodes The number of NUMA nodes in the system.
-   *
-   * @note If cpu_id is not specified, Foreman thread can be possibly moved
-   *       around on different CPUs by the OS.
-  **/
-  ForemanSingleNode(const tmb::client_id main_thread_client_id,
-          WorkerDirectory *worker_directory,
-          tmb::MessageBus *bus,
-          CatalogDatabaseLite *catalog_database,
-          StorageManager *storage_manager,
-          const int cpu_id = -1,
-          const std::size_t num_numa_nodes = 1);
-
-  ~ForemanSingleNode() override {}
-
-  void printWorkOrderProfilingResults(const std::size_t query_id,
-                                      std::FILE *out) const override;
-
- protected:
-  void run() override;
-
- private:
-  /**
-   * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
-   *        worker threads.
-   *
-   * @param messages The messages to be dispatched.
-   **/
-  void dispatchWorkerMessages(
-      const std::vector<std::unique_ptr<WorkerMessage>> &messages);
-
-  /**
-   * @brief Send the given message to the specified worker.
-   *
-   * @param worker_thread_index The logical index of the recipient worker thread
-   *        in WorkerDirectory.
-   * @param message The WorkerMessage to be sent.
-   **/
-  void sendWorkerMessage(const std::size_t worker_thread_index,
-                         const WorkerMessage &message);
-
-  /**
-   * @brief Check if we can collect new messages from the PolicyEnforcer.
-   *
-   * @param message_type The type of the last received message.
-   **/
-  bool canCollectNewMessages(const tmb::message_type_id message_type);
-
-  const tmb::client_id main_thread_client_id_;
-
-  WorkerDirectory *worker_directory_;
-
-  CatalogDatabaseLite *catalog_database_;
-  StorageManager *storage_manager_;
-
-  DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_