You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/05 23:51:45 UTC
[4/5] incubator-quickstep git commit: Saved catalog in the
distributed version.
Saved catalog in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/27a80558
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/27a80558
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/27a80558
Branch: refs/heads/reduce-group-by-attrs
Commit: 27a8055872f82737c35f6f0914ce43bcbe272ce3
Parents: dda085c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 5 02:16:34 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Feb 5 02:16:34 2017 -0800
----------------------------------------------------------------------
cli/distributed/Conductor.cpp | 5 ++++-
query_execution/ForemanDistributed.cpp | 3 +++
query_execution/ForemanDistributed.hpp | 4 ++++
query_execution/PolicyEnforcerDistributed.cpp | 2 ++
query_execution/PolicyEnforcerDistributed.hpp | 7 +++++++
.../tests/DistributedExecutionGeneratorTestRunner.cpp | 10 +++++++++-
6 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index c4a2721..13d4d57 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <cstdlib>
#include <exception>
+#include <functional>
#include <memory>
#include <sstream>
#include <string>
@@ -95,7 +96,9 @@ void Conductor::init() {
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
- foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, query_processor_->getDefaultDatabase());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_,
+ std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
+ query_processor_->getDefaultDatabase());
foreman_->start();
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index fe4c483..4d95f16 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,6 +17,7 @@
#include <cstddef>
#include <cstdio>
#include <cstdlib>
+#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -67,6 +68,7 @@ class QueryHandle;
ForemanDistributed::ForemanDistributed(
const BlockLocator &block_locator,
+ std::function<void()> &&save_catalog_callback,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
const int cpu_id)
@@ -106,6 +108,7 @@ ForemanDistributed::ForemanDistributed(
policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
foreman_client_id_,
+ move(save_catalog_callback),
catalog_database_,
&shiftboss_directory_,
bus_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index ed09fda..5f1a14b 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,6 +17,7 @@
#include <cstddef>
#include <cstdio>
+#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -55,6 +56,8 @@ class ForemanDistributed final : public ForemanBase {
* @param block_locator The block locator that manages block location info.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
+ * @param save_catalog_callback The callback used to save the catalog upon query
+ * completion.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
*
* @note If cpu_id is not specified, Foreman thread can be possibly moved
@@ -62,6 +65,7 @@ class ForemanDistributed final : public ForemanBase {
**/
ForemanDistributed(
const BlockLocator &block_locator,
+ std::function<void()> &&save_catalog_callback,
tmb::MessageBus *bus,
CatalogDatabaseLite *catalog_database,
const int cpu_id = -1);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index e9f57d3..38b8a34 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -227,6 +227,8 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
}
if (query_result == nullptr) {
+ save_catalog_callback_();
+
// Clean up query execution states, i.e., QueryContext, in Shiftbosses.
serialization::QueryTeardownMessage proto;
proto.set_query_id(query_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 248948a..f8476c8 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,6 +16,7 @@
#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
#include <cstddef>
+#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
@@ -57,15 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @brief Constructor.
*
* @param foreman_client_id The TMB client ID of the Foreman.
+ * @param save_catalog_callback The callback used to save the catalog upon query
+ * completion.
* @param catalog_database The CatalogDatabase used.
* @param bus The TMB.
**/
PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+ std::function<void()> &&save_catalog_callback,
CatalogDatabaseLite *catalog_database,
ShiftbossDirectory *shiftboss_directory,
tmb::MessageBus *bus)
: PolicyEnforcerBase(catalog_database),
foreman_client_id_(foreman_client_id),
+ save_catalog_callback_(std::move(save_catalog_callback)),
shiftboss_directory_(shiftboss_directory),
bus_(bus) {}
@@ -154,6 +159,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const tmb::client_id foreman_client_id_;
+ const std::function<void()> save_catalog_callback_;
+
ShiftbossDirectory *shiftboss_directory_;
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 45d4fdf..2e18467 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -21,6 +21,7 @@
#include <cstdio>
#include <cstdlib>
+#include <functional>
#include <memory>
#include <set>
#include <string>
@@ -64,6 +65,12 @@ class CatalogRelation;
namespace optimizer {
+namespace {
+
+void nop() {}
+
+} // namespace
+
const char *DistributedExecutionGeneratorTestRunner::kResetOption =
"reset_before_execution";
@@ -98,7 +105,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
+ test_database_loader_->catalog_database());
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,