You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/04 23:44:35 UTC

[1/3] incubator-quickstep git commit: Logged only in the debug mode. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/mark_single_node_query 5e03cdd17 -> 0859a17aa (forced update)


Logged only in the debug mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ff89dd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ff89dd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ff89dd5

Branch: refs/heads/mark_single_node_query
Commit: 5ff89dd5427bf904e1406111bc2877836f586196
Parents: c608e99
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Nov 30 23:07:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Nov 30 23:07:25 2016 -0800

----------------------------------------------------------------------
 storage/StorageManager.cpp | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ff89dd5/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 56ca323..a202f71 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -557,9 +557,9 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
                         kGetPeerDomainNetworkAddressesMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
-            << "') to BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+             << "') to BlockLocator";
 
   DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
   DCHECK(bus_ != nullptr);
@@ -573,8 +573,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   CHECK_EQ(block_locator_client_id_, annotated_message.sender);
   CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
 
   serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
   CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -591,10 +591,10 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
                                               const tmb::message_type_id message_type) {
   switch (message_type) {
     case kAddBlockLocationMessage:
-      LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      DLOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
       break;
     case kDeleteBlockLocationMessage:
-      LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      DLOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
       break;
     default:
       LOG(FATAL) << "Unknown message type " << message_type;
@@ -613,9 +613,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
                         message_type);
   free(proto_bytes);
 
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') sent BlockLocationMessage (typed '" << message_type
-            << "') to BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent BlockLocationMessage (typed '" << message_type
+             << "') to BlockLocator";
   CHECK(MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          storage_manager_client_id_,
@@ -635,7 +635,7 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // TODO(quickstep-team): Use a cost model to determine whether to load from
   // a remote peer or the disk.
   if (BlockIdUtil::Domain(block) != block_domain_) {
-    LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
     const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
     for (const string &peer_domain_network_address : peer_domain_network_addresses) {
       DataExchangerClientAsync client(
@@ -648,8 +648,8 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
       }
     }
 
-    LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-              << " from remote peers, so try to load from disk.";
+    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+               << " from remote peers, so try to load from disk.";
   }
 #endif
 


[2/3] incubator-quickstep git commit: LOG only in the debug mode for BlockLocator.

Posted by zu...@apache.org.
LOG only in the debug mode for BlockLocator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e50a2b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e50a2b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e50a2b7a

Branch: refs/heads/mark_single_node_query
Commit: e50a2b7aaed97f3282c338be0b36071a5cffd523
Parents: 5ff89dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Dec 3 21:24:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Dec 3 21:24:25 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp            | 48 ++++++++++++------------
 query_execution/QueryManagerDistributed.cpp |  4 +-
 2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 81684ba..5de6a54 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -55,8 +55,8 @@ void BlockLocator::run() {
     const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_message.tagged_message;
     const client_id sender = annotated_message.sender;
-    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
-              << "' message from TMB Client " << sender;
+    DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+               << "' message from TMB Client " << sender;
     switch (tagged_message.message_type()) {
       case kBlockDomainRegistrationMessage: {
         serialization::BlockDomainRegistrationMessage proto;
@@ -77,9 +77,9 @@ void BlockLocator::run() {
         DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
 
         if (result_domain_blocks.second) {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
         } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
         }
         break;
       }
@@ -95,9 +95,9 @@ void BlockLocator::run() {
           block_locations_[block].erase(domain);
           domain_blocks_[domain].erase(block);
 
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
         } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
         }
         break;
       }
@@ -128,7 +128,7 @@ void BlockLocator::run() {
         }
         domain_blocks_.erase(domain);
 
-        LOG(INFO) << "Unregistered Domain " << domain;
+        DLOG(INFO) << "Unregistered Domain " << domain;
         break;
       }
       case kPoisonMessage: {
@@ -153,14 +153,14 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kBlockDomainRegistrationResponseMessage);
+                        proto_length,
+                        kBlockDomainRegistrationResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent BlockDomainRegistrationResponseMessage (typed '"
-            << kBlockDomainRegistrationResponseMessage
-            << "') to Worker (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent BlockDomainRegistrationResponseMessage (typed '"
+             << kBlockDomainRegistrationResponseMessage
+             << "') to TMB Client (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,
@@ -181,13 +181,13 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kLocateBlockResponseMessage);
+                        proto_length,
+                        kLocateBlockResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+             << "') to StorageManager (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,
@@ -208,14 +208,14 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kGetPeerDomainNetworkAddressesResponseMessage);
+                        proto_length,
+                        kGetPeerDomainNetworkAddressesResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
-            << kGetPeerDomainNetworkAddressesResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+             << kGetPeerDomainNetworkAddressesResponseMessage
+             << "') to StorageManager (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 20650d0..5c7e0d8 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -176,8 +176,8 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
     shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
   }
 
