You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/03 01:14:56 UTC

incubator-quickstep git commit: Refactored catalog saving in the distributed version.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 964a80649 -> ccb2852f7


Refactored catalog saving in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccb2852f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccb2852f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccb2852f

Branch: refs/heads/master
Commit: ccb2852f71da77d364d4bfcb276cb6318b751a8c
Parents: 964a806
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 17:14:50 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 17:14:50 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                         |  1 +
 query_execution/ForemanDistributed.cpp                 |  5 ++---
 query_execution/ForemanDistributed.hpp                 |  6 +++---
 query_execution/PolicyEnforcerDistributed.cpp          |  5 ++++-
 query_execution/PolicyEnforcerDistributed.hpp          | 13 ++++++-------
 .../tests/DistributedExecutionGeneratorTestRunner.cpp  |  6 ++----
 6 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 12d6be0..23b706f 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -165,6 +165,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_QueryProcessor
                         quickstep_storage_StorageBlockInfo
                         quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 389d6ab..57f432f 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,7 +17,6 @@
 #include <cstddef>
 #include <cstdio>
 #include <cstdlib>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -68,9 +67,9 @@ class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
     const BlockLocator &block_locator,
-    std::function<void()> &&save_catalog_callback,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
+    QueryProcessor *query_processor,
     const int cpu_id)
     : ForemanBase(bus, cpu_id),
       block_locator_(block_locator),
@@ -108,8 +107,8 @@ ForemanDistributed::ForemanDistributed(
 
   policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
       foreman_client_id_,
-      move(save_catalog_callback),
       catalog_database_,
+      query_processor,
       &shiftboss_directory_,
       bus_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 5f1a14b..7fc98bd 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,7 +17,6 @@
 
 #include <cstddef>
 #include <cstdio>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -36,6 +35,7 @@ namespace quickstep {
 
 class BlockLocator;
 class CatalogDatabaseLite;
+class QueryProcessor;
 
 namespace serialization { class WorkOrderMessage; }
 
@@ -56,7 +56,7 @@ class ForemanDistributed final : public ForemanBase {
    * @param block_locator The block locator that manages block location info.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
-   * @param save_catalog_callback The callback used to save catalog upon the query
+   * @param query_processor The QueryProcessor used to save the catalog upon query
    *        completion.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    *
@@ -65,9 +65,9 @@ class ForemanDistributed final : public ForemanBase {
   **/
   ForemanDistributed(
       const BlockLocator &block_locator,
-      std::function<void()> &&save_catalog_callback,
       tmb::MessageBus *bus,
       CatalogDatabaseLite *catalog_database,
+      QueryProcessor *query_processor,
       const int cpu_id = -1);
 
   ~ForemanDistributed() override {}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6ee58a8..25f2d72 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -35,6 +35,7 @@
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 
@@ -259,7 +260,9 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   }
 
   if (query_result == nullptr) {
-    save_catalog_callback_();
+    if (query_processor_) {
+      query_processor_->saveCatalog();
+    }
 
     // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
     serialization::QueryTeardownMessage proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index f8476c8..18fd9ae 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,7 +16,6 @@
 #define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
 
 #include <cstddef>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <utility>
@@ -43,6 +42,7 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
+class QueryProcessor;
 
 /** \addtogroup QueryExecution
  *  @{
@@ -58,19 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @brief Constructor.
    *
    * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param save_catalog_callback The callback used to save catalog upon the query
-   *        completion.
    * @param catalog_database The CatalogDatabase used.
+   * @param query_processor The QueryProcessor used to save the catalog upon
+   *        query completion.
    * @param bus The TMB.
    **/
   PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
-                            std::function<void()> &&save_catalog_callback,
                             CatalogDatabaseLite *catalog_database,
+                            QueryProcessor *query_processor,
                             ShiftbossDirectory *shiftboss_directory,
                             tmb::MessageBus *bus)
       : PolicyEnforcerBase(catalog_database),
         foreman_client_id_(foreman_client_id),
-        save_catalog_callback_(std::move(save_catalog_callback)),
+        query_processor_(query_processor),
         shiftboss_directory_(shiftboss_directory),
         bus_(bus) {}
 
@@ -159,8 +159,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
 
   const tmb::client_id foreman_client_id_;
 
-  const std::function<void()> save_catalog_callback_;
-
+  QueryProcessor *query_processor_;
   ShiftbossDirectory *shiftboss_directory_;
 
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 3b1259a..0eeb83f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -71,8 +71,6 @@ namespace {
 
 constexpr int kNumInstances = 3;
 
-void nop() {}
-
 }  // namespace
 
 const char *DistributedExecutionGeneratorTestRunner::kResetOption =
@@ -110,8 +108,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
-                                             test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
+                                             nullptr /* query_processor */);
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,