You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/27 03:22:53 UTC
[02/50] [abbrv] incubator-quickstep git commit: Code review changes.
Code review changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/24c93ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/24c93ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/24c93ca5
Branch: refs/heads/query-manager-used-in-foreman
Commit: 24c93ca5a11e453d45821bbe64e7946ca02948e5
Parents: dd62b80
Author: Harshad Deshmukh <ha...@cs.wisc.edu>
Authored: Fri Apr 29 16:05:16 2016 -0500
Committer: Harshad Deshmukh <ha...@cs.wisc.edu>
Committed: Fri Apr 29 16:05:16 2016 -0500
----------------------------------------------------------------------
cli/QuickstepCli.cpp | 2 +-
cli/tests/CommandExecutorTestRunner.cpp | 2 +-
query_execution/AdmitRequestMessage.hpp | 3 +-
query_execution/CMakeLists.txt | 5 ++--
query_execution/Foreman.cpp | 42 ++++++++++++++++------------
query_execution/Foreman.hpp | 16 ++++-------
query_execution/PolicyEnforcer.cpp | 30 ++++++++------------
query_execution/QueryExecutionUtil.hpp | 3 +-
query_optimizer/ExecutionGenerator.cpp | 4 +--
storage/StorageManager.hpp | 4 +--
10 files changed, 51 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index e4a82a1..66e58fe 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -398,7 +398,7 @@ int main(int argc, char* argv[]) {
const AnnotatedMessage annotated_msg =
bus.Receive(main_thread_client_id, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
+ DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
end = std::chrono::steady_clock::now();
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index e3ed5d6..f5c37c3 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -115,7 +115,7 @@ void CommandExecutorTestRunner::runTestCase(
const AnnotatedMessage annotated_msg =
bus_.Receive(main_thread_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
+ DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index 7534d92..e33b354 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -49,7 +49,7 @@ class AdmitRequestMessage {
*
* @param query_handle The handle of the query requesting to be admitted.
**/
- explicit AdmitRequestMessage(QueryHandle* query_handle) {
+ explicit AdmitRequestMessage(QueryHandle *query_handle) {
query_handles_.push_back(query_handle);
}
@@ -71,4 +71,3 @@ class AdmitRequestMessage {
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
-
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fbfd09a..9658c08 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -42,11 +42,10 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
# Link dependencies:
-target_link_libraries(quickstep_queryexecution_ForemanLite
- quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
+ gflags_nothreads-static
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
@@ -66,8 +65,8 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
tmb)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
glog
- quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManager
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index d1ecebd..3609120 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -31,6 +31,7 @@
#include "utility/EqualsAnyConstant.hpp"
#include "utility/Macros.hpp"
+#include "gflags/gflags.h"
#include "glog/logging.h"
#include "tmb/message_bus.h"
@@ -43,6 +44,10 @@ using std::vector;
namespace quickstep {
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+ "of pending work orders for the worker. This information is used "
+ "by the Foreman to assign work orders to worker threads");
+
Foreman::Foreman(const tmb::client_id main_thread_client_id,
WorkerDirectory *worker_directory,
tmb::MessageBus *bus,
@@ -54,19 +59,18 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
main_thread_client_id_(main_thread_client_id),
worker_directory_(DCHECK_NOTNULL(worker_directory)),
catalog_database_(DCHECK_NOTNULL(catalog_database)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)),
- min_load_per_worker_(2) { // TODO(harshad) - Make this field configurable.
- std::vector<QueryExecutionMessageType> sender_message_types{
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ const std::vector<QueryExecutionMessageType> sender_message_types{
kPoisonMessage,
kRebuildWorkOrderMessage,
kWorkOrderMessage,
kWorkloadCompletionMessage};
- for (auto message_type : sender_message_types) {
+ for (const auto message_type : sender_message_types) {
bus_->RegisterClientAsSender(foreman_client_id_, message_type);
}
- std::vector<QueryExecutionMessageType> receiver_message_types{
+ const std::vector<QueryExecutionMessageType> receiver_message_types{
kAdmitRequestMessage,
kCatalogRelationNewBlockMessage,
kDataPipelineMessage,
@@ -76,9 +80,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
kWorkOrdersAvailableMessage,
kWorkOrderCompleteMessage};
- for (auto message_type : receiver_message_types) {
+ for (const auto message_type : receiver_message_types) {
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
+
policy_enforcer_.reset(new PolicyEnforcer(
foreman_client_id_,
num_numa_nodes,
@@ -96,12 +101,12 @@ void Foreman::run() {
// Event loop
for (;;) {
// Receive() causes this thread to sleep until next message is received.
- AnnotatedMessage annotated_msg =
+ const AnnotatedMessage annotated_msg =
bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
const tmb::message_type_id message_type = tagged_message.message_type();
switch (message_type) {
- case kCatalogRelationNewBlockMessage:
+ case kCatalogRelationNewBlockMessage: // Fall through
case kDataPipelineMessage:
case kRebuildWorkOrderCompleteMessage:
case kWorkOrderCompleteMessage:
@@ -139,11 +144,13 @@ void Foreman::run() {
default:
LOG(FATAL) << "Unknown message type to Foreman";
}
+
if (canCollectNewMessages(message_type)) {
vector<unique_ptr<WorkerMessage>> new_messages;
policy_enforcer_->getWorkerMessages(&new_messages);
- dispatchWorkerMessages(&new_messages);
+ dispatchWorkerMessages(new_messages);
}
+
// We check again, as some queries may produce zero work orders and finish
// their execution.
if (!policy_enforcer_->hasQueries()) {
@@ -171,7 +178,8 @@ bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
kCatalogRelationNewBlockMessage,
kWorkOrderFeedbackMessage)) {
return false;
- } else if (worker_directory_->getLeastLoadedWorker().second <= min_load_per_worker_) {
+ } else if (worker_directory_->getLeastLoadedWorker().second <=
+ FLAGS_min_load_per_worker) {
// If the least loaded worker has only one pending work order, we should
// collect new messages and dispatch them.
return true;
@@ -180,18 +188,16 @@ bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
}
}
-void Foreman::dispatchWorkerMessages(vector<unique_ptr<WorkerMessage>> *messages) {
- for (auto message_it = messages->begin();
- message_it != messages->end();
- ++message_it) {
- DCHECK(*message_it != nullptr);
- int recipient_worker_thread_index = (*message_it)->getRecipientHint();
+void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+ for (auto const &message : messages) {
+ DCHECK(message != nullptr);
+ int recipient_worker_thread_index = message->getRecipientHint();
if (recipient_worker_thread_index != -1) {
sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
- *(*message_it));
+ *message);
} else {
sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
- *(*message_it));
+ *message);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 2dc96d5..5c4893d 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -26,11 +26,15 @@
#include "query_execution/PolicyEnforcer.hpp"
#include "utility/Macros.hpp"
+#include "gflags/gflags.h"
+
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
namespace quickstep {
+DECLARE_uint64(min_load_per_worker);
+
class CatalogDatabaseLite;
class StorageManager;
class WorkerDirectory;
@@ -71,13 +75,6 @@ class Foreman final : public ForemanLite {
~Foreman() override {}
- /**
- * @brief Get the Foreman's TMB client ID.
- **/
- tmb::client_id getBusClientID() const {
- return foreman_client_id_;
- }
-
protected:
/**
* @brief Run the event-based loop in the Foreman thread.
@@ -92,7 +89,7 @@ class Foreman final : public ForemanLite {
* @param messages The messages to be dispatched.
**/
void dispatchWorkerMessages(
- std::vector<std::unique_ptr<WorkerMessage>> *messages);
+ const std::vector<std::unique_ptr<WorkerMessage>> &messages);
/**
* @brief Send the given message to the specified worker.
@@ -120,9 +117,6 @@ class Foreman final : public ForemanLite {
std::unique_ptr<PolicyEnforcer> policy_enforcer_;
- // Every worker should have at least these many pending work orders.
- const std::size_t min_load_per_worker_;
-
DISALLOW_COPY_AND_ASSIGN(Foreman);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index c9d5189..1ee1df9 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -43,7 +43,7 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
catalog_database_, storage_manager_, bus_));
return true;
} else {
- LOG(ERROR) << "Query with same ID " << query_id << " exists";
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
return false;
}
} else {
@@ -59,13 +59,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
// TaggedMessage only once.
std::size_t query_id;
switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- break;
- }
+ case kWorkOrderCompleteMessage: // Fall through.
case kRebuildWorkOrderCompleteMessage: {
serialization::WorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -104,7 +98,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
}
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- QueryManager::QueryStatusCode return_code =
+ const QueryManager::QueryStatusCode return_code =
admitted_queries_[query_id]->processMessage(tagged_message);
if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
@@ -135,10 +129,8 @@ void PolicyEnforcer::getWorkerMessages(
DCHECK_GT(per_query_share, 0u);
std::vector<std::size_t> finished_queries_ids;
- for (auto query_manager_it = admitted_queries_.begin();
- query_manager_it != admitted_queries_.end();
- ++query_manager_it) {
- QueryManager *curr_query_manager = query_manager_it->second.get();
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManager *curr_query_manager = admitted_query_info.second.get();
DCHECK(curr_query_manager != nullptr);
std::size_t messages_collected_curr_query = 0;
while (messages_collected_curr_query < per_query_share) {
@@ -152,7 +144,7 @@ void PolicyEnforcer::getWorkerMessages(
// Check if the query's execution is over.
if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
// If the query has been executed, remove it.
- finished_queries_ids.push_back(query_manager_it->first);
+ finished_queries_ids.push_back(admitted_query_info.first);
}
break;
}
@@ -169,17 +161,17 @@ void PolicyEnforcer::removeQuery(const std::size_t query_id) {
LOG(WARNING) << "Removing query with ID " << query_id
<< " that hasn't finished its execution";
}
- admitted_queries_[query_id].reset(nullptr);
admitted_queries_.erase(query_id);
}
bool PolicyEnforcer::admitQueries(
const std::vector<QueryHandle *> &query_handles) {
- bool result = true;
- for (QueryHandle* curr_query : query_handles) {
- result = result && admitQuery(curr_query);
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
}
- return result;
+ return true;
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index e5a590b..0cd0a4e 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -79,7 +79,8 @@ class QueryExecutionUtil {
const tmb::MessageBus::SendStatus send_status = bus->Send(
sender_id, address, style, std::move(poison_tagged_message));
CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Broadcast message from Foreman to workers failed";
+ "Broadcast poison message from sender with TMB client ID " << sender_id
+ << " failed";
}
private:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index b260e37..dd4e678 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1528,9 +1528,9 @@ void ExecutionGenerator::convertTableGenerator(
}
S::InsertDestination* ExecutionGenerator::addNewInsertDestinationToQueryContext() {
- std::unique_ptr<S::InsertDestination> insert_destination_proto(query_context_proto_->add_insert_destinations());
+ S::InsertDestination *insert_destination_proto(query_context_proto_->add_insert_destinations());
insert_destination_proto->set_query_id(query_context_proto_->query_id());
- return insert_destination_proto.release();
+ return insert_destination_proto;
}
} // namespace optimizer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dd67177..624e060 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -381,12 +381,12 @@ class StorageManager {
/**
* @brief Save a block or blob in memory to the persistent storage.
- *
+ *
* @param block The id of the block or blob to save.
* @param force Force the block to the persistent storage, even if it is not
* dirty (by default, only actually write dirty blocks to the
* persistent storage).
- *
+ *
* @return False if the block is not found in the memory. True if the block is
* successfully saved to the persistent storage OR the block is clean
* and force is false.