You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:33 UTC

[41/50] incubator-quickstep git commit: Marked SingleNodeQuery for Insertions.

Marked SingleNodeQuery for Insertions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a

Branch: refs/heads/quickstep_partition_parser_support
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 16 +++++++---
 query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
 query_optimizer/ExecutionGenerator.cpp        |  3 ++
 query_optimizer/QueryHandle.hpp               | 37 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   return true;
 }
 
+namespace {
+constexpr size_t kDefaultShiftbossIndex = 0u;
+}  // namespace
+
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
-  const size_t num_shiftbosses = shiftboss_directory_.size();
-  size_t shiftboss_index = 0u;
+  static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+  PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
     const S::WorkOrderMessage &proto = *message;
     size_t shiftboss_index_for_particular_work_order_type;
-    if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+      // Always schedule the single-node query to the same Shiftboss.
+      shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+    } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else {
       // TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
     shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
 
     if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
-      shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+      shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
     } else {
       // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
       // <shiftboss_index_for_particular_work_order_type> might be scheduled one

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
 
 #include <cstddef>
 #include <memory>
+#include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 #include "tmb/id_typedefs.h"
 
 namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
 
 /** \addtogroup QueryExecution
  *  @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
   /**
+   * @brief Whether the query should be executed on one Shiftboss.
+   *
+   * @param query_id The query id.
+   *
+   * @return Whether the query should be executed on one Shiftboss.
+   **/
+  bool isSingleNodeQuery(const std::size_t query_id) const {
+    const auto cit = admitted_queries_.find(query_id);
+    DCHECK(cit != admitted_queries_.end());
+    return cit->second->query_handle()->is_single_node_query();
+  }
+
+  /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
    * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
 void ExecutionGenerator::convertInsertTuple(
     const P::InsertTuplePtr &physical_plan) {
   // InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+  query_handle_->set_is_single_node_query();
+#endif  // QUICKSTEP_DISTRIBUTED
 
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/Catalog.pb.h"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
 
@@ -134,6 +135,22 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Whether the query will be executed on a single node.
+   */
+  bool is_single_node_query() const {
+    return is_single_node_query_;
+  }
+
+  /**
+   * @brief Mark the query to be executed on a single node.
+   */
+  void set_is_single_node_query() {
+    is_single_node_query_ = true;
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
+
  private:
   const std::size_t query_id_;
 
@@ -153,6 +170,26 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Indicates whether the query should be executed on the default Shiftboss for
+  // correctness purposes.
+  // For example, an insert query might otherwise require block
+  // invalidation across multiple StorageManagers. Suppose an insert query
+  // has been scheduled on node 0, so the block is in node 0's buffer pool.
+  // Another insert query on the same relation might be scheduled on another
+  // node, say node 1, which will pull the block from node 0 and perform the
+  // insertion. As a result, two blocks with the same block id on two nodes
+  // have different contents, which is incorrect.
+  // One approach is to evict blocks cached in all other nodes for every
+  // change. It, however, does not scale, and even worse, it will also affect
+  // the performance of each select query.
+  // Alternatively, we choose to mark the query as a single-node query to
+  // modify blocks on the default node only. However, if the changed block has
+  // also been cached on another node, this approach can still produce
+  // inconsistent query results.
+  bool is_single_node_query_ = false;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };