You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/05 04:42:51 UTC
incubator-quickstep git commit: Scheduling based on data locality
info. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 982cc0653 -> 137daf9aa (forced update)
Scheduling based on data locality info.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/137daf9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/137daf9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/137daf9a
Branch: refs/heads/master
Commit: 137daf9aa02e3fe6d9585828bbab2face3c393ad
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 20:41:10 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 20:41:10 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 7 +++
query_execution/BlockLocator.hpp | 33 +++++++++-
query_execution/CMakeLists.txt | 2 +
query_execution/ForemanDistributed.cpp | 65 +++++++++++++++++++-
query_execution/ForemanDistributed.hpp | 27 +++++++-
query_execution/QueryExecutionMessages.proto | 6 ++
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/Shiftboss.cpp | 1 +
.../DistributedExecutionGeneratorTestRunner.cpp | 4 +-
storage/StorageManager.cpp | 29 +++++++++
storage/StorageManager.hpp | 9 +++
11 files changed, 178 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..e0fc47a 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -65,6 +65,13 @@ void BlockLocator::run() {
processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
break;
}
+ case kBlockDomainToShiftbossIndexMessage: {
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ block_domain_to_shiftboss_index_.emplace(proto.block_domain(), proto.shiftboss_index());
+ break;
+ }
case kAddBlockLocationMessage: {
serialization::BlockLocationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..d5753a8 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
#include <atomic>
+#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -48,6 +49,9 @@ namespace quickstep {
**/
class BlockLocator : public Thread {
public:
+ typedef std::unordered_map<block_id_domain, std::size_t> BlockDomainToShiftbossIndex;
+ typedef std::unordered_map<block_id, std::unordered_set<block_id_domain>> BlockLocation;
+
/**
* @brief Constructor.
*
@@ -67,6 +71,8 @@ class BlockLocator : public Thread {
bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
@@ -91,6 +97,24 @@ class BlockLocator : public Thread {
return locator_client_id_;
}
+ /**
+ * @brief Get the mapping from a block domain to its Shiftboss index.
+ *
+ * @return The mapping from a block domain to its Shiftboss index.
+ **/
+ const BlockDomainToShiftbossIndex& block_domain_to_shiftboss_index() const {
+ return block_domain_to_shiftboss_index_;
+ }
+
+ /**
+ * @brief Get the mapping from a block id to its loaded block domain.
+ *
+ * @return The mapping from a block id to its loaded block domain.
+ **/
+ const BlockLocation& block_locations() const {
+ return block_locations_;
+ }
+
protected:
void run() override;
@@ -110,8 +134,15 @@ class BlockLocator : public Thread {
// "0.0.0.0:0".
std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+ // From a block domain to its Shiftboss index, used by ForemanDistributed
+ // to schedule based on the data-locality info.
+ // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+ // DistributedCli has StorageManager with a 'block_id_domain', which is not
+ // a part of Shiftboss.
+ BlockDomainToShiftbossIndex block_domain_to_shiftboss_index_;
+
// From a block to its domains.
- std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+ BlockLocation block_locations_;
// From a block domain to all blocks loaded in its buffer pool.
std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..4181f4a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -105,6 +105,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +115,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlockInfo
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..db82908 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -64,10 +66,14 @@ namespace S = serialization;
class QueryHandle;
ForemanDistributed::ForemanDistributed(
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+ const BlockLocator::BlockLocation &block_locations,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
const int cpu_id)
: ForemanBase(bus, cpu_id),
+ block_domain_to_shiftboss_index_(block_domain_to_shiftboss_index),
+ block_locations_(block_locations),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
kShiftbossRegistrationResponseMessage,
@@ -295,6 +301,62 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
return true;
}
+bool ForemanDistributed::isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_join) {
+ if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
+ return false;
+ }
+
+ const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+ if (hasBlockLocalityInfoHelper(left_block, shiftboss_index_for_join)) {
+ return true;
+ }
+
+ const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+ return hasBlockLocalityInfoHelper(right_block, shiftboss_index_for_join);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfo(const S::WorkOrder &work_order_proto,
+ size_t *shiftboss_index_for_block) {
+ block_id block = kInvalidBlockId;
+ switch (work_order_proto.work_order_type()) {
+ case S::SAVE_BLOCKS: {
+ block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+ break;
+ }
+ case S::SELECT: {
+ block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+ break;
+ }
+ case S::SORT_RUN_GENERATION: {
+ block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+ break;
+ }
+ default:
+ return false;
+ }
+
+ DCHECK_NE(block, kInvalidBlockId);
+ return hasBlockLocalityInfoHelper(block, shiftboss_index_for_block);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfoHelper(const block_id block,
+ size_t *shiftboss_index_for_block) {
+ const auto cit = block_locations_.find(block);
+ if (cit != block_locations_.end()) {
+ for (const block_id_domain block_domain : cit->second) {
+ // TODO(quickstep-team): choose the best node, instead of the first.
+ const auto cit_shiftboss_index = block_domain_to_shiftboss_index_.find(block_domain);
+ if (cit_shiftboss_index != block_domain_to_shiftboss_index_.end()) {
+ *shiftboss_index_for_block = cit_shiftboss_index->second;
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
namespace {
constexpr size_t kDefaultShiftbossIndex = 0u;
} // namespace
@@ -312,8 +374,9 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ } else if (hasBlockLocalityInfo(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
+ } else if (isNestedLoopsJoinWorkOrder(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
} else {
- // TODO(zuyu): Take data-locality into account for scheduling.
shiftboss_index_for_particular_work_order_type = shiftboss_index;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..92d1e98 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -23,8 +23,10 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -51,6 +53,9 @@ class ForemanDistributed final : public ForemanBase {
/**
* @brief Constructor.
*
+ * @param block_domain_to_shiftboss_index A mapping from a block domain to
+ * its Shiftboss index in BlockLocator.
+ * @param block_locations Block locality info in BlockLocator.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +63,12 @@ class ForemanDistributed final : public ForemanBase {
* @note If cpu_id is not specified, Foreman thread can be possibly moved
* around on different CPUs by the OS.
**/
- ForemanDistributed(tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- const int cpu_id = -1);
+ ForemanDistributed(
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+ const BlockLocator::BlockLocation &block_locations,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ const int cpu_id = -1);
~ForemanDistributed() override {}
@@ -79,6 +87,15 @@ class ForemanDistributed final : public ForemanBase {
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_hash_join);
+ bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_join);
+
+ bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+ std::size_t *shiftboss_index_for_block);
+
+ bool hasBlockLocalityInfoHelper(const block_id block,
+ std::size_t *shiftboss_index_for_block);
+
/**
* @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
* worker threads.
@@ -111,6 +128,10 @@ class ForemanDistributed final : public ForemanBase {
**/
bool canCollectNewMessages(const tmb::message_type_id message_type);
+ // Block locality info for scheduling.
+ const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index_;
+ const BlockLocator::BlockLocation &block_locations_;
+
ShiftbossDirectory shiftboss_directory_;
CatalogDatabaseLite *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..93e458c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -142,6 +142,12 @@ message BlockDomainMessage {
required uint32 block_domain = 1;
}
+// Used for the block locality based scheduling in ForemanDistributed.
+message BlockDomainToShiftbossIndexMessage {
+ required uint32 block_domain = 1;
+ required uint64 shiftboss_index = 2;
+}
+
// Used when StorageManager loads or evicts a block or a blob from its buffer
// pool.
message BlockLocationMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..919e45b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -99,6 +99,7 @@ enum QueryExecutionMessageType : message_type_id {
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator.
kAddBlockLocationMessage, // From StorageManager to BlockLocator.
kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
kLocateBlockMessage, // From StorageManager to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ed4bade..2ed42d0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -286,6 +286,7 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
shiftboss_index_ = proto.shiftboss_index();
+ storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
// Forward this message to Workers regarding <shiftboss_index_>.
QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..7a0e720 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -98,7 +98,9 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),
+ block_locator_->block_locations(),
+ &bus_, test_database_loader_->catalog_database());
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index a202f71..6299cda 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -230,6 +230,8 @@ StorageManager::StorageManager(
bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
+
bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
@@ -470,6 +472,33 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
}
#ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index) {
+ serialization::BlockDomainToShiftbossIndexMessage proto;
+ proto.set_block_domain(block_domain_);
+ proto.set_shiftboss_index(shiftboss_index);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainToShiftbossIndexMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
+ << "') to BlockLocator";
+
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+}
+
void StorageManager::pullBlockOrBlob(const block_id block,
PullResponse *response) const {
SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..b61f10a 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -382,6 +382,15 @@ class StorageManager {
#ifdef QUICKSTEP_DISTRIBUTED
/**
+ * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
+ * ForemanDistributed could take advantages of block locality info
+ * for a better scheduling policy.
+ *
+ * @param shiftboss_index The Shiftboss index.
+ **/
+ void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
+
+ /**
* @brief Pull a block or a blob. Used by DataExchangerAsync.
*
* @param block The id of the block or blob.