You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:33 UTC
[41/50] incubator-quickstep git commit: Marked SingleNodeQuery for Insertions.
Marked SingleNodeQuery for Insertions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a
Branch: refs/heads/quickstep_partition_parser_support
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 16 +++++++---
query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
query_optimizer/ExecutionGenerator.cpp | 3 ++
query_optimizer/QueryHandle.hpp | 37 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
return true;
}
+namespace {
+constexpr size_t kDefaultShiftbossIndex{0u};  // Shiftboss that single-node queries are pinned to.
+}  // anonymous namespace
+
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
- const size_t num_shiftbosses = shiftboss_directory_.size();
- size_t shiftboss_index = 0u;
+ static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+ PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
size_t shiftboss_index_for_particular_work_order_type;
- if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+ // Always schedule the single-node query to the same Shiftboss.
+ shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+ } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else {
// TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
- shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
} else {
// NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
// <shiftboss_index_for_particular_work_order_type> might be scheduled one
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
#include <cstddef>
#include <memory>
+#include <unordered_map>
+#include <utility>
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
#include "tmb/id_typedefs.h"
namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
/** \addtogroup QueryExecution
* @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
/**
+ * @brief Whether the query should be executed on one Shiftboss.
+ *
+ * @param query_id The query id.
+ *
+ * @return Whether the query should be executed on one Shiftboss.
+ **/
+ bool isSingleNodeQuery(const std::size_t query_id) const {
+ const auto cit = admitted_queries_.find(query_id);
+ DCHECK(cit != admitted_queries_.end());
+ return cit->second->query_handle()->is_single_node_query();
+ }
+
+ /**
* @brief Get or set the index of Shiftboss for an Aggregation related
* WorkOrder. If it is the first Aggregation on <aggr_state_index>,
* <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
void ExecutionGenerator::convertInsertTuple(
const P::InsertTuplePtr &physical_plan) {
// InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+ query_handle_->set_is_single_node_query();
+#endif // QUICKSTEP_DISTRIBUTED
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
#include "catalog/Catalog.pb.h"
#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
#include "utility/Macros.hpp"
@@ -134,6 +135,22 @@ class QueryHandle {
query_result_relation_ = relation;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Check whether this query is pinned to a single node for execution.
+ */
+ bool is_single_node_query() const {
+ return this->is_single_node_query_;
+ }
+
+ /**
+ * @brief Pin this query to a single node for execution.
+ */
+ void set_is_single_node_query() {
+ this->is_single_node_query_ = true;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
+
private:
const std::size_t query_id_;
@@ -153,6 +170,26 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ // Indicates whether the query must be executed on the default Shiftboss for
+ // correctness purposes.
+ // An example is an insert query, which might otherwise require block
+ // invalidation among multiple StorageManagers. Suppose an insert query
+ // has been scheduled on node 0, so the block is in the buffer pool of node 0.
+ // Another insert query on the same relation might be scheduled on another
+ // node, say node 1, which would pull the block from node 0 and perform the
+ // insertion. Then two blocks with the same block id on two nodes would
+ // have different contents, which is incorrect.
+ // One approach is to evict blocks cached in all other nodes on every
+ // change. That, however, does not scale, and even worse, it would also hurt
+ // the performance of every select query.
+ // Instead, we mark the query as a single-node query so that blocks are
+ // modified on the default node only. But if a changed block has also been
+ // cached on another node, this approach can still produce inconsistent
+ // query results.
+ bool is_single_node_query_ = false;
+#endif // QUICKSTEP_DISTRIBUTED
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};