You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/03 01:14:56 UTC
incubator-quickstep git commit: Refactored catalog saving in the
distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 964a80649 -> ccb2852f7
Refactored catalog saving in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccb2852f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccb2852f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccb2852f
Branch: refs/heads/master
Commit: ccb2852f71da77d364d4bfcb276cb6318b751a8c
Parents: 964a806
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 17:14:50 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 17:14:50 2017 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/ForemanDistributed.cpp | 5 ++---
query_execution/ForemanDistributed.hpp | 6 +++---
query_execution/PolicyEnforcerDistributed.cpp | 5 ++++-
query_execution/PolicyEnforcerDistributed.hpp | 13 ++++++-------
.../tests/DistributedExecutionGeneratorTestRunner.cpp | 6 ++----
6 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 12d6be0..23b706f 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -165,6 +165,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_ShiftbossDirectory
quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
quickstep_storage_StorageBlockInfo
quickstep_utility_ExecutionDAGVisualizer
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 389d6ab..57f432f 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,7 +17,6 @@
#include <cstddef>
#include <cstdio>
#include <cstdlib>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -68,9 +67,9 @@ class QueryHandle;
ForemanDistributed::ForemanDistributed(
const BlockLocator &block_locator,
- std::function<void()> &&save_catalog_callback,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
const int cpu_id)
: ForemanBase(bus, cpu_id),
block_locator_(block_locator),
@@ -108,8 +107,8 @@ ForemanDistributed::ForemanDistributed(
policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
foreman_client_id_,
- move(save_catalog_callback),
catalog_database_,
+ query_processor,
&shiftboss_directory_,
bus_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 5f1a14b..7fc98bd 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,7 +17,6 @@
#include <cstddef>
#include <cstdio>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -36,6 +35,7 @@ namespace quickstep {
class BlockLocator;
class CatalogDatabaseLite;
+class QueryProcessor;
namespace serialization { class WorkOrderMessage; }
@@ -56,7 +56,7 @@ class ForemanDistributed final : public ForemanBase {
* @param block_locator The block locator that manages block location info.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
- * @param save_catalog_callback The callback used to save catalog upon the query
+ * @param query_processor The QueryProcessor to save catalog upon the query
* completion.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
*
@@ -65,9 +65,9 @@ class ForemanDistributed final : public ForemanBase {
**/
ForemanDistributed(
const BlockLocator &block_locator,
- std::function<void()> &&save_catalog_callback,
tmb::MessageBus *bus,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
const int cpu_id = -1);
~ForemanDistributed() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6ee58a8..25f2d72 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -35,6 +35,7 @@
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/QueryManagerDistributed.hpp"
#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/ExecutionDAGVisualizer.hpp"
@@ -259,7 +260,9 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
}
if (query_result == nullptr) {
- save_catalog_callback_();
+ if (query_processor_) {
+ query_processor_->saveCatalog();
+ }
// Clean up query execution states, i.e., QueryContext, in Shiftbosses.
serialization::QueryTeardownMessage proto;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index f8476c8..18fd9ae 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,7 +16,6 @@
#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
#include <cstddef>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
@@ -43,6 +42,7 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
+class QueryProcessor;
/** \addtogroup QueryExecution
* @{
@@ -58,19 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @brief Constructor.
*
* @param foreman_client_id The TMB client ID of the Foreman.
- * @param save_catalog_callback The callback used to save catalog upon the query
- * completion.
* @param catalog_database The CatalogDatabase used.
+ * @param query_processor The QueryProcessor to save catalog upon the query
+ * completion.
* @param bus The TMB.
**/
PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
- std::function<void()> &&save_catalog_callback,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
ShiftbossDirectory *shiftboss_directory,
tmb::MessageBus *bus)
: PolicyEnforcerBase(catalog_database),
foreman_client_id_(foreman_client_id),
- save_catalog_callback_(std::move(save_catalog_callback)),
+ query_processor_(query_processor),
shiftboss_directory_(shiftboss_directory),
bus_(bus) {}
@@ -159,8 +159,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const tmb::client_id foreman_client_id_;
- const std::function<void()> save_catalog_callback_;
-
+ QueryProcessor *query_processor_;
ShiftbossDirectory *shiftboss_directory_;
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 3b1259a..0eeb83f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -71,8 +71,6 @@ namespace {
constexpr int kNumInstances = 3;
-void nop() {}
-
} // namespace
const char *DistributedExecutionGeneratorTestRunner::kResetOption =
@@ -110,8 +108,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
- test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
+ nullptr /* query_processor */);
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,