You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/05 04:58:05 UTC
[1/5] incubator-quickstep git commit: Logged only in the debug mode.
[Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-executable 8ea875d04 -> c128191cc (forced update)
Logged only in the debug mode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ff89dd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ff89dd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ff89dd5
Branch: refs/heads/dist-executable
Commit: 5ff89dd5427bf904e1406111bc2877836f586196
Parents: c608e99
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Nov 30 23:07:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Nov 30 23:07:25 2016 -0800
----------------------------------------------------------------------
storage/StorageManager.cpp | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ff89dd5/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 56ca323..a202f71 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -557,9 +557,9 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
kGetPeerDomainNetworkAddressesMessage);
free(proto_bytes);
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+ << "') to BlockLocator";
DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
DCHECK(bus_ != nullptr);
@@ -573,8 +573,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
const TaggedMessage &tagged_message = annotated_message.tagged_message;
CHECK_EQ(block_locator_client_id_, annotated_message.sender);
CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -591,10 +591,10 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
const tmb::message_type_id message_type) {
switch (message_type) {
case kAddBlockLocationMessage:
- LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ DLOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
break;
case kDeleteBlockLocationMessage:
- LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ DLOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
break;
default:
LOG(FATAL) << "Unknown message type " << message_type;
@@ -613,9 +613,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
message_type);
free(proto_bytes);
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent BlockLocationMessage (typed '" << message_type
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockLocationMessage (typed '" << message_type
+ << "') to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
storage_manager_client_id_,
@@ -635,7 +635,7 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// TODO(quickstep-team): Use a cost model to determine whether to load from
// a remote peer or the disk.
if (BlockIdUtil::Domain(block) != block_domain_) {
- LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
for (const string &peer_domain_network_address : peer_domain_network_addresses) {
DataExchangerClientAsync client(
@@ -648,8 +648,8 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
}
}
- LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
+ DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+ << " from remote peers, so try to load from disk.";
}
#endif
[3/5] incubator-quickstep git commit: Marked SingleNodeQuery for
Insertions.
Posted by zu...@apache.org.
Marked SingleNodeQuery for Insertions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a
Branch: refs/heads/dist-executable
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 16 +++++++---
query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
query_optimizer/ExecutionGenerator.cpp | 3 ++
query_optimizer/QueryHandle.hpp | 37 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
return true;
}
+namespace {
+constexpr size_t kDefaultShiftbossIndex = 0u;
+} // namespace
+
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
- const size_t num_shiftbosses = shiftboss_directory_.size();
- size_t shiftboss_index = 0u;
+ static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+ PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
size_t shiftboss_index_for_particular_work_order_type;
- if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+ // Always schedule the single-node query to the same Shiftboss.
+ shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+ } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else {
// TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
- shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
} else {
// NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
// <shiftboss_index_for_particular_work_order_type> might be scheduled one
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
#include <cstddef>
#include <memory>
+#include <unordered_map>
+#include <utility>
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
#include "tmb/id_typedefs.h"
namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
/** \addtogroup QueryExecution
* @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
/**
+ * @brief Whether the query should be executed on one Shiftboss.
+ *
+ * @param query_id The query id.
+ *
+ * @return Whether the query should be executed on one Shiftboss.
+ **/
+ bool isSingleNodeQuery(const std::size_t query_id) const {
+ const auto cit = admitted_queries_.find(query_id);
+ DCHECK(cit != admitted_queries_.end());
+ return cit->second->query_handle()->is_single_node_query();
+ }
+
+ /**
* @brief Get or set the index of Shiftboss for an Aggregation related
* WorkOrder. If it is the first Aggregation on <aggr_state_index>,
* <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
void ExecutionGenerator::convertInsertTuple(
const P::InsertTuplePtr &physical_plan) {
// InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+ query_handle_->set_is_single_node_query();
+#endif // QUICKSTEP_DISTRIBUTED
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
#include "catalog/Catalog.pb.h"
#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
#include "utility/Macros.hpp"
@@ -134,6 +135,22 @@ class QueryHandle {
query_result_relation_ = relation;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Whether the query will be executed on a single node.
+ */
+ bool is_single_node_query() const {
+ return is_single_node_query_;
+ }
+
+ /**
+ * @brief Set the query to be executed on a single node.
+ */
+ void set_is_single_node_query() {
+ is_single_node_query_ = true;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
+
private:
const std::size_t query_id_;
@@ -153,6 +170,26 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ // Indicates whether the query should be executed on the default Shiftboss
+ // for correctness purposes.
+ // An example would be an insert query that might otherwise need block
+ // invalidation among multiple StorageManagers. In this case, an insert query
+ // has been scheduled on node 0, and the block is in the buffer pool of node 0.
+ // Another insert query on the same relation might be scheduled on another
+ // node, say node 1, which will pull the block from node 0, and do the
+ // insertion. Thus, two blocks with the same block id in two nodes
+ // have different contents, which is incorrect.
+ // One approach is to evict blocks cached in all other nodes for every
+ // change. It, however, does not scale, and even worse, it will also affect
+ // the performance of each select query.
+ // Alternatively, we choose to mark the query as a single-node query to
+ // modify blocks on the default node only. But if the changed block has also
+ // been cached in another node, this approach would still produce
+ // inconsistent query results.
+ bool is_single_node_query_ = false;
+#endif // QUICKSTEP_DISTRIBUTED
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};
[4/5] incubator-quickstep git commit: Scheduling based on data
locality info.
Posted by zu...@apache.org.
Scheduling based on data locality info.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/137daf9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/137daf9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/137daf9a
Branch: refs/heads/dist-executable
Commit: 137daf9aa02e3fe6d9585828bbab2face3c393ad
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 20:41:10 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 20:41:10 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 7 +++
query_execution/BlockLocator.hpp | 33 +++++++++-
query_execution/CMakeLists.txt | 2 +
query_execution/ForemanDistributed.cpp | 65 +++++++++++++++++++-
query_execution/ForemanDistributed.hpp | 27 +++++++-
query_execution/QueryExecutionMessages.proto | 6 ++
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/Shiftboss.cpp | 1 +
.../DistributedExecutionGeneratorTestRunner.cpp | 4 +-
storage/StorageManager.cpp | 29 +++++++++
storage/StorageManager.hpp | 9 +++
11 files changed, 178 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..e0fc47a 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -65,6 +65,13 @@ void BlockLocator::run() {
processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
break;
}
+ case kBlockDomainToShiftbossIndexMessage: {
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ block_domain_to_shiftboss_index_.emplace(proto.block_domain(), proto.shiftboss_index());
+ break;
+ }
case kAddBlockLocationMessage: {
serialization::BlockLocationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..d5753a8 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
#include <atomic>
+#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -48,6 +49,9 @@ namespace quickstep {
**/
class BlockLocator : public Thread {
public:
+ typedef std::unordered_map<block_id_domain, std::size_t> BlockDomainToShiftbossIndex;
+ typedef std::unordered_map<block_id, std::unordered_set<block_id_domain>> BlockLocation;
+
/**
* @brief Constructor.
*
@@ -67,6 +71,8 @@ class BlockLocator : public Thread {
bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
@@ -91,6 +97,24 @@ class BlockLocator : public Thread {
return locator_client_id_;
}
+ /**
+ * @brief Get the mapping from a block domain to its Shiftboss index.
+ *
+ * @return The mapping from a block domain to its Shiftboss index.
+ **/
+ const BlockDomainToShiftbossIndex& block_domain_to_shiftboss_index() const {
+ return block_domain_to_shiftboss_index_;
+ }
+
+ /**
+ * @brief Get the mapping from a block id to the block domains it is loaded in.
+ *
+ * @return The mapping from a block id to the block domains it is loaded in.
+ **/
+ const BlockLocation& block_locations() const {
+ return block_locations_;
+ }
+
protected:
void run() override;
@@ -110,8 +134,15 @@ class BlockLocator : public Thread {
// "0.0.0.0:0".
std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+ // From a block domain to its Shiftboss index, used by ForemanDistributed
+ // to schedule based on the data-locality info.
+ // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+ // DistributedCli has a StorageManager with a 'block_id_domain', which is not
+ // a part of Shiftboss.
+ BlockDomainToShiftbossIndex block_domain_to_shiftboss_index_;
+
// From a block to its domains.
- std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+ BlockLocation block_locations_;
// From a block domain to all blocks loaded in its buffer pool.
std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..4181f4a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -105,6 +105,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +115,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlockInfo
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..db82908 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -64,10 +66,14 @@ namespace S = serialization;
class QueryHandle;
ForemanDistributed::ForemanDistributed(
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+ const BlockLocator::BlockLocation &block_locations,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
const int cpu_id)
: ForemanBase(bus, cpu_id),
+ block_domain_to_shiftboss_index_(block_domain_to_shiftboss_index),
+ block_locations_(block_locations),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
kShiftbossRegistrationResponseMessage,
@@ -295,6 +301,62 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
return true;
}
+bool ForemanDistributed::isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_join) {
+ if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
+ return false;
+ }
+
+ const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+ if (hasBlockLocalityInfoHelper(left_block, shiftboss_index_for_join)) {
+ return true;
+ }
+
+ const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+ return hasBlockLocalityInfoHelper(right_block, shiftboss_index_for_join);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfo(const S::WorkOrder &work_order_proto,
+ size_t *shiftboss_index_for_block) {
+ block_id block = kInvalidBlockId;
+ switch (work_order_proto.work_order_type()) {
+ case S::SAVE_BLOCKS: {
+ block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+ break;
+ }
+ case S::SELECT: {
+ block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+ break;
+ }
+ case S::SORT_RUN_GENERATION: {
+ block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+ break;
+ }
+ default:
+ return false;
+ }
+
+ DCHECK_NE(block, kInvalidBlockId);
+ return hasBlockLocalityInfoHelper(block, shiftboss_index_for_block);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfoHelper(const block_id block,
+ size_t *shiftboss_index_for_block) {
+ const auto cit = block_locations_.find(block);
+ if (cit != block_locations_.end()) {
+ for (const block_id_domain block_domain : cit->second) {
+ // TODO(quickstep-team): choose the best node, instead of the first.
+ const auto cit_shiftboss_index = block_domain_to_shiftboss_index_.find(block_domain);
+ if (cit_shiftboss_index != block_domain_to_shiftboss_index_.end()) {
+ *shiftboss_index_for_block = cit_shiftboss_index->second;
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
namespace {
constexpr size_t kDefaultShiftbossIndex = 0u;
} // namespace
@@ -312,8 +374,9 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ } else if (hasBlockLocalityInfo(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
+ } else if (isNestedLoopsJoinWorkOrder(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
} else {
- // TODO(zuyu): Take data-locality into account for scheduling.
shiftboss_index_for_particular_work_order_type = shiftboss_index;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..92d1e98 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -23,8 +23,10 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -51,6 +53,9 @@ class ForemanDistributed final : public ForemanBase {
/**
* @brief Constructor.
*
+ * @param block_domain_to_shiftboss_index A mapping from a block domain to
+ * its Shiftboss index in BlockLocator.
+ * @param block_locations Block locality info in BlockLocator.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +63,12 @@ class ForemanDistributed final : public ForemanBase {
* @note If cpu_id is not specified, Foreman thread can be possibly moved
* around on different CPUs by the OS.
**/
- ForemanDistributed(tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- const int cpu_id = -1);
+ ForemanDistributed(
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+ const BlockLocator::BlockLocation &block_locations,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ const int cpu_id = -1);
~ForemanDistributed() override {}
@@ -79,6 +87,15 @@ class ForemanDistributed final : public ForemanBase {
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_hash_join);
+ bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_join);
+
+ bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_block);
+
+ bool hasBlockLocalityInfoHelper(const block_id block,
+ std::size_t *shiftboss_index_for_block);
+
/**
* @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
* worker threads.
@@ -111,6 +128,10 @@ class ForemanDistributed final : public ForemanBase {
**/
bool canCollectNewMessages(const tmb::message_type_id message_type);
+ // Block locality info for scheduling.
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index_;
+ const BlockLocator::BlockLocation &block_locations_;
+
ShiftbossDirectory shiftboss_directory_;
CatalogDatabaseLite *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..93e458c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -142,6 +142,12 @@ message BlockDomainMessage {
required uint32 block_domain = 1;
}
+// Used for the block locality based scheduling in ForemanDistributed.
+message BlockDomainToShiftbossIndexMessage {
+ required uint32 block_domain = 1;
+ required uint64 shiftboss_index = 2;
+}
+
// Used when StorageManager loads or evicts a block or a blob from its buffer
// pool.
message BlockLocationMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..919e45b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -99,6 +99,7 @@ enum QueryExecutionMessageType : message_type_id {
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator.
kAddBlockLocationMessage, // From StorageManager to BlockLocator.
kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
kLocateBlockMessage, // From StorageManager to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ed4bade..2ed42d0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -286,6 +286,7 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
shiftboss_index_ = proto.shiftboss_index();
+ storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
// Forward this message to Workers regarding <shiftboss_index_>.
QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..7a0e720 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -98,7 +98,9 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),
+ block_locator_->block_locations(),
+ &bus_, test_database_loader_->catalog_database());
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index a202f71..6299cda 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -230,6 +230,8 @@ StorageManager::StorageManager(
bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
@@ -470,6 +472,33 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
}
#ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index) {  // Sends this manager's (block_domain_, shiftboss_index) pair to the BlockLocator over the bus.
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ proto.set_block_domain(block_domain_);
+ proto.set_shiftboss_index(shiftboss_index);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));  // NOTE(review): the malloc() result is used without a null check.
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainToShiftbossIndexMessage);
+ free(proto_bytes);  // Freed right after building the message; presumably TaggedMessage keeps its own copy of the bytes -- TODO confirm.
+
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_  // Debug-only log; note it is emitted before the send below actually happens.
+ << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
+ << "') to BlockLocator";
+
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);  // Debug-only preconditions: a BlockLocator id and a bus must be set.
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==  // A failed send is fatal in every build type.
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+}
+
void StorageManager::pullBlockOrBlob(const block_id block,
PullResponse *response) const {
SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..b61f10a 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -382,6 +382,15 @@ class StorageManager {
#ifdef QUICKSTEP_DISTRIBUTED
/**
+ * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
+ * ForemanDistributed could take advantage of block locality info
+ * for a better scheduling policy.
+ *
+ * @param shiftboss_index The Shiftboss index to report together with this
+ * StorageManager's block domain.
+ **/
+ void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
+
+ /**
* @brief Pull a block or a blob. Used by DataExchangerAsync.
*
* @param block The id of the block or blob.
[5/5] incubator-quickstep git commit: Added DistributedCli executable.
Posted by zu...@apache.org.
Added DistributedCli executable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c128191c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c128191c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c128191c
Branch: refs/heads/dist-executable
Commit: c128191cc7b8279dcfa18fd98d6cae12364ef44c
Parents: 137daf9
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 22:32:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 20:43:15 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 17 ++
cli/CMakeLists.txt | 3 +
cli/distributed/CMakeLists.txt | 93 +++++++++
cli/distributed/Cli.cpp | 239 ++++++++++++++++++++++
cli/distributed/Cli.hpp | 71 +++++++
cli/distributed/CliDistributedModule.hpp | 23 +++
cli/distributed/Conductor.cpp | 182 ++++++++++++++++
cli/distributed/Conductor.hpp | 80 ++++++++
cli/distributed/Executor.cpp | 87 ++++++++
cli/distributed/Executor.hpp | 83 ++++++++
cli/distributed/QuickstepDistributedCli.cpp | 81 ++++++++
cli/distributed/Role.cpp | 51 +++++
cli/distributed/Role.hpp | 69 +++++++
query_execution/QueryExecutionMessages.proto | 8 +
query_execution/QueryExecutionTypedefs.hpp | 6 +-
validate_cmakelists.py | 4 +-
16 files changed, 1095 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 126b47b..391cb26 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -782,3 +782,20 @@ endif()
# Link against other required system and third-party libraries.
target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_DISTRIBUTED)
+ # Build the quickstep_distributed_cli_shell executable.
+ add_executable (quickstep_distributed_cli_shell cli/distributed/QuickstepDistributedCli.cpp)
+ # Link against direct deps (will transitively pull in everything needed).
+ # NOTE(zuyu): Link quickstep_cli_LineReader on behalf of quickstep_cli_distributed_Cli,
+ # as a workaround for bypassing conditionally built target checks in validate_cmakelists.py.
+ target_link_libraries(quickstep_distributed_cli_shell
+ glog
+ quickstep_cli_LineReader
+ quickstep_cli_distributed_Cli
+ quickstep_cli_distributed_Conductor
+ quickstep_cli_distributed_Executor
+ quickstep_utility_StringUtil
+ ${GFLAGS_LIB_NAME}
+ ${GRPCPLUSPLUS_LIBRARIES})
+endif(ENABLE_DISTRIBUTED)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 9b62af9..be13c82 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -16,6 +16,9 @@
# under the License.
include_directories(${CMAKE_CURRENT_BINARY_DIR})
+if (ENABLE_DISTRIBUTED)
+ add_subdirectory(distributed)
+endif(ENABLE_DISTRIBUTED)
add_subdirectory(tests)
if (WIN32)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
new file mode 100644
index 0000000..e16d8af
--- /dev/null
+++ b/cli/distributed/CMakeLists.txt
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_SHARED_LIBS)
+ set(GFLAGS_LIB_NAME gflags_nothreads-shared)
+else()
+ set(GFLAGS_LIB_NAME gflags_nothreads-static)
+endif()
+
+# Declare micro-libs:
+add_library(quickstep_cli_distributed_Cli Cli.cpp Cli.hpp)
+add_library(quickstep_cli_distributed_Conductor Conductor.cpp Conductor.hpp)
+add_library(quickstep_cli_distributed_Executor Executor.cpp Executor.hpp)
+add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_cli_distributed_Cli
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_cli_Flags
+ quickstep_cli_PrintToScreen
+ quickstep_cli_distributed_Role
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocatorUtil
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ quickstep_utility_StringUtil
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Conductor
+ glog
+ quickstep_cli_DefaultsConfigurator
+ quickstep_cli_Flags
+ quickstep_cli_distributed_Role
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
+ quickstep_storage_StorageConstants
+ quickstep_utility_Macros
+ quickstep_utility_SqlError
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Executor
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_Flags
+ quickstep_cli_distributed_Role
+ quickstep_queryexecution_BlockLocatorUtil
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageManager
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_cli_distributed_Role
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+
+# Module all-in-one library:
+add_library(quickstep_cli_distributed ../../empty_src.cpp CliDistributedModule.hpp)
+
+target_link_libraries(quickstep_cli_distributed
+ quickstep_cli_distributed_Cli
+ quickstep_cli_distributed_Conductor
+ quickstep_cli_distributed_Executor
+ quickstep_cli_distributed_Role)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
new file mode 100644
index 0000000..01f824d
--- /dev/null
+++ b/cli/distributed/Cli.cpp
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Cli.hpp"
+
+#include <chrono>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Flags.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "utility/StringUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::fprintf;
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+/**
+ * Connects the distributed CLI to the TMB bus, registers it with the
+ * Conductor, sets up the local StorageManager and DataExchanger, and
+ * registers the message types used later for submitting queries.
+ **/
+void Cli::init() {
+  cli_id_ = bus_.Connect();
+  DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_;
+
+  bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
+
+  DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '"
+             << kDistributedCliRegistrationMessage
+             << "') to all";
+
+  tmb::Address all_addresses;
+  all_addresses.All(true);
+  // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+  tmb::MessageStyle style;
+
+  TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage);
+  // This has to be CHECK rather than DCHECK: glog compiles DCHECK (and the
+  // expression inside it) out when NDEBUG is defined, so a DCHECK here would
+  // drop the Send() call in release builds and the Receive() below would wait
+  // for a response that nothing ever triggers.
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+        bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message)));
+
+  // Wait for Conductor to respond.
+  const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
+  DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+            cli_reg_response_message.tagged_message.message_type());
+  // Whoever answered the registration is remembered as the Conductor.
+  conductor_client_id_ = cli_reg_response_message.sender;
+
+  DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage
+             << "' message from Conductor (id'" << conductor_client_id_ << "').";
+
+  // Setup StorageManager.
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  // getBlockDomain() fills in locator_client_id as an out-parameter.
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_),
+      locator_client_id, &bus_);
+
+  // The exchanger only borrows the StorageManager; it is started after the
+  // pointer has been handed over.
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  // Prepare for submitting a query.
+  bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+}
+
+/**
+ * Interactive loop of the distributed CLI.
+ *
+ * Reads a command from the terminal, feeds it to the SQL parser and, for each
+ * successfully parsed statement, sends a SqlQueryMessage to the Conductor and
+ * waits for the reply. On success the result relation (if any) is printed and
+ * its blocks are deleted, followed by the elapsed time; on error the message
+ * is written to stderr. The loop ends on empty input or a quit statement,
+ * after which the CLI disconnects from the bus.
+ **/
+void Cli::run() {
+  LineReaderImpl line_reader("distributed_quickstep> ",
+                             " ...> ");
+  auto parser_wrapper = make_unique<SqlParserWrapper>();
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+  for (;;) {
+    string *command_string = new string();
+    *command_string = line_reader.getNextCommand();
+    if (command_string->size() == 0) {
+      // Empty input: the buffer was never handed to the parser, so free it here.
+      delete command_string;
+      break;
+    }
+
+    // NOTE(review): the buffer is not deleted on this path, so the parser
+    // presumably takes ownership of it -- confirm against SqlParserWrapper.
+    parser_wrapper->feedNextBuffer(command_string);
+
+    bool quitting = false;
+    // A parse error should reset the parser. This is because the thrown quickstep
+    // SqlError does not do the proper reset work of the YYABORT macro.
+    bool reset_parser = false;
+    for (;;) {
+      ParseResult result = parser_wrapper->getNextStatement();
+      if (result.condition != ParseResult::kSuccess) {
+        if (result.condition == ParseResult::kError) {
+          fprintf(stderr, "%s", result.error_message.c_str());
+        }
+        reset_parser = true;
+        break;
+      }
+
+      // Dereference parsed_statement only after the parse is known to have
+      // succeeded; on any other condition it may not point to a statement.
+      const ParseStatement &statement = *result.parsed_statement;
+      if (statement.getStatementType() == ParseStatement::kQuit) {
+        quitting = true;
+        break;
+      }
+
+      CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
+          << "TODO(quickstep-team)";
+
+      DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                 << "') to Conductor";
+      S::SqlQueryMessage proto;
+      proto.set_sql_query(*command_string);
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                      proto_length,
+                                      kSqlQueryMessage);
+      free(proto_bytes);
+
+      // The send status must not be ignored: if the query never reaches the
+      // Conductor, the Receive() below would wait for a reply that never comes.
+      CHECK(tmb::MessageBus::SendStatus::kOK ==
+            QueryExecutionUtil::SendTMBMessage(&bus_,
+                                               cli_id_,
+                                               conductor_client_id_,
+                                               move(sql_query_message)));
+
+      start = std::chrono::steady_clock::now();
+
+      const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+      const TaggedMessage &tagged_message = annotated_message.tagged_message;
+      DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
+                 << "' message from client " << annotated_message.sender;
+      switch (tagged_message.message_type()) {
+        case kQueryExecutionSuccessMessage: {
+          end = std::chrono::steady_clock::now();
+
+          S::QueryExecutionSuccessMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          if (proto.has_result_relation()) {
+            CatalogRelation result_relation(proto.result_relation());
+
+            PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
+
+            // The result has been printed; drop its blocks.
+            const vector<block_id> blocks(result_relation.getBlocksSnapshot());
+            for (const block_id block : blocks) {
+              storage_manager_->deleteBlockOrBlobFile(block);
+            }
+          }
+
+          std::chrono::duration<double, std::milli> time_in_ms = end - start;
+          printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str());
+          break;
+        }
+        case kQueryExecutionErrorMessage: {
+          S::QueryExecutionErrorMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          fprintf(stderr, "%s", proto.error_message().c_str());
+          break;
+        }
+        default: {
+          LOG(ERROR) << "Unknown TMB message type";
+        }
+      }
+    }
+
+    if (quitting) {
+      break;
+    } else if (reset_parser) {
+      parser_wrapper = make_unique<SqlParserWrapper>();
+      reset_parser = false;
+    }
+  }
+
+  bus_.Disconnect(cli_id_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Cli.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.hpp b/cli/distributed/Cli.hpp
new file mode 100644
index 0000000..32c178f
--- /dev/null
+++ b/cli/distributed/Cli.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+
+#include <memory>
+
+#include "cli/distributed/Role.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Cli component in the distributed version.
+ *
+ * init() and run() are defined in Cli.cpp; this header only declares them,
+ * the tear-down order in the destructor, and the state they share.
+ **/
+class Cli final : public Role {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * Defaulted; set-up is deferred to init().
+ **/
+ Cli() = default;
+
+ ~Cli() override {
+ data_exchanger_.shutdown();  // Ask the exchanger to stop first ...
+ storage_manager_.reset();  // ... destroy the StorageManager in between ...
+ data_exchanger_.join();  // ... and only then join the exchanger.
+ }
+
+ void init() override;
+
+ void run() override;
+
+ private:
+ tmb::client_id cli_id_, conductor_client_id_;  // TMB client ids; not initialized by the defaulted constructor.
+
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;  // Null until created; reset() in the destructor is safe either way.
+
+ DISALLOW_COPY_AND_ASSIGN(Cli);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/CliDistributedModule.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/CliDistributedModule.hpp b/cli/distributed/CliDistributedModule.hpp
new file mode 100644
index 0000000..cfa1e1b
--- /dev/null
+++ b/cli/distributed/CliDistributedModule.hpp
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/** @defgroup CliDistributed
+ *
+ * The distributed QuickStep command-line interface.
+ **/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
new file mode 100644
index 0000000..95e5f4d
--- /dev/null
+++ b/cli/distributed/Conductor.cpp
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Conductor.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "cli/DefaultsConfigurator.hpp"
+#include "cli/Flags.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageConstants.hpp"
+#include "utility/SqlError.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Conductor::init() {  // Loads (or first creates) the catalog, connects to the bus, then starts BlockLocator and ForemanDistributed.
+ try {
+ string catalog_path = FLAGS_storage_path + kCatalogFilename;
+
+ if (quickstep::FLAGS_initialize_db) { // Initialize the database
+ DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
+ }
+
+ query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+ } catch (const std::exception &e) {  // Any start-up exception is turned into a fatal log.
+ LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
+ << "\nIf you intended to create a new database, "
+ << "please use the \"-initialize_db=true\" command line option.";
+ } catch (...) {
+ LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+ }
+
+ bus_.ResetBus();  // NOTE(review): presumably clears state left on the bus by earlier runs -- confirm against the TMB API.
+
+ conductor_client_id_ = bus_.Connect();
+ DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
+
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);  // CLI registration handshake.
+ bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
+
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);  // Query intake, error replies, and admission requests.
+ bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
+ block_locator_ = make_unique<BlockLocator>(&bus_);
+ block_locator_->start();
+
+ foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),  // The Foreman is built from the BlockLocator's maps, so it must be created after it.
+ block_locator_->block_locations(),
+ &bus_, query_processor_->getDefaultDatabase());
+ foreman_->start();
+}
+
+/**
+ * Message loop of the Conductor. It never returns.
+ *
+ * Receives messages addressed to the Conductor and dispatches on their type:
+ * a CLI registration is answered with a registration response, and a SQL
+ * query is handed to processSqlQueryMessage(). Any other message type is
+ * treated as fatal.
+ **/
+void Conductor::run() {
+  for (;;) {
+    AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+
+    DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+               << "' message from client " << sender;
+    switch (tagged_message.message_type()) {
+      case kDistributedCliRegistrationMessage: {
+        TaggedMessage message(kDistributedCliRegistrationResponseMessage);
+
+        DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+                   << kDistributedCliRegistrationResponseMessage
+                   << "') to Distributed CLI " << sender;
+        CHECK(MessageBus::SendStatus::kOK ==
+              QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+        break;
+      }
+      case kSqlQueryMessage: {
+        S::SqlQueryMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+        // The protobuf getter returns a const reference, so std::move on it
+        // would still copy. Copy explicitly; the heap string is passed on as
+        // the parser's input buffer.
+        processSqlQueryMessage(sender, new string(proto.sql_query()));
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unknown TMB message type";
+    }
+  }
+}
+
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {  // Parses and optimizes one query from `sender`; admits it to the Foreman or replies with an error message.
+ parser_wrapper_.feedNextBuffer(command_string);  // NOTE(review): command_string is never deleted here, so the parser presumably takes ownership -- confirm.
+ ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+ CHECK(parse_result.condition == ParseResult::kSuccess)
+ << "Any SQL syntax error should be addressed in the DistributedCli.";
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+ CHECK(statement.getStatementType() != ParseStatement::kCommand)
+ << "TODO(quickstep-team)";
+
+ try {
+ auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+ sender,
+ statement.getPriority());
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_,
+ foreman_->getBusClientID(),
+ query_handle.release(),  // Ownership of the handle leaves this function as a raw pointer.
+ &bus_);
+ } catch (const SqlError &sql_error) {  // Only SqlError is handled; it is reported back to the sender instead of aborting.
+ // Set the query execution status along with the error message.
+ S::QueryExecutionErrorMessage proto;
+ proto.set_error_message(sql_error.formatMessage(*command_string));
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryExecutionErrorMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
+ << kQueryExecutionErrorMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
new file mode 100644
index 0000000..e8c9582
--- /dev/null
+++ b/cli/distributed/Conductor.hpp
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Role.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Conductor component in the distributed version.
+ **/
+class Conductor final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * Defaulted; all set-up is deferred to init().
+   **/
+  Conductor() = default;
+
+  /**
+   * @brief Destructor. Joins the Foreman and the BlockLocator if init()
+   *        created them.
+   **/
+  ~Conductor() override {
+    // Construction and init() are separate steps, so either pointer may still
+    // be null here; joining through a null pointer would crash.
+    if (foreman_ != nullptr) {
+      foreman_->join();
+    }
+    if (block_locator_ != nullptr) {
+      block_locator_->join();
+    }
+  }
+
+  void init() override;
+
+  void run() override;
+
+ private:
+  /**
+   * @brief Handle one SQL query received from a distributed CLI.
+   *
+   * @param sender The TMB client id of the CLI that sent the query.
+   * @param command_string The SQL text, passed on to the parser as its input
+   *        buffer.
+   **/
+  void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+
+  SqlParserWrapper parser_wrapper_;
+
+  std::unique_ptr<QueryProcessor> query_processor_;
+
+  tmb::client_id conductor_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  DISALLOW_COPY_AND_ASSIGN(Conductor);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
new file mode 100644
index 0000000..1d03579
--- /dev/null
+++ b/cli/distributed/Executor.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Executor.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+
+#include "glog/logging.h"
+
+using std::make_unique;
+using std::size_t;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+void Executor::init() {  // Connects to the bus, creates the workers, StorageManager and Shiftboss, then starts them.
+ executor_client_id_ = bus_.Connect();
+ DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
+
+ bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
+
+ vector<client_id> worker_client_ids;
+ vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);  // Every worker is assigned kAnyNUMANodeID.
+
+ for (std::size_t worker_thread_index = 0;
+ worker_thread_index < FLAGS_num_workers;  // NOTE(review): compares size_t against the flag; confirm the flag's type and that it cannot be negative.
+ ++worker_thread_index) {
+ workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+ worker_client_ids.push_back(workers_.back()->getBusClientID());
+ }
+
+ worker_directory_ =
+ make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, worker_numa_nodes);
+
+ client_id locator_client_id;  // Filled in by getBlockDomain() as an out-parameter.
+ storage_manager_ = make_unique<StorageManager>(
+ FLAGS_storage_path,
+ block_locator::getBlockDomain(data_exchanger_.network_address(), executor_client_id_, &locator_client_id, &bus_),
+ locator_client_id, &bus_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());  // The exchanger only borrows the StorageManager.
+ data_exchanger_.start();
+
+ shiftboss_ =
+ make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+ shiftboss_->start();
+
+ for (const auto &worker : workers_) {  // Workers are started last, after the Shiftboss.
+ worker->start();
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
new file mode 100644
index 0000000..6ffa756
--- /dev/null
+++ b/cli/distributed/Executor.hpp
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "cli/distributed/Role.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief A class for the Executor component in the distributed version.
+ *
+ * Owns the workers, the worker directory, the data exchanger, the storage
+ * manager and the Shiftboss. All of them are created in init(); the
+ * destructor joins and tears them down.
+ **/
+class Executor final : public Role {
+ public:
+ /**
+ * @brief Constructor. Components are not created until init() is called.
+ **/
+ Executor() = default;
+
+ /**
+ * @brief Destructor. Joins the workers and the Shiftboss, then shuts down
+ * the data exchanger.
+ *
+ * NOTE(review): shiftboss_ is dereferenced unconditionally, so destroying
+ * an Executor on which init() was never called dereferences a null
+ * unique_ptr.
+ **/
+ ~Executor() override {
+ for (const auto &worker : workers_) {
+ worker->join();
+ }
+ shiftboss_->join();
+
+ // The storage manager is destroyed after the data exchanger has been asked
+ // to shut down but before its thread is joined.
+ data_exchanger_.shutdown();
+ storage_manager_.reset();
+ data_exchanger_.join();
+ }
+
+ /**
+ * @brief Create and start all components of the Executor.
+ **/
+ void init() override;
+
+ /**
+ * @brief No-op: everything is started in init().
+ **/
+ void run() override {}
+
+ private:
+ // TMB client id of the Executor itself, assigned in init().
+ tmb::client_id executor_client_id_;
+
+ std::vector<std::unique_ptr<Worker>> workers_;
+ std::unique_ptr<WorkerDirectory> worker_directory_;
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+ std::unique_ptr<Shiftboss> shiftboss_;
+
+ DISALLOW_COPY_AND_ASSIGN(Executor);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
new file mode 100644
index 0000000..f01cd13
--- /dev/null
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep */
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Cli.hpp"
+#include "cli/distributed/Conductor.hpp"
+#include "cli/distributed/Executor.hpp"
+#include "cli/distributed/Role.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+using std::make_unique;
+
+namespace quickstep {
+
+constexpr char kCliRole[] = "cli";
+constexpr char kConductorRole[] = "conductor";
+constexpr char kExecutorRole[] = "executor";
+
+DEFINE_string(role, "",
+ "The role in the distributed Quickstep: Conductor, Executor, or Cli.");
+// Flag validator for --role.
+//
+// @param flagname The name of the flag being validated (unused).
+// @param value The candidate value for the flag.
+// @return true iff value, compared case-insensitively, is "cli", "conductor"
+//         or "executor". The empty string is rejected.
+//
+// The comparison is done on a local lower-cased copy. gflags stores the parsed
+// value into FLAGS_role only after this validator returns true, so assigning
+// to FLAGS_role from here would be overwritten and cannot be used to
+// normalize the flag.
+static bool ValidateRole(const char *flagname,
+                         const std::string &value) {
+  if (value.empty()) {
+    return false;
+  }
+
+  const std::string role = ToLower(value);
+  return role == kCliRole ||
+         role == kConductorRole ||
+         role == kExecutorRole;
+}
+static const volatile bool role_dummy
+ = gflags::RegisterFlagValidator(&FLAGS_role, &ValidateRole);
+
+} // namespace quickstep
+
+using quickstep::FLAGS_role;
+
+// Entry point of the distributed Quickstep executable: parses the command
+// line, instantiates the component selected by --role, then initializes and
+// runs it.
+int main(int argc, char *argv[]) {
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  grpc_init();
+
+  // The flag validator accepts the role case-insensitively, but gflags stores
+  // the value exactly as typed, so normalize it here before comparing.
+  const std::string role_name = quickstep::ToLower(FLAGS_role);
+
+  std::unique_ptr<quickstep::Role> role;
+  if (role_name == quickstep::kCliRole) {
+    role = make_unique<quickstep::Cli>();
+  } else if (role_name == quickstep::kConductorRole) {
+    role = make_unique<quickstep::Conductor>();
+  } else if (role_name == quickstep::kExecutorRole) {
+    role = make_unique<quickstep::Executor>();
+  } else {
+    // Fail loudly instead of dereferencing a null role below.
+    LOG(FATAL) << "Unknown --role '" << FLAGS_role << "': expected '"
+               << quickstep::kCliRole << "', '"
+               << quickstep::kConductorRole << "' or '"
+               << quickstep::kExecutorRole << "'";
+  }
+
+  role->init();
+  role->run();
+
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Role.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.cpp b/cli/distributed/Role.cpp
new file mode 100644
index 0000000..d56ef09
--- /dev/null
+++ b/cli/distributed/Role.cpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Role.hpp"
+
+#include <cstdio>
+#include <cstdint>
+
+#include "gflags/gflags.h"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+// Flag validator for --tmb_server_port.
+//
+// @param flagname The name of the flag, used in the error message.
+// @param value The candidate port number.
+// @return true iff value lies in [1, 65535]; otherwise prints a diagnostic to
+//         stderr and returns false.
+static bool ValidateTmbServerPort(const char *flagname,
+                                  std::int32_t value) {
+  const bool port_in_range = (value >= 1) && (value <= 65535);
+  if (!port_in_range) {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+  }
+  return port_in_range;
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+ = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+// Registers the TMB server given by --tmb_server_ip and --tmb_server_port
+// with the message bus, then initializes the bus.
+Role::Role() {
+ bus_.AddServer(FLAGS_tmb_server_ip, FLAGS_tmb_server_port);
+ bus_.Initialize();
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Role.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.hpp b/cli/distributed/Role.hpp
new file mode 100644
index 0000000..b802543
--- /dev/null
+++ b/cli/distributed/Role.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ * @{
+ */
+
+/**
+ * @brief Common base of every component (role) in the distributed version.
+ *
+ * Each role owns a TMB message bus that the constructor sets up. Subclasses
+ * implement init() and run().
+ **/
+class Role {
+ public:
+  /**
+   * @brief Constructor. Sets up the message bus.
+   **/
+  Role();
+
+  /**
+   * @brief Virtual destructor.
+   **/
+  virtual ~Role() = default;
+
+  /**
+   * @brief Initialize the component.
+   **/
+  virtual void init() = 0;
+
+  /**
+   * @brief Start the component.
+   **/
+  virtual void run() = 0;
+
+ protected:
+  // Message bus shared by the role and the sub-components it creates.
+  tmb::NativeNetClientMessageBus bus_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Role);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 93e458c..28b5ebd 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -78,6 +78,10 @@ message ShiftbossRegistrationResponseMessage {
required uint64 shiftboss_index = 1;
}
+// Wraps the text of a SQL query.
+// NOTE(review): presumably the payload of kSqlQueryMessage (CLI to Conductor) -- confirm.
+message SqlQueryMessage {
+ required string sql_query = 1;
+}
+
message QueryInitiateMessage {
required uint64 query_id = 1;
required CatalogDatabase catalog_database_cache = 2;
@@ -131,6 +135,10 @@ message QueryExecutionSuccessMessage {
optional CatalogRelationSchema result_relation = 1;
}
+// Wraps an error message string.
+// NOTE(review): presumably the payload of kQueryExecutionErrorMessage -- confirm.
+message QueryExecutionErrorMessage {
+ required string error_message = 1;
+}
+
// BlockLocator related messages.
message BlockDomainRegistrationMessage {
// Format IP:Port, i.e., "0.0.0.0:0".
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 919e45b..faf2132 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,9 @@ enum QueryExecutionMessageType : message_type_id {
kShiftbossRegistrationMessage, // From Shiftboss to Foreman.
kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss, or from
// Shiftboss to Worker.
+ kDistributedCliRegistrationMessage, // From CLI to Conductor.
+ kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
@@ -92,8 +95,9 @@ enum QueryExecutionMessageType : message_type_id {
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
- // From Foreman to CLI.
+ // From Foreman / Conductor to CLI.
kQueryExecutionSuccessMessage,
+ kQueryExecutionErrorMessage,
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index f691d1f..9d1f530 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -46,7 +46,9 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
# Explicitly ignored dependencies (special headers with no other quickstep
# dependencies).
IGNORED_DEPENDENCIES = frozenset(
- ["quickstep_storage_DataExchange.grpc_proto",
+ ["quickstep_cli_LineReaderDumb",
+ "quickstep_cli_LineReaderLineNoise",
+ "quickstep_storage_DataExchange.grpc_proto",
"quickstep_threading_WinThreadsAPI",
"quickstep_utility_textbasedtest_TextBasedTest",
"quickstep_utility_textbasedtest_TextBasedTestDriver",
[2/5] incubator-quickstep git commit: LOG only in the debug mode for
BlockLocator.
Posted by zu...@apache.org.
LOG only in the debug mode for BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e50a2b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e50a2b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e50a2b7a
Branch: refs/heads/dist-executable
Commit: e50a2b7aaed97f3282c338be0b36071a5cffd523
Parents: 5ff89dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Dec 3 21:24:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Dec 3 21:24:25 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 48 ++++++++++++------------
query_execution/QueryManagerDistributed.cpp | 4 +-
2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 81684ba..5de6a54 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -55,8 +55,8 @@ void BlockLocator::run() {
const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const client_id sender = annotated_message.sender;
- LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
- << "' message from TMB Client " << sender;
+ DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+ << "' message from TMB Client " << sender;
switch (tagged_message.message_type()) {
case kBlockDomainRegistrationMessage: {
serialization::BlockDomainRegistrationMessage proto;
@@ -77,9 +77,9 @@ void BlockLocator::run() {
DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
if (result_domain_blocks.second) {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
} else {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
}
break;
}
@@ -95,9 +95,9 @@ void BlockLocator::run() {
block_locations_[block].erase(domain);
domain_blocks_[domain].erase(block);
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
} else {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
}
break;
}
@@ -128,7 +128,7 @@ void BlockLocator::run() {
}
domain_blocks_.erase(domain);
- LOG(INFO) << "Unregistered Domain " << domain;
+ DLOG(INFO) << "Unregistered Domain " << domain;
break;
}
case kPoisonMessage: {
@@ -153,14 +153,14 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kBlockDomainRegistrationResponseMessage);
+ proto_length,
+ kBlockDomainRegistrationResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') to Worker (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') to TMB Client (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -181,13 +181,13 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kLocateBlockResponseMessage);
+ proto_length,
+ kLocateBlockResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -208,14 +208,14 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kGetPeerDomainNetworkAddressesResponseMessage);
+ proto_length,
+ kGetPeerDomainNetworkAddressesResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
- << kGetPeerDomainNetworkAddressesResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+ << kGetPeerDomainNetworkAddressesResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 20650d0..5c7e0d8 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -176,8 +176,8 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
- << "') to all Shiftbosses";
+ DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+ << "') to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
shiftboss_addresses,
move(tagged_msg),