You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/04 23:44:35 UTC
[1/3] incubator-quickstep git commit: Logged only in the debug mode.
[Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/mark_single_node_query 5e03cdd17 -> 0859a17aa (forced update)
Logged only in the debug mode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ff89dd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ff89dd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ff89dd5
Branch: refs/heads/mark_single_node_query
Commit: 5ff89dd5427bf904e1406111bc2877836f586196
Parents: c608e99
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Nov 30 23:07:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Nov 30 23:07:25 2016 -0800
----------------------------------------------------------------------
storage/StorageManager.cpp | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ff89dd5/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 56ca323..a202f71 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -557,9 +557,9 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
kGetPeerDomainNetworkAddressesMessage);
free(proto_bytes);
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+ << "') to BlockLocator";
DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
DCHECK(bus_ != nullptr);
@@ -573,8 +573,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
const TaggedMessage &tagged_message = annotated_message.tagged_message;
CHECK_EQ(block_locator_client_id_, annotated_message.sender);
CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -591,10 +591,10 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
const tmb::message_type_id message_type) {
switch (message_type) {
case kAddBlockLocationMessage:
- LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ DLOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
break;
case kDeleteBlockLocationMessage:
- LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ DLOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
break;
default:
LOG(FATAL) << "Unknown message type " << message_type;
@@ -613,9 +613,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
message_type);
free(proto_bytes);
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent BlockLocationMessage (typed '" << message_type
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockLocationMessage (typed '" << message_type
+ << "') to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
storage_manager_client_id_,
@@ -635,7 +635,7 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// TODO(quickstep-team): Use a cost model to determine whether to load from
// a remote peer or the disk.
if (BlockIdUtil::Domain(block) != block_domain_) {
- LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
for (const string &peer_domain_network_address : peer_domain_network_addresses) {
DataExchangerClientAsync client(
@@ -648,8 +648,8 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
}
}
- LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
+ DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+ << " from remote peers, so try to load from disk.";
}
#endif
[2/3] incubator-quickstep git commit: LOG only in the debug mode for
BlockLocator.
Posted by zu...@apache.org.
LOG only in the debug mode for BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e50a2b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e50a2b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e50a2b7a
Branch: refs/heads/mark_single_node_query
Commit: e50a2b7aaed97f3282c338be0b36071a5cffd523
Parents: 5ff89dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Dec 3 21:24:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Dec 3 21:24:25 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 48 ++++++++++++------------
query_execution/QueryManagerDistributed.cpp | 4 +-
2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 81684ba..5de6a54 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -55,8 +55,8 @@ void BlockLocator::run() {
const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const client_id sender = annotated_message.sender;
- LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
- << "' message from TMB Client " << sender;
+ DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+ << "' message from TMB Client " << sender;
switch (tagged_message.message_type()) {
case kBlockDomainRegistrationMessage: {
serialization::BlockDomainRegistrationMessage proto;
@@ -77,9 +77,9 @@ void BlockLocator::run() {
DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
if (result_domain_blocks.second) {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
} else {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
}
break;
}
@@ -95,9 +95,9 @@ void BlockLocator::run() {
block_locations_[block].erase(domain);
domain_blocks_[domain].erase(block);
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
} else {
- LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+ DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
}
break;
}
@@ -128,7 +128,7 @@ void BlockLocator::run() {
}
domain_blocks_.erase(domain);
- LOG(INFO) << "Unregistered Domain " << domain;
+ DLOG(INFO) << "Unregistered Domain " << domain;
break;
}
case kPoisonMessage: {
@@ -153,14 +153,14 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kBlockDomainRegistrationResponseMessage);
+ proto_length,
+ kBlockDomainRegistrationResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') to Worker (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') to TMB Client (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -181,13 +181,13 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kLocateBlockResponseMessage);
+ proto_length,
+ kLocateBlockResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -208,14 +208,14 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kGetPeerDomainNetworkAddressesResponseMessage);
+ proto_length,
+ kGetPeerDomainNetworkAddressesResponseMessage);
free(proto_bytes);
- LOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
- << kGetPeerDomainNetworkAddressesResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+ << kGetPeerDomainNetworkAddressesResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 20650d0..5c7e0d8 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -176,8 +176,8 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
- << "') to all Shiftbosses";
+ DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+ << "') to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
shiftboss_addresses,
move(tagged_msg),
[3/3] incubator-quickstep git commit: Marked SingleNodeQuery for
Insertions.
Posted by zu...@apache.org.
Marked SingleNodeQuery for Insertions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a
Branch: refs/heads/mark_single_node_query
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 16 +++++++---
query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
query_optimizer/ExecutionGenerator.cpp | 3 ++
query_optimizer/QueryHandle.hpp | 37 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
return true;
}
+namespace {
+constexpr size_t kDefaultShiftbossIndex = 0u;
+} // namespace
+
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
- const size_t num_shiftbosses = shiftboss_directory_.size();
- size_t shiftboss_index = 0u;
+ static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+ PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
size_t shiftboss_index_for_particular_work_order_type;
- if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+ // Always schedule the single-node query to the same Shiftboss.
+ shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+ } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else {
// TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
- shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
} else {
// NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
// <shiftboss_index_for_particular_work_order_type> might be scheduled one
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
#include <cstddef>
#include <memory>
+#include <unordered_map>
+#include <utility>
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
#include "tmb/id_typedefs.h"
namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
/** \addtogroup QueryExecution
* @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
/**
+ * @brief Whether the query should be executed on one Shiftboss.
+ *
+ * @param query_id The query id.
+ *
+ * @return Whether the query should be executed on one Shiftboss.
+ **/
+ bool isSingleNodeQuery(const std::size_t query_id) const {
+ const auto cit = admitted_queries_.find(query_id);
+ DCHECK(cit != admitted_queries_.end());
+ return cit->second->query_handle()->is_single_node_query();
+ }
+
+ /**
* @brief Get or set the index of Shiftboss for an Aggregation related
* WorkOrder. If it is the first Aggregation on <aggr_state_index>,
* <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
void ExecutionGenerator::convertInsertTuple(
const P::InsertTuplePtr &physical_plan) {
// InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+ query_handle_->set_is_single_node_query();
+#endif // QUICKSTEP_DISTRIBUTED
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
#include "catalog/Catalog.pb.h"
#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
#include "utility/Macros.hpp"
@@ -134,6 +135,22 @@ class QueryHandle {
query_result_relation_ = relation;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Whether the query will be executed on a single node.
+ */
+ bool is_single_node_query() const {
+ return is_single_node_query_;
+ }
+
+ /**
+ * @brief Mark the query to be executed on a single node.
+ */
+ void set_is_single_node_query() {
+ is_single_node_query_ = true;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
+
private:
const std::size_t query_id_;
@@ -153,6 +170,26 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ // Indicates whether the query should be executed on the default Shiftboss for
+ // correctness purposes.
+ // An example is an insert query, which might otherwise require block
+ // invalidation across multiple StorageManagers. Suppose an insert query
+ // has been scheduled on node 0, so the block is in the buffer pool of node 0.
+ // Another insert query on the same relation might be scheduled on another
+ // node, say node 1, which would pull the block from node 0 and perform the
+ // insertion. Two blocks with the same block id on two nodes would then
+ // have different contents, which is incorrect.
+ // One approach is to evict the blocks cached on all other nodes on every
+ // change. However, that does not scale and, even worse, it would also hurt
+ // the performance of every select query.
+ // Instead, we mark the query as a single-node query so that blocks are
+ // modified on the default node only. But if a changed block has also been
+ // cached on another node, this approach can still produce inconsistent
+ // query results.
+ bool is_single_node_query_ = false;
+#endif // QUICKSTEP_DISTRIBUTED
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};