You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/05 22:04:09 UTC

[71/72] incubator-quickstep git commit: Saved catalog in the distributed version.

Saved catalog in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/27a80558
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/27a80558
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/27a80558

Branch: refs/heads/reorder-partitioned-hash-join
Commit: 27a8055872f82737c35f6f0914ce43bcbe272ce3
Parents: dda085c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 5 02:16:34 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Feb 5 02:16:34 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Conductor.cpp                             |  5 ++++-
 query_execution/ForemanDistributed.cpp                    |  3 +++
 query_execution/ForemanDistributed.hpp                    |  4 ++++
 query_execution/PolicyEnforcerDistributed.cpp             |  2 ++
 query_execution/PolicyEnforcerDistributed.hpp             |  7 +++++++
 .../tests/DistributedExecutionGeneratorTestRunner.cpp     | 10 +++++++++-
 6 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index c4a2721..13d4d57 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -22,6 +22,7 @@
 #include <cstddef>
 #include <cstdlib>
 #include <exception>
+#include <functional>
 #include <memory>
 #include <sstream>
 #include <string>
@@ -95,7 +96,9 @@ void Conductor::init() {
   block_locator_ = make_unique<BlockLocator>(&bus_);
   block_locator_->start();
 
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, query_processor_->getDefaultDatabase());
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_,
+                                             std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
+                                             query_processor_->getDefaultDatabase());
   foreman_->start();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index fe4c483..4d95f16 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,6 +17,7 @@
 #include <cstddef>
 #include <cstdio>
 #include <cstdlib>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -67,6 +68,7 @@ class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
     const BlockLocator &block_locator,
+    std::function<void()> &&save_catalog_callback,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
     const int cpu_id)
@@ -106,6 +108,7 @@ ForemanDistributed::ForemanDistributed(
 
   policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
       foreman_client_id_,
+      move(save_catalog_callback),
       catalog_database_,
       &shiftboss_directory_,
       bus_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index ed09fda..5f1a14b 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,6 +17,7 @@
 
 #include <cstddef>
 #include <cstdio>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -55,6 +56,8 @@ class ForemanDistributed final : public ForemanBase {
    * @param block_locator The block locator that manages block location info.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
+   * @param save_catalog_callback The callback used to save the catalog upon query
+   *        completion.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    *
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
@@ -62,6 +65,7 @@ class ForemanDistributed final : public ForemanBase {
   **/
   ForemanDistributed(
       const BlockLocator &block_locator,
+      std::function<void()> &&save_catalog_callback,
       tmb::MessageBus *bus,
       CatalogDatabaseLite *catalog_database,
       const int cpu_id = -1);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index e9f57d3..38b8a34 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -227,6 +227,8 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   }
 
   if (query_result == nullptr) {
+    save_catalog_callback_();
+
     // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
     serialization::QueryTeardownMessage proto;
     proto.set_query_id(query_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 248948a..f8476c8 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,6 +16,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
 
 #include <cstddef>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <utility>
@@ -57,15 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @brief Constructor.
    *
    * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param save_catalog_callback The callback used to save the catalog upon query
+   *        completion.
    * @param catalog_database The CatalogDatabase used.
    * @param bus The TMB.
    **/
   PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+                            std::function<void()> &&save_catalog_callback,
                             CatalogDatabaseLite *catalog_database,
                             ShiftbossDirectory *shiftboss_directory,
                             tmb::MessageBus *bus)
       : PolicyEnforcerBase(catalog_database),
         foreman_client_id_(foreman_client_id),
+        save_catalog_callback_(std::move(save_catalog_callback)),
         shiftboss_directory_(shiftboss_directory),
         bus_(bus) {}
 
@@ -154,6 +159,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
 
   const tmb::client_id foreman_client_id_;
 
+  const std::function<void()> save_catalog_callback_;
+
   ShiftbossDirectory *shiftboss_directory_;
 
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 45d4fdf..2e18467 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -21,6 +21,7 @@
 
 #include <cstdio>
 #include <cstdlib>
+#include <functional>
 #include <memory>
 #include <set>
 #include <string>
@@ -64,6 +65,12 @@ class CatalogRelation;
 
 namespace optimizer {
 
+namespace {
+
+void nop() {}
+
+}  // namespace
+
 const char *DistributedExecutionGeneratorTestRunner::kResetOption =
     "reset_before_execution";
 
@@ -98,7 +105,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
+                                             test_database_loader_->catalog_database());
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,