You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/28 19:34:15 UTC
[02/11] incubator-quickstep git commit: Initiated query execution data structure in all Shiftbosses.
Initiated query execution data structure in all Shiftbosses.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a58a7475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a58a7475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a58a7475
Branch: refs/heads/exact-filter
Commit: a58a74751ac3b0caf55aae09c13161dd375f4147
Parents: a11cdfd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Oct 9 18:44:00 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Oct 21 08:06:59 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 15 ++++-
query_execution/ForemanDistributed.hpp | 5 ++
query_execution/PolicyEnforcerBase.cpp | 2 +
query_execution/PolicyEnforcerBase.hpp | 15 +++++
query_execution/PolicyEnforcerDistributed.cpp | 67 +++++++++-----------
query_execution/QueryExecutionMessages.proto | 6 +-
query_execution/QueryExecutionUtil.hpp | 13 ++++
query_execution/Shiftboss.cpp | 2 +
.../DistributedExecutionGeneratorTestRunner.hpp | 2 +-
9 files changed, 86 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 9c20465..56b319b 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -18,6 +18,8 @@
#include <cstdio>
#include <cstdlib>
#include <memory>
+#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -163,7 +165,9 @@ void ForemanDistributed::run() {
break;
}
case kQueryInitiateResponseMessage: {
- // TODO(zuyu): check the query id.
+ S::QueryInitiateResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ CHECK(policy_enforcer_->existQuery(proto.query_id()));
break;
}
case kCatalogRelationNewBlockMessage: // Fall through
@@ -183,7 +187,14 @@ void ForemanDistributed::run() {
S::SaveQueryResultResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
- processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ const std::size_t query_id = proto.query_id();
+ query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index());
+
+ // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+ if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
+ processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ query_result_saved_shiftbosses_.erase(query_id);
+ }
break;
}
case kPoisonMessage: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index fc1ede5..b42795c 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -18,6 +18,8 @@
#include <cstddef>
#include <cstdio>
#include <memory>
+#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -120,6 +122,9 @@ class ForemanDistributed final : public ForemanBase {
std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+ // Maps a query id to the set of indexes of the Shiftbosses that have saved the query result.
+ std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
+
DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 4174bd6..745ded6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -142,6 +142,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
<< " that hasn't finished its execution";
}
admitted_queries_.erase(query_id);
+
+ removed_query_ids_.insert(query_id);
}
bool PolicyEnforcerBase::admitQueries(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 62906e9..ea2c06f 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -24,6 +24,7 @@
#include <memory>
#include <queue>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
@@ -106,6 +107,16 @@ class PolicyEnforcerBase {
void processMessage(const TaggedMessage &tagged_message);
/**
+ * @brief Check whether the given query id is known to this policy enforcer,
+ *        i.e. it is currently in 'admitted_queries_' or has been recorded
+ *        in 'removed_query_ids_'.
+ *
+ * @param query_id The id of the query to look up.
+ *
+ * @return True if the query id is found in either container, otherwise false.
+ **/
+ inline bool existQuery(const std::size_t query_id) const {
+ return admitted_queries_.find(query_id) != admitted_queries_.end() ||
+ removed_query_ids_.find(query_id) != removed_query_ids_.end();
+ }
+
+ /**
* @brief Check if there are any queries to be executed.
*
* @return True if there is at least one active or waiting query, false if
@@ -163,6 +174,10 @@ class PolicyEnforcerBase {
// Key = query ID, value = QueryManagerBase* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
+ // Ids of the queries that have been removed via 'removeQuery'; looked up by 'existQuery'.
+ // TODO(quickstep-team): Delete a 'query_id' after receiving all
+ // 'QueryInitiateResponseMessage's for the 'query_id'.
+ std::unordered_set<std::size_t> removed_query_ids_;
+
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 47491ed..c06fd86 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -37,6 +37,7 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
@@ -170,25 +171,18 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
kQueryInitiateMessage);
free(proto_bytes);
- // TODO(zuyu): Multiple Shiftbosses support.
+ // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+ }
+
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
- << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- shiftboss_directory_->getClientId(0),
- move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-
- // Wait Shiftboss for QueryInitiateResponseMessage.
- const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
- DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
-
- S::QueryInitiateResponseMessage proto_response;
- CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+ shiftboss_addresses,
+ move(message),
+ bus_);
}
void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
@@ -198,8 +192,14 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
const tmb::client_id cli_id = query_handle->getClientId();
const std::size_t query_id = query_handle->query_id();
+ // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+ }
+
if (query_result == nullptr) {
- // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+ // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
serialization::QueryTeardownMessage proto;
proto.set_query_id(query_id);
@@ -211,15 +211,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
proto_length,
kQueryTeardownMessage);
- // TODO(zuyu): Support multiple shiftbosses.
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
- << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
- tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- shiftboss_directory_->getClientId(0),
- move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+ shiftboss_addresses,
+ move(message),
+ bus_);
TaggedMessage cli_message(kQueryExecutionSuccessMessage);
@@ -227,7 +224,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
<< kQueryExecutionSuccessMessage
<< "') to CLI with TMB client id " << cli_id;
- send_status =
+ const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
cli_id,
@@ -257,15 +254,13 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
kSaveQueryResultMessage);
free(proto_bytes);
- // TODO(zuyu): Support multiple shiftbosses.
+ // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
- << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- shiftboss_directory_->getClientId(0),
- move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+ shiftboss_addresses,
+ move(message),
+ bus_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 060efa1..1a2cb78 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -128,8 +128,10 @@ message SaveQueryResultMessage {
}
message SaveQueryResultResponseMessage {
- required int32 relation_id = 1;
- required uint32 cli_id = 2; // tmb::client_id.
+ required uint64 query_id = 1;
+ required int32 relation_id = 2;
+ required uint32 cli_id = 3; // tmb::client_id.
+ required uint64 shiftboss_index = 4;
}
message QueryExecutionSuccessMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 7a3a3b3..b41965c 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -121,6 +121,19 @@ class QueryExecutionUtil {
DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
}
+ static void BroadcastMessage(const tmb::client_id sender_id,
+ const tmb::Address &addresses,
+ tmb::TaggedMessage &&tagged_message, // NOLINT(whitespace/operators)
+ tmb::MessageBus *bus) {
+ // Sends 'tagged_message' from 'sender_id' to all 'addresses' through 'bus', using a MessageStyle with Broadcast(true).
+ tmb::MessageStyle style;
+ style.Broadcast(true);
+
+ const tmb::MessageBus::SendStatus send_status =
+ bus->Send(sender_id, addresses, style, std::move(tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);  // Any send status other than kOK fails this CHECK.
+ }
+
static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
// Terminate all threads.
// The sender thread broadcasts poison message to the workers and foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 5c2c5e0..a434527 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -189,8 +189,10 @@ void Shiftboss::run() {
query_contexts_.erase(proto.query_id());
serialization::SaveQueryResultResponseMessage proto_response;
+ proto_response.set_query_id(proto.query_id());
proto_response.set_relation_id(proto.relation_id());
proto_response.set_cli_id(proto.cli_id());
+ proto_response.set_shiftboss_index(shiftboss_index_);
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a58a7475/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index e4d0765..ab10841 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -49,7 +49,7 @@ namespace quickstep {
namespace optimizer {
namespace {
-constexpr int kNumInstances = 1;
+constexpr int kNumInstances = 3;
} // namespace
/**