You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/05 08:20:30 UTC
[1/2] incubator-quickstep git commit: Scheduling based on data
locality info. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-executable c128191cc -> b949c5045 (forced update)
Scheduling based on data locality info.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc4086be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc4086be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc4086be
Branch: refs/heads/dist-executable
Commit: bc4086be44bb400f29ae9cada17b951241040bee
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Dec 5 00:13:59 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Dec 5 00:13:59 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 53 +++++++++++++++++---
query_execution/BlockLocator.hpp | 53 ++++++++++++++++++++
query_execution/CMakeLists.txt | 3 ++
query_execution/ForemanDistributed.cpp | 53 +++++++++++++++++++-
query_execution/ForemanDistributed.hpp | 13 +++--
query_execution/QueryExecutionMessages.proto | 6 +++
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/Shiftboss.cpp | 1 +
.../DistributedExecutionGeneratorTestRunner.cpp | 2 +-
storage/StorageManager.cpp | 29 +++++++++++
storage/StorageManager.hpp | 9 ++++
11 files changed, 210 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..fa6db51 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -27,6 +27,7 @@
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "threading/SpinSharedMutex.hpp"
#include "threading/ThreadUtil.hpp"
#include "glog/logging.h"
@@ -65,6 +66,18 @@ void BlockLocator::run() {
processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
break;
}
+ case kBlockDomainToShiftbossIndexMessage: {
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ {
+ // Lock 'block_domain_to_shiftboss_index_shared_mutex_' as briefly as
+ // possible to insert an entry for the new Shiftboss index.
+ SpinSharedMutexExclusiveLock<false> write_lock(block_domain_to_shiftboss_index_shared_mutex_);
+ block_domain_to_shiftboss_index_.emplace(proto.block_domain(), proto.shiftboss_index());
+ }
+ break;
+ }
case kAddBlockLocationMessage: {
serialization::BlockLocationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -72,9 +85,15 @@ void BlockLocator::run() {
const block_id block = proto.block_id();
const block_id_domain domain = proto.block_domain();
- const auto result_block_locations = block_locations_[block].insert(domain);
const auto result_domain_blocks = domain_blocks_[domain].insert(block);
- DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+ {
+ // Lock 'block_locations_shared_mutex_' as briefly as possible to
+ // insert an entry for the new block location.
+ SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+ const auto result_block_locations = block_locations_[block].insert(domain);
+ DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+ }
if (result_domain_blocks.second) {
DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
@@ -90,11 +109,20 @@ void BlockLocator::run() {
const block_id block = proto.block_id();
const block_id_domain domain = proto.block_domain();
- const auto cit = block_locations_[block].find(domain);
- if (cit != block_locations_[block].end()) {
- block_locations_[block].erase(domain);
- domain_blocks_[domain].erase(block);
+ bool block_found = false;
+ {
+ // Lock 'block_locations_shared_mutex_' as briefly as possible to
+ // delete an entry for the block location.
+ SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+ const auto cit = block_locations_[block].find(domain);
+ if (cit != block_locations_[block].end()) {
+ block_locations_[block].erase(domain);
+ block_found = true;
+ }
+ }
+ if (block_found) {
+ domain_blocks_[domain].erase(block);
DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
} else {
DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
@@ -123,8 +151,13 @@ void BlockLocator::run() {
domain_network_addresses_.erase(domain);
- for (const block_id block : domain_blocks_[domain]) {
- block_locations_[block].erase(domain);
+ {
+ // Lock 'block_locations_shared_mutex_' as briefly as possible to
+ // delete all entry for the block domain.
+ SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+ for (const block_id block : domain_blocks_[domain]) {
+ block_locations_[block].erase(domain);
+ }
}
domain_blocks_.erase(domain);
@@ -172,6 +205,8 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
const block_id block) {
serialization::LocateBlockResponseMessage proto;
+ // NOTE(zuyu): We don't need to protect here, as all the writers are in the
+ // single thread.
for (const block_id_domain domain : block_locations_[block]) {
proto.add_block_domains(domain);
}
@@ -199,6 +234,8 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
const block_id block) {
serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+ // NOTE(zuyu): We don't need to protect here, as all the writers are in the
+ // single thread.
for (const block_id_domain domain : block_locations_[block]) {
proto.add_domain_network_addresses(domain_network_addresses_[domain]);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..4690369 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
#include <atomic>
+#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -28,6 +29,7 @@
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageConstants.hpp"
+#include "threading/SpinSharedMutex.hpp"
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"
@@ -67,6 +69,8 @@ class BlockLocator : public Thread {
bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
@@ -91,6 +95,46 @@ class BlockLocator : public Thread {
return locator_client_id_;
}
+ /**
+ * @brief Get the block locality info for scheduling in ForemanDistributed.
+ *
+ * @param block The given block.
+ * @param shiftboss_index_for_block The index of Shiftboss that has loaded the
+ * block in the buffer pool.
+ *
+ * @return Whether the block locality info has found.
+ **/
+ bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+ std::unordered_set<block_id_domain> block_domains;
+ {
+ // Lock 'block_locations_shared_mutex_' as briefly as possible as a
+ // reader.
+ SpinSharedMutexSharedLock<false> read_lock(block_locations_shared_mutex_);
+ const auto cit = block_locations_.find(block);
+ if (cit != block_locations_.end()) {
+ block_domains = cit->second;
+ } else {
+ return false;
+ }
+ }
+
+ {
+ // NOTE(zuyu): This lock is held for the rest duration of this call, as the
+ // exclusive case is rare.
+ SpinSharedMutexSharedLock<false> read_lock(block_domain_to_shiftboss_index_shared_mutex_);
+ for (const block_id_domain block_domain : block_domains) {
+ // TODO(quickstep-team): choose the best node, instead of the first.
+ const auto cit = block_domain_to_shiftboss_index_.find(block_domain);
+ if (cit != block_domain_to_shiftboss_index_.end()) {
+ *shiftboss_index_for_block = cit->second;
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
protected:
void run() override;
@@ -110,8 +154,17 @@ class BlockLocator : public Thread {
// "0.0.0.0:0".
std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+ // From a block domain to its Shiftboss index, used by ForemanDistributed
+ // to schedule based on the data-locality info.
+ // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+ // DistributedCli has StorageManager with a 'block_id_domain', which is not
+ // a part of Shiftboss.
+ std::unordered_map<block_id_domain, std::size_t> block_domain_to_shiftboss_index_;
+ alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_to_shiftboss_index_shared_mutex_;
+
// From a block to its domains.
std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+ alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_locations_shared_mutex_;
// From a block domain to all blocks loaded in its buffer pool.
std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..0f74384 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -80,6 +80,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageConstants
+ quickstep_threading_SpinSharedMutex
quickstep_threading_Thread
quickstep_threading_ThreadUtil
quickstep_utility_Macros
@@ -105,6 +106,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +116,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlockInfo
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..0fa701d 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -64,10 +66,12 @@ namespace S = serialization;
class QueryHandle;
ForemanDistributed::ForemanDistributed(
+ const BlockLocator &block_locator,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
const int cpu_id)
: ForemanBase(bus, cpu_id),
+ block_locator_(block_locator),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
kShiftbossRegistrationResponseMessage,
@@ -296,7 +300,50 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
}
namespace {
+
constexpr size_t kDefaultShiftbossIndex = 0u;
+
+bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+ const BlockLocator &block_locator,
+ std::size_t *shiftboss_index_for_join) {
+ if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
+ return false;
+ }
+
+ const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+ if (block_locator.getBlockLocalityInfo(left_block, shiftboss_index_for_join)) {
+ return true;
+ }
+
+ const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+ return block_locator.getBlockLocalityInfo(right_block, shiftboss_index_for_join);
+}
+
+bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+ const BlockLocator &block_locator,
+ std::size_t *shiftboss_index_for_block) {
+ block_id block = kInvalidBlockId;
+ switch (work_order_proto.work_order_type()) {
+ case S::SAVE_BLOCKS: {
+ block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+ break;
+ }
+ case S::SELECT: {
+ block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+ break;
+ }
+ case S::SORT_RUN_GENERATION: {
+ block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+ break;
+ }
+ default:
+ return false;
+ }
+
+ DCHECK_NE(block, kInvalidBlockId);
+ return block_locator.getBlockLocalityInfo(block, shiftboss_index_for_block);
+}
+
} // namespace
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
@@ -306,14 +353,18 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
+ const S::WorkOrder &work_order_proto = proto.work_order();
size_t shiftboss_index_for_particular_work_order_type;
if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
// Always schedule the single-node query to the same Shiftboss.
shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ } else if (hasBlockLocalityInfo(work_order_proto, block_locator_,
+ &shiftboss_index_for_particular_work_order_type)) {
+ } else if (isNestedLoopsJoinWorkOrder(work_order_proto, block_locator_,
+ &shiftboss_index_for_particular_work_order_type)) {
} else {
- // TODO(zuyu): Take data-locality into account for scheduling.
shiftboss_index_for_particular_work_order_type = shiftboss_index;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..ed09fda 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -33,6 +33,7 @@ namespace tmb { class MessageBus; }
namespace quickstep {
+class BlockLocator;
class CatalogDatabaseLite;
namespace serialization { class WorkOrderMessage; }
@@ -51,6 +52,7 @@ class ForemanDistributed final : public ForemanBase {
/**
* @brief Constructor.
*
+ * @param block_locator The block locator that manages block location info.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +60,11 @@ class ForemanDistributed final : public ForemanBase {
* @note If cpu_id is not specified, Foreman thread can be possibly moved
* around on different CPUs by the OS.
**/
- ForemanDistributed(tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- const int cpu_id = -1);
+ ForemanDistributed(
+ const BlockLocator &block_locator,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ const int cpu_id = -1);
~ForemanDistributed() override {}
@@ -111,6 +115,9 @@ class ForemanDistributed final : public ForemanBase {
**/
bool canCollectNewMessages(const tmb::message_type_id message_type);
+ // To get block locality info for scheduling.
+ const BlockLocator &block_locator_;
+
ShiftbossDirectory shiftboss_directory_;
CatalogDatabaseLite *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..93e458c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -142,6 +142,12 @@ message BlockDomainMessage {
required uint32 block_domain = 1;
}
+// Used for the block locality based scheduling in ForemanDistributed.
+message BlockDomainToShiftbossIndexMessage {
+ required uint32 block_domain = 1;
+ required uint64 shiftboss_index = 2;
+}
+
// Used when StorageManager loads or evicts a block or a blob from its buffer
// pool.
message BlockLocationMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..919e45b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -99,6 +99,7 @@ enum QueryExecutionMessageType : message_type_id {
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator.
kAddBlockLocationMessage, // From StorageManager to BlockLocator.
kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
kLocateBlockMessage, // From StorageManager to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ed4bade..2ed42d0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -286,6 +286,7 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
shiftboss_index_ = proto.shiftboss_index();
+ storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
// Forward this message to Workers regarding <shiftboss_index_>.
QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..45d4fdf 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -98,7 +98,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database());
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index a202f71..6299cda 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -230,6 +230,8 @@ StorageManager::StorageManager(
bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
@@ -470,6 +472,33 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
}
#ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index) {
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ proto.set_block_domain(block_domain_);
+ proto.set_shiftboss_index(shiftboss_index);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainToShiftbossIndexMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
+ << "') to BlockLocator";
+
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+}
+
void StorageManager::pullBlockOrBlob(const block_id block,
PullResponse *response) const {
SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..b61f10a 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -382,6 +382,15 @@ class StorageManager {
#ifdef QUICKSTEP_DISTRIBUTED
/**
+ * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
+ * ForemanDistributed could take advantages of block locality info
+ * for a better scheduling policy.
+ *
+ * @param shiftboss_index The Shiftboss index.
+ **/
+ void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
+
+ /**
* @brief Pull a block or a blob. Used by DataExchangerAsync.
*
* @param block The id of the block or blob.
[2/2] incubator-quickstep git commit: Added DistributedCli executable.
Posted by zu...@apache.org.
Added DistributedCli executable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b949c504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b949c504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b949c504
Branch: refs/heads/dist-executable
Commit: b949c50450625f16c40a581db3ed811be2b9ccf2
Parents: bc4086b
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 22:32:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Dec 5 00:15:16 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 17 ++
cli/CMakeLists.txt | 3 +
cli/distributed/CMakeLists.txt | 93 +++++++++
cli/distributed/Cli.cpp | 239 ++++++++++++++++++++++
cli/distributed/Cli.hpp | 71 +++++++
cli/distributed/CliDistributedModule.hpp | 23 +++
cli/distributed/Conductor.cpp | 180 ++++++++++++++++
cli/distributed/Conductor.hpp | 80 ++++++++
cli/distributed/Executor.cpp | 87 ++++++++
cli/distributed/Executor.hpp | 83 ++++++++
cli/distributed/QuickstepDistributedCli.cpp | 81 ++++++++
cli/distributed/Role.cpp | 51 +++++
cli/distributed/Role.hpp | 69 +++++++
query_execution/QueryExecutionMessages.proto | 8 +
query_execution/QueryExecutionTypedefs.hpp | 6 +-
validate_cmakelists.py | 4 +-
16 files changed, 1093 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 126b47b..391cb26 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -782,3 +782,20 @@ endif()
# Link against other required system and third-party libraries.
target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_DISTRIBUTED)
+ # Build the quickstep_distributed_cli_shell executable.
+ add_executable (quickstep_distributed_cli_shell cli/distributed/QuickstepDistributedCli.cpp)
+ # Link against direct deps (will transitively pull in everything needed).
+ # NOTE(zuyu): Link quickstep_cli_LineReader on behalf of quickstep_cli_distributed_Cli,
+ # as a workaround for bypassing conditionally built target checks in validate_cmakelists.py.
+ target_link_libraries(quickstep_distributed_cli_shell
+ glog
+ quickstep_cli_LineReader
+ quickstep_cli_distributed_Cli
+ quickstep_cli_distributed_Conductor
+ quickstep_cli_distributed_Executor
+ quickstep_utility_StringUtil
+ ${GFLAGS_LIB_NAME}
+ ${GRPCPLUSPLUS_LIBRARIES})
+endif(ENABLE_DISTRIBUTED)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 9b62af9..be13c82 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -16,6 +16,9 @@
# under the License.
include_directories(${CMAKE_CURRENT_BINARY_DIR})
+if (ENABLE_DISTRIBUTED)
+ add_subdirectory(distributed)
+endif(ENABLE_DISTRIBUTED)
add_subdirectory(tests)
if (WIN32)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
new file mode 100644
index 0000000..e16d8af
--- /dev/null
+++ b/cli/distributed/CMakeLists.txt
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_SHARED_LIBS)
+ set(GFLAGS_LIB_NAME gflags_nothreads-shared)
+else()
+ set(GFLAGS_LIB_NAME gflags_nothreads-static)
+endif()
+
+# Declare micro-libs and link dependencies:
+add_library(quickstep_cli_distributed_Cli Cli.cpp Cli.hpp)
+add_library(quickstep_cli_distributed_Conductor Conductor.cpp Conductor.hpp)
+add_library(quickstep_cli_distributed_Executor Executor.cpp Executor.hpp)
+add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_cli_distributed_Cli
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_cli_Flags
+ quickstep_cli_PrintToScreen
+ quickstep_cli_distributed_Role
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocatorUtil
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ quickstep_utility_StringUtil
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Conductor
+ glog
+ quickstep_cli_DefaultsConfigurator
+ quickstep_cli_Flags
+ quickstep_cli_distributed_Role
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
+ quickstep_storage_StorageConstants
+ quickstep_utility_Macros
+ quickstep_utility_SqlError
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Executor
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_Flags
+ quickstep_cli_distributed_Role
+ quickstep_queryexecution_BlockLocatorUtil
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Role
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+
+# Module all-in-one library:
+add_library(quickstep_cli_distributed ../../empty_src.cpp CliDistributedModule.hpp)
+
+target_link_libraries(quickstep_cli_distributed
+ quickstep_cli_distributed_Cli
+ quickstep_cli_distributed_Conductor
+ quickstep_cli_distributed_Executor
+ quickstep_cli_distributed_Role)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
new file mode 100644
index 0000000..01f824d
--- /dev/null
+++ b/cli/distributed/Cli.cpp
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Cli.hpp"
+
+#include <chrono>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Flags.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "utility/StringUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::fprintf;
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Cli::init() {
+ cli_id_ = bus_.Connect();
+ DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_;
+
+ bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
+
+ DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '"
+ << kDistributedCliRegistrationMessage
+ << "') to all";
+
+ tmb::Address all_addresses;
+ all_addresses.All(true);
+ // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+ tmb::MessageStyle style;
+
+ TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage);
+ DCHECK(tmb::MessageBus::SendStatus::kOK ==
+ bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message)));
+
+ // Wait for Conductor to response.
+ const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
+ DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+ cli_reg_response_message.tagged_message.message_type());
+ conductor_client_id_ = cli_reg_response_message.sender;
+
+ DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage
+ << "' message from Conductor (id'" << conductor_client_id_ << "').";
+
+ // Setup StorageManager.
+ bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+ client_id locator_client_id;
+ storage_manager_ = make_unique<StorageManager>(
+ FLAGS_storage_path,
+ block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_),
+ locator_client_id, &bus_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());
+ data_exchanger_.start();
+
+ // Prepare for submitting a query.
+ bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+}
+
+void Cli::run() {
+ LineReaderImpl line_reader("distributed_quickstep> ",
+ " ...> ");
+ auto parser_wrapper = make_unique<SqlParserWrapper>();
+ std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+ for (;;) {
+ string *command_string = new string();
+ *command_string = line_reader.getNextCommand();
+ if (command_string->size() == 0) {
+ delete command_string;
+ break;
+ }
+
+ parser_wrapper->feedNextBuffer(command_string);
+
+ bool quitting = false;
+ // A parse error should reset the parser. This is because the thrown quickstep
+ // SqlError does not do the proper reset work of the YYABORT macro.
+ bool reset_parser = false;
+ for (;;) {
+ ParseResult result = parser_wrapper->getNextStatement();
+ const ParseStatement &statement = *result.parsed_statement;
+ if (result.condition == ParseResult::kSuccess) {
+ if (statement.getStatementType() == ParseStatement::kQuit) {
+ quitting = true;
+ break;
+ }
+
+ CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
+ << "TODO(quickstep-team)";
+
+ DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+ << "') to Conductor";
+ S::SqlQueryMessage proto;
+ proto.set_sql_query(*command_string);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kSqlQueryMessage);
+ free(proto_bytes);
+
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ cli_id_,
+ conductor_client_id_,
+ move(sql_query_message));
+
+ start = std::chrono::steady_clock::now();
+
+ const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
+ switch (tagged_message.message_type()) {
+ case kQueryExecutionSuccessMessage: {
+ end = std::chrono::steady_clock::now();
+
+ S::QueryExecutionSuccessMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ if (proto.has_result_relation()) {
+ CatalogRelation result_relation(proto.result_relation());
+
+ PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
+
+ const vector<block_id> blocks(result_relation.getBlocksSnapshot());
+ for (const block_id block : blocks) {
+ storage_manager_->deleteBlockOrBlobFile(block);
+ }
+ }
+
+ std::chrono::duration<double, std::milli> time_in_ms = end - start;
+ printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str());
+ break;
+ }
+ case kQueryExecutionErrorMessage: {
+ S::QueryExecutionErrorMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ fprintf(stderr, "%s", proto.error_message().c_str());
+ break;
+ }
+ default: {
+ LOG(ERROR) << "Unknown TMB message type";
+ }
+ }
+ } else {
+ if (result.condition == ParseResult::kError) {
+ fprintf(stderr, "%s", result.error_message.c_str());
+ }
+ reset_parser = true;
+ break;
+ }
+ }
+
+ if (quitting) {
+ break;
+ } else if (reset_parser) {
+ parser_wrapper = make_unique<SqlParserWrapper>();
+ reset_parser = false;
+ }
+ }
+
+ bus_.Disconnect(cli_id_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Cli.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.hpp b/cli/distributed/Cli.hpp
new file mode 100644
index 0000000..32c178f
--- /dev/null
+++ b/cli/distributed/Cli.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+
+#include <memory>
+
+#include "cli/distributed/Role.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Cli component in the distributed version.
+ **/
+class Cli final : public Role {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ Cli() = default;
+
+ ~Cli() override {
+ data_exchanger_.shutdown();
+ storage_manager_.reset();
+ data_exchanger_.join();
+ }
+
+ void init() override;
+
+ void run() override;
+
+ private:
+ tmb::client_id cli_id_, conductor_client_id_;
+
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(Cli);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/CliDistributedModule.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/CliDistributedModule.hpp b/cli/distributed/CliDistributedModule.hpp
new file mode 100644
index 0000000..cfa1e1b
--- /dev/null
+++ b/cli/distributed/CliDistributedModule.hpp
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/** @defgroup CliDistributed
+ *
+ * The distributed QuickStep command-line interface.
+ **/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
new file mode 100644
index 0000000..c4a2721
--- /dev/null
+++ b/cli/distributed/Conductor.cpp
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Conductor.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "cli/DefaultsConfigurator.hpp"
+#include "cli/Flags.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageConstants.hpp"
+#include "utility/SqlError.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Conductor::init() {
+ try {
+ string catalog_path = FLAGS_storage_path + kCatalogFilename;
+
+ if (quickstep::FLAGS_initialize_db) { // Initialize the database
+ DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
+ }
+
+ query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+ } catch (const std::exception &e) {
+ LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
+ << "\nIf you intended to create a new database, "
+ << "please use the \"-initialize_db=true\" command line option.";
+ } catch (...) {
+ LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+ }
+
+ bus_.ResetBus();
+
+ conductor_client_id_ = bus_.Connect();
+ DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
+
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
+
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
+ block_locator_ = make_unique<BlockLocator>(&bus_);
+ block_locator_->start();
+
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, query_processor_->getDefaultDatabase());
+ foreman_->start();
+}
+
+void Conductor::run() {
+ for (;;) {
+ AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const client_id sender = annotated_message.sender;
+
+ DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+ << "' message from client " << sender;
+ switch (tagged_message.message_type()) {
+ case kDistributedCliRegistrationMessage: {
+ TaggedMessage message(kDistributedCliRegistrationResponseMessage);
+
+ DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+ << kDistributedCliRegistrationResponseMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ break;
+ }
+ case kSqlQueryMessage: {
+ S::SqlQueryMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+ processSqlQueryMessage(sender, new string(move(proto.sql_query())));
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown TMB message type";
+ }
+ }
+}
+
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+ parser_wrapper_.feedNextBuffer(command_string);
+ ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+ CHECK(parse_result.condition == ParseResult::kSuccess)
+ << "Any SQL syntax error should be addressed in the DistributedCli.";
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+ CHECK(statement.getStatementType() != ParseStatement::kCommand)
+ << "TODO(quickstep-team)";
+
+ try {
+ auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+ sender,
+ statement.getPriority());
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_,
+ foreman_->getBusClientID(),
+ query_handle.release(),
+ &bus_);
+ } catch (const SqlError &sql_error) {
+ // Set the query execution status along with the error message.
+ S::QueryExecutionErrorMessage proto;
+ proto.set_error_message(sql_error.formatMessage(*command_string));
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryExecutionErrorMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
+ << kQueryExecutionErrorMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
new file mode 100644
index 0000000..e8c9582
--- /dev/null
+++ b/cli/distributed/Conductor.hpp
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Role.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Conductor component in the distributed version.
+ **/
+class Conductor final : public Role {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ Conductor() = default;
+
+ ~Conductor() override {
+ foreman_->join();
+ block_locator_->join();
+ }
+
+ void init() override;
+
+ void run() override;
+
+ private:
+ void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+
+ SqlParserWrapper parser_wrapper_;
+
+ std::unique_ptr<QueryProcessor> query_processor_;
+
+ tmb::client_id conductor_client_id_;
+
+ std::unique_ptr<BlockLocator> block_locator_;
+
+ std::unique_ptr<ForemanDistributed> foreman_;
+
+ DISALLOW_COPY_AND_ASSIGN(Conductor);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
new file mode 100644
index 0000000..1d03579
--- /dev/null
+++ b/cli/distributed/Executor.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Executor.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+
+#include "glog/logging.h"
+
+using std::make_unique;
+using std::size_t;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+void Executor::init() {
+ executor_client_id_ = bus_.Connect();
+ DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
+
+ bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
+
+ vector<client_id> worker_client_ids;
+ vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
+
+ for (std::size_t worker_thread_index = 0;
+ worker_thread_index < FLAGS_num_workers;
+ ++worker_thread_index) {
+ workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+ worker_client_ids.push_back(workers_.back()->getBusClientID());
+ }
+
+ worker_directory_ =
+ make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, worker_numa_nodes);
+
+ client_id locator_client_id;
+ storage_manager_ = make_unique<StorageManager>(
+ FLAGS_storage_path,
+ block_locator::getBlockDomain(data_exchanger_.network_address(), executor_client_id_, &locator_client_id, &bus_),
+ locator_client_id, &bus_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());
+ data_exchanger_.start();
+
+ shiftboss_ =
+ make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+ shiftboss_->start();
+
+ for (const auto &worker : workers_) {
+ worker->start();
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
new file mode 100644
index 0000000..6ffa756
--- /dev/null
+++ b/cli/distributed/Executor.hpp
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "cli/distributed/Role.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Executor component in the distributed version.
+ **/
+class Executor final : public Role {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ Executor() = default;
+
+ ~Executor() override {
+ for (const auto &worker : workers_) {
+ worker->join();
+ }
+ shiftboss_->join();
+
+ data_exchanger_.shutdown();
+ storage_manager_.reset();
+ data_exchanger_.join();
+ }
+
+ void init() override;
+
+ void run() override {}
+
+ private:
+ tmb::client_id executor_client_id_;
+
+ std::vector<std::unique_ptr<Worker>> workers_;
+ std::unique_ptr<WorkerDirectory> worker_directory_;
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+ std::unique_ptr<Shiftboss> shiftboss_;
+
+ DISALLOW_COPY_AND_ASSIGN(Executor);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
new file mode 100644
index 0000000..f01cd13
--- /dev/null
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep */
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Cli.hpp"
+#include "cli/distributed/Conductor.hpp"
+#include "cli/distributed/Executor.hpp"
+#include "cli/distributed/Role.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+using std::make_unique;
+
+namespace quickstep {
+
+constexpr char kCliRole[] = "cli";
+constexpr char kConductorRole[] = "conductor";
+constexpr char kExecutorRole[] = "executor";
+
+DEFINE_string(role, "",
+ "The role in the distributed Quickstep: Conductor, Executor, or Cli.");
+static bool ValidateRole(const char *flagname,
+ const std::string &value) {
+ if (value.empty()) {
+ return false;
+ }
+
+ FLAGS_role = ToLower(value);
+ return FLAGS_role == kCliRole ||
+ FLAGS_role == kConductorRole ||
+ FLAGS_role == kExecutorRole;
+}
+static const volatile bool role_dummy
+ = gflags::RegisterFlagValidator(&FLAGS_role, &ValidateRole);
+
+} // namespace quickstep
+
+using quickstep::FLAGS_role;
+
+int main(int argc, char *argv[]) {
+ google::InitGoogleLogging(argv[0]);
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+ grpc_init();
+
+ std::unique_ptr<quickstep::Role> role;
+ if (FLAGS_role == quickstep::kCliRole) {
+ role = make_unique<quickstep::Cli>();
+ } else if (FLAGS_role == quickstep::kConductorRole) {
+ role = make_unique<quickstep::Conductor>();
+ } else if (FLAGS_role == quickstep::kExecutorRole) {
+ role = make_unique<quickstep::Executor>();
+ }
+
+ role->init();
+ role->run();
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Role.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.cpp b/cli/distributed/Role.cpp
new file mode 100644
index 0000000..d56ef09
--- /dev/null
+++ b/cli/distributed/Role.cpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Role.hpp"
+
+#include <cstdio>
+#include <cstdint>
+
+#include "gflags/gflags.h"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+ std::int32_t value) {
+ if (value > 0 && value < 65536) {
+ return true;
+ } else {
+ std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+ return false;
+ }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+ = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+Role::Role() {
+ bus_.AddServer(FLAGS_tmb_server_ip, FLAGS_tmb_server_port);
+ bus_.Initialize();
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Role.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.hpp b/cli/distributed/Role.hpp
new file mode 100644
index 0000000..b802543
--- /dev/null
+++ b/cli/distributed/Role.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A base class for all components in the distributed version.
+ **/
+class Role {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ Role();
+
+ /**
+ * @brief Virtual destructor.
+ **/
+ virtual ~Role() {}
+
+ /**
+ * @brief Initialize the component.
+ **/
+ virtual void init() = 0;
+
+ /**
+ * @brief Start the component.
+ **/
+ virtual void run() = 0;
+
+ protected:
+ tmb::NativeNetClientMessageBus bus_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Role);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 93e458c..28b5ebd 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -78,6 +78,10 @@ message ShiftbossRegistrationResponseMessage {
required uint64 shiftboss_index = 1;
}
+message SqlQueryMessage {
+ required string sql_query = 1;
+}
+
message QueryInitiateMessage {
required uint64 query_id = 1;
required CatalogDatabase catalog_database_cache = 2;
@@ -131,6 +135,10 @@ message QueryExecutionSuccessMessage {
optional CatalogRelationSchema result_relation = 1;
}
+message QueryExecutionErrorMessage {
+ required string error_message = 1;
+}
+
// BlockLocator related messages.
message BlockDomainRegistrationMessage {
// Format IP:Port, i.e., "0.0.0.0:0".
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 919e45b..faf2132 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,9 @@ enum QueryExecutionMessageType : message_type_id {
kShiftbossRegistrationMessage, // From Shiftboss to Foreman.
kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss, or from
// Shiftboss to Worker.
+ kDistributedCliRegistrationMessage, // From CLI to Conductor.
+ kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
@@ -92,8 +95,9 @@ enum QueryExecutionMessageType : message_type_id {
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
- // From Foreman to CLI.
+ // From Foreman / Conductor to CLI.
kQueryExecutionSuccessMessage,
+ kQueryExecutionErrorMessage,
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index f691d1f..9d1f530 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -46,7 +46,9 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
# Explicitly ignored dependencies (special headers with no other quickstep
# dependencies).
IGNORED_DEPENDENCIES = frozenset(
- ["quickstep_storage_DataExchange.grpc_proto",
+ ["quickstep_cli_LineReaderDumb",
+ "quickstep_cli_LineReaderLineNoise",
+ "quickstep_storage_DataExchange.grpc_proto",
"quickstep_threading_WinThreadsAPI",
"quickstep_utility_textbasedtest_TextBasedTest",
"quickstep_utility_textbasedtest_TextBasedTestDriver",