-  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
-            << "') to all Shiftbosses";
+  DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+             << "') to all Shiftbosses";
   QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
                                        shiftboss_addresses,
                                        move(tagged_msg),


[3/3] incubator-quickstep git commit: Marked SingleNodeQuery for Insertions.

Posted by zu...@apache.org.
Marked SingleNodeQuery for Insertions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a

Branch: refs/heads/mark_single_node_query
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 16 +++++++---
 query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
 query_optimizer/ExecutionGenerator.cpp        |  3 ++
 query_optimizer/QueryHandle.hpp               | 37 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   return true;
 }
 
+namespace {
+constexpr size_t kDefaultShiftbossIndex = 0u;
+}  // namespace
+
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
-  const size_t num_shiftbosses = shiftboss_directory_.size();
-  size_t shiftboss_index = 0u;
+  static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+  PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
     const S::WorkOrderMessage &proto = *message;
     size_t shiftboss_index_for_particular_work_order_type;
-    if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+      // Always schedule the single-node query to the same Shiftboss.
+      shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+    } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else {
       // TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
     shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
 
     if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
-      shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+      shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
     } else {
       // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
       // <shiftboss_index_for_particular_work_order_type> might be scheduled one

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
 
 #include <cstddef>
 #include <memory>
+#include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 #include "tmb/id_typedefs.h"
 
 namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
 
 /** \addtogroup QueryExecution
  *  @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
   /**
+   * @brief Whether the query should be executed on one Shiftboss.
+   *
+   * @param query_id The query id.
+   *
+   * @return Whether the query should be executed on one Shiftboss.
+   **/
+  bool isSingleNodeQuery(const std::size_t query_id) const {
+    const auto cit = admitted_queries_.find(query_id);
+    DCHECK(cit != admitted_queries_.end());
+    return cit->second->query_handle()->is_single_node_query();
+  }
+
+  /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
    * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
 void ExecutionGenerator::convertInsertTuple(
     const P::InsertTuplePtr &physical_plan) {
   // InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+  query_handle_->set_is_single_node_query();
+#endif  // QUICKSTEP_DISTRIBUTED
 
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/Catalog.pb.h"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
 
@@ -134,6 +135,22 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Whether the query will be executed on a single node.
+   */
+  bool is_single_node_query() const {
+    return is_single_node_query_;
+  }
+
+  /**
+   * @brief Mark the query to be executed on a single node.
+   */
+  void set_is_single_node_query() {
+    is_single_node_query_ = true;
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
+
  private:
   const std::size_t query_id_;
 
@@ -153,6 +170,26 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Indicates whether the query should be executed on the default Shiftboss
+  // for correctness purposes.
+  // An example is an insert query that would otherwise require block
+  // invalidation across multiple StorageManagers. Suppose an insert query
+  // has been scheduled on node 0, so the block is in the buffer pool of node 0.
+  // Another insert query on the same relation might be scheduled on another
+  // node, say node 1, which would pull the block from node 0 and perform the
+  // insertion. As a result, two blocks with the same block id on two nodes
+  // would have different contents, which is incorrect.
+  // One approach is to evict the blocks cached on all other nodes on every
+  // change. However, that does not scale and, even worse, it would also hurt
+  // the performance of every select query.
+  // Instead, we mark the query as a single-node query so that blocks are
+  // modified on the default node only. However, if a modified block has also
+  // been cached on another node, this approach could still produce
+  // inconsistent query results.
+  bool is_single_node_query_ = false;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };