You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/27 03:22:53 UTC

[02/50] [abbrv] incubator-quickstep git commit: Code review changes.

Code review changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/24c93ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/24c93ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/24c93ca5

Branch: refs/heads/query-manager-used-in-foreman
Commit: 24c93ca5a11e453d45821bbe64e7946ca02948e5
Parents: dd62b80
Author: Harshad Deshmukh <ha...@cs.wisc.edu>
Authored: Fri Apr 29 16:05:16 2016 -0500
Committer: Harshad Deshmukh <ha...@cs.wisc.edu>
Committed: Fri Apr 29 16:05:16 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                    |  2 +-
 cli/tests/CommandExecutorTestRunner.cpp |  2 +-
 query_execution/AdmitRequestMessage.hpp |  3 +-
 query_execution/CMakeLists.txt          |  5 ++--
 query_execution/Foreman.cpp             | 42 ++++++++++++++++------------
 query_execution/Foreman.hpp             | 16 ++++-------
 query_execution/PolicyEnforcer.cpp      | 30 ++++++++------------
 query_execution/QueryExecutionUtil.hpp  |  3 +-
 query_optimizer/ExecutionGenerator.cpp  |  4 +--
 storage/StorageManager.hpp              |  4 +--
 10 files changed, 51 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index e4a82a1..66e58fe 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -398,7 +398,7 @@ int main(int argc, char* argv[]) {
           const AnnotatedMessage annotated_msg =
               bus.Receive(main_thread_client_id, 0, true);
           const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-          DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
+          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
           end = std::chrono::steady_clock::now();
 
           const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index e3ed5d6..f5c37c3 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -115,7 +115,7 @@ void CommandExecutorTestRunner::runTestCase(
           const AnnotatedMessage annotated_msg =
               bus_.Receive(main_thread_client_id_, 0, true);
           const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-          DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
+          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
           const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
           if (query_result_relation) {
             PrintToScreen::PrintRelation(*query_result_relation,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index 7534d92..e33b354 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -49,7 +49,7 @@ class AdmitRequestMessage {
    *
    * @param query_handle The handle of the query requesting to be admitted.
    **/
-  explicit AdmitRequestMessage(QueryHandle* query_handle) {
+  explicit AdmitRequestMessage(QueryHandle *query_handle) {
     query_handles_.push_back(query_handle);
   }
 
@@ -71,4 +71,3 @@ class AdmitRequestMessage {
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
-

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fbfd09a..9658c08 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -42,11 +42,10 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
 add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
 
 # Link dependencies:
-target_link_libraries(quickstep_queryexecution_ForemanLite
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
+                      gflags_nothreads-static
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
@@ -66,8 +65,8 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
                       tmb)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       glog
-                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManager
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index d1ecebd..3609120 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -31,6 +31,7 @@
 #include "utility/EqualsAnyConstant.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 #include "tmb/message_bus.h"
@@ -43,6 +44,10 @@ using std::vector;
 
 namespace quickstep {
 
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+              "of pending work orders for the worker. This information is used "
+              "by the Foreman to assign work orders to worker threads");
+
 Foreman::Foreman(const tmb::client_id main_thread_client_id,
                  WorkerDirectory *worker_directory,
                  tmb::MessageBus *bus,
@@ -54,19 +59,18 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
       catalog_database_(DCHECK_NOTNULL(catalog_database)),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)),
-      min_load_per_worker_(2) {  // TODO(harshad) - Make this field configurable.
-  std::vector<QueryExecutionMessageType> sender_message_types{
+      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
       kPoisonMessage,
       kRebuildWorkOrderMessage,
       kWorkOrderMessage,
       kWorkloadCompletionMessage};
 
-  for (auto message_type : sender_message_types) {
+  for (const auto message_type : sender_message_types) {
     bus_->RegisterClientAsSender(foreman_client_id_, message_type);
   }
 
-  std::vector<QueryExecutionMessageType> receiver_message_types{
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
       kAdmitRequestMessage,
       kCatalogRelationNewBlockMessage,
       kDataPipelineMessage,
@@ -76,9 +80,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       kWorkOrdersAvailableMessage,
       kWorkOrderCompleteMessage};
 
-  for (auto message_type : receiver_message_types) {
+  for (const auto message_type : receiver_message_types) {
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
+
   policy_enforcer_.reset(new PolicyEnforcer(
       foreman_client_id_,
       num_numa_nodes,
@@ -96,12 +101,12 @@ void Foreman::run() {
   // Event loop
   for (;;) {
     // Receive() causes this thread to sleep until next message is received.
-    AnnotatedMessage annotated_msg =
+    const AnnotatedMessage annotated_msg =
         bus_->Receive(foreman_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
     const tmb::message_type_id message_type = tagged_message.message_type();
     switch (message_type) {
-      case kCatalogRelationNewBlockMessage:
+      case kCatalogRelationNewBlockMessage:  // Fall through
       case kDataPipelineMessage:
       case kRebuildWorkOrderCompleteMessage:
       case kWorkOrderCompleteMessage:
@@ -139,11 +144,13 @@ void Foreman::run() {
       default:
         LOG(FATAL) << "Unknown message type to Foreman";
     }
+
     if (canCollectNewMessages(message_type)) {
       vector<unique_ptr<WorkerMessage>> new_messages;
       policy_enforcer_->getWorkerMessages(&new_messages);
-      dispatchWorkerMessages(&new_messages);
+      dispatchWorkerMessages(new_messages);
     }
+
     // We check again, as some queries may produce zero work orders and finish
     // their execution.
     if (!policy_enforcer_->hasQueries()) {
@@ -171,7 +178,8 @@ bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
                                     kCatalogRelationNewBlockMessage,
                                     kWorkOrderFeedbackMessage)) {
     return false;
-  } else if (worker_directory_->getLeastLoadedWorker().second <= min_load_per_worker_) {
+  } else if (worker_directory_->getLeastLoadedWorker().second <=
+             FLAGS_min_load_per_worker) {
     // If the least loaded worker has only one pending work order, we should
     // collect new messages and dispatch them.
     return true;
@@ -180,18 +188,16 @@ bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
   }
 }
 
-void Foreman::dispatchWorkerMessages(vector<unique_ptr<WorkerMessage>> *messages) {
-  for (auto message_it = messages->begin();
-       message_it != messages->end();
-       ++message_it) {
-    DCHECK(*message_it != nullptr);
-    int recipient_worker_thread_index = (*message_it)->getRecipientHint();
+void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+  for (auto const &message : messages) {
+    DCHECK(message != nullptr);
+    int recipient_worker_thread_index = message->getRecipientHint();
     if (recipient_worker_thread_index != -1) {
       sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
-                        *(*message_it));
+                        *message);
     } else {
       sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
-                        *(*message_it));
+                        *message);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 2dc96d5..5c4893d 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -26,11 +26,15 @@
 #include "query_execution/PolicyEnforcer.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
+
 #include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 
 namespace quickstep {
 
+DECLARE_uint64(min_load_per_worker);
+
 class CatalogDatabaseLite;
 class StorageManager;
 class WorkerDirectory;
@@ -71,13 +75,6 @@ class Foreman final : public ForemanLite {
 
   ~Foreman() override {}
 
-  /**
-   * @brief Get the Foreman's TMB client ID.
-   **/
-  tmb::client_id getBusClientID() const {
-    return foreman_client_id_;
-  }
-
  protected:
   /**
    * @brief Run the event-based loop in the Foreman thread.
@@ -92,7 +89,7 @@ class Foreman final : public ForemanLite {
    * @param messages The messages to be dispatched.
    **/
   void dispatchWorkerMessages(
-      std::vector<std::unique_ptr<WorkerMessage>> *messages);
+      const std::vector<std::unique_ptr<WorkerMessage>> &messages);
 
   /**
    * @brief Send the given message to the specified worker.
@@ -120,9 +117,6 @@ class Foreman final : public ForemanLite {
 
   std::unique_ptr<PolicyEnforcer> policy_enforcer_;
 
-  // Every worker should have at least these many pending work orders.
-  const std::size_t min_load_per_worker_;
-
   DISALLOW_COPY_AND_ASSIGN(Foreman);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index c9d5189..1ee1df9 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -43,7 +43,7 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
                            catalog_database_, storage_manager_, bus_));
       return true;
     } else {
-      LOG(ERROR) << "Query with same ID " << query_id << " exists";
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
       return false;
     }
   } else {
@@ -59,13 +59,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
   // TaggedMessage only once.
   std::size_t query_id;
   switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      break;
-    }
+    case kWorkOrderCompleteMessage:  // Fall through.
     case kRebuildWorkOrderCompleteMessage: {
       serialization::WorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -104,7 +98,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
   }
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  QueryManager::QueryStatusCode return_code =
+  const QueryManager::QueryStatusCode return_code =
       admitted_queries_[query_id]->processMessage(tagged_message);
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
@@ -135,10 +129,8 @@ void PolicyEnforcer::getWorkerMessages(
   DCHECK_GT(per_query_share, 0u);
   std::vector<std::size_t> finished_queries_ids;
 
-  for (auto query_manager_it = admitted_queries_.begin();
-       query_manager_it != admitted_queries_.end();
-       ++query_manager_it) {
-    QueryManager *curr_query_manager = query_manager_it->second.get();
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManager *curr_query_manager = admitted_query_info.second.get();
     DCHECK(curr_query_manager != nullptr);
     std::size_t messages_collected_curr_query = 0;
     while (messages_collected_curr_query < per_query_share) {
@@ -152,7 +144,7 @@ void PolicyEnforcer::getWorkerMessages(
         // Check if the query's execution is over.
         if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
           // If the query has been executed, remove it.
-          finished_queries_ids.push_back(query_manager_it->first);
+          finished_queries_ids.push_back(admitted_query_info.first);
         }
         break;
       }
@@ -169,17 +161,17 @@ void PolicyEnforcer::removeQuery(const std::size_t query_id) {
     LOG(WARNING) << "Removing query with ID " << query_id
                  << " that hasn't finished its execution";
   }
-  admitted_queries_[query_id].reset(nullptr);
   admitted_queries_.erase(query_id);
 }
 
 bool PolicyEnforcer::admitQueries(
     const std::vector<QueryHandle *> &query_handles) {
-  bool result = true;
-  for (QueryHandle* curr_query : query_handles) {
-    result = result && admitQuery(curr_query);
+  for (QueryHandle *curr_query : query_handles) {
+    if (!admitQuery(curr_query)) {
+      return false;
+    }
   }
-  return result;
+  return true;
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index e5a590b..0cd0a4e 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -79,7 +79,8 @@ class QueryExecutionUtil {
     const tmb::MessageBus::SendStatus send_status = bus->Send(
         sender_id, address, style, std::move(poison_tagged_message));
     CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-       "Broadcast message from Foreman to workers failed";
+       "Broadcast poison message from sender with TMB client ID " << sender_id
+       << " failed";
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index b260e37..dd4e678 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1528,9 +1528,9 @@ void ExecutionGenerator::convertTableGenerator(
 }
 
 S::InsertDestination* ExecutionGenerator::addNewInsertDestinationToQueryContext() {
-  std::unique_ptr<S::InsertDestination> insert_destination_proto(query_context_proto_->add_insert_destinations());
+  S::InsertDestination *insert_destination_proto(query_context_proto_->add_insert_destinations());
   insert_destination_proto->set_query_id(query_context_proto_->query_id());
-  return insert_destination_proto.release();
+  return insert_destination_proto;
 }
 
 }  // namespace optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24c93ca5/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dd67177..624e060 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -381,12 +381,12 @@ class StorageManager {
 
   /**
    * @brief Save a block or blob in memory to the persistent storage.
-   * 
+   *
    * @param block The id of the block or blob to save.
    * @param force Force the block to the persistent storage, even if it is not
    *        dirty (by default, only actually write dirty blocks to the
    *        persistent storage).
-   * 
+   *
    * @return False if the block is not found in the memory. True if the block is
    *         successfully saved to the persistent storage OR the block is clean
    *         and force is false.