You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/28 08:54:51 UTC
incubator-quickstep git commit: Added Pull-based DataExchange
service. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/data-exchange 659adfed5 -> 3a46073dd (forced update)
Added Pull-based DataExchange service.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3a46073d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3a46073d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3a46073d
Branch: refs/heads/data-exchange
Commit: 3a46073dd20cfaae8a9bd84e18cc772c61175834
Parents: 2bc4a0a
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Fri May 27 17:41:40 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Sat May 28 01:52:59 2016 -0700
----------------------------------------------------------------------
CMakeLists.txt | 11 +-
query_execution/BlockLocator.cpp | 223 ++++++++++++++++
query_execution/BlockLocator.hpp | 124 +++++++++
query_execution/CMakeLists.txt | 20 ++
query_execution/QueryExecutionMessages.proto | 41 +++
query_execution/QueryExecutionTypedefs.hpp | 14 +
storage/CMakeLists.txt | 72 +++++-
storage/DataExchange.proto | 31 +++
storage/DataExchangerAsync.cpp | 165 ++++++++++++
storage/DataExchangerAsync.hpp | 97 +++++++
storage/StorageManager.cpp | 298 +++++++++++++++++++++-
storage/StorageManager.hpp | 131 +++++++++-
storage/tests/DataExchange_unittest.cpp | 239 +++++++++++++++++
third_party/iwyu/iwyu_helper.py | 1 +
validate_cmakelists.py | 5 +-
15 files changed, 1461 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 87a8f7c..3769eab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -659,7 +659,12 @@ set(ENABLE_PUREMEMORY ON CACHE BOOL "Enable PureMemory TMB")
set(ENABLE_LEVELDB OFF CACHE BOOL "Enable LevelDB TMB")
set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
-set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+
+# The distributed version requires to use the NativeNet implementation.
+if (NOT ENABLE_DISTRIBUTED)
+ set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+endif()
+
set(ENABLE ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
set(ENABLE_VOLTDB OFF CACHE BOOL "Enable VoltDB TMB")
set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
@@ -667,6 +672,10 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
include_directories(${TMB_INCLUDE_DIRS})
+if (ENABLE_NATIVENET)
+ include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+endif()
+
# Add all of the module subdirectories. CMakeLists.txt in each of the subdirectories
# defines how to build that module's libraries.
add_subdirectory(catalog)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
new file mode 100644
index 0000000..16a7216
--- /dev/null
+++ b/query_execution/BlockLocator.cpp
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/BlockLocator.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+void BlockLocator::run() {
+ if (cpu_id_ >= 0) {
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ for (;;) {
+ // Receive() is a blocking call, causing this thread to sleep until next
+ // message is received.
+ const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const client_id sender = annotated_message.sender;
+ LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+ << "' message from TMB Client " << sender;
+ switch (tagged_message.message_type()) {
+ case kBlockDomainRegistrationMessage: {
+ serialization::BlockDomainRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
+ break;
+ }
+ case kAddBlockLocationMessage: {
+ serialization::BlockLocationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ const block_id block = proto.block_id();
+ const block_id_domain domain = proto.block_domain();
+
+ const auto result_block_locations = block_locations_[block].insert(domain);
+ const auto result_domain_blocks = domain_blocks_[domain].insert(block);
+ DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+ if (result_domain_blocks.second) {
+ LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+ } else {
+ LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+ }
+ break;
+ }
+ case kDeleteBlockLocationMessage: {
+ serialization::BlockLocationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ const block_id block = proto.block_id();
+ const block_id_domain domain = proto.block_domain();
+
+ const auto cit = block_locations_[block].find(domain);
+ if (cit != block_locations_[block].end()) {
+ block_locations_[block].erase(domain);
+ domain_blocks_[domain].erase(block);
+
+ LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+ } else {
+ LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+ }
+ break;
+ }
+ case kLocateBlockMessage: {
+ serialization::LocateBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processLocateBlockMessage(sender, proto.block_id());
+ break;
+ }
+ case kGetPeerDomainNetworkAddressesMessage: {
+ serialization::GetPeerDomainNetworkAddressesMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+ break;
+ }
+ case kBlockDomainUnregistrationMessage: {
+ serialization::BlockDomainUnregistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ const block_id_domain domain = proto.block_domain();
+
+ domain_network_addresses_.erase(domain);
+
+ for (const block_id block : domain_blocks_[domain]) {
+ block_locations_[block].erase(domain);
+ }
+ domain_blocks_.erase(domain);
+
+ LOG(INFO) << "Unregistered Domain " << domain;
+ break;
+ }
+ case kPoisonMessage: {
+ return;
+ }
+ }
+ }
+}
+
+void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,
+ const std::string &network_address) {
+ DCHECK_LT(block_domain_, kMaxDomain);
+
+ domain_network_addresses_.emplace(++block_domain_, network_address);
+ domain_blocks_[block_domain_];
+
+ serialization::BlockDomainRegistrationResponseMessage proto;
+ proto.set_block_domain(block_domain_);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainRegistrationResponseMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') to Worker (id '" << receiver << "')";
+ CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ locator_client_id_,
+ receiver,
+ move(message)));
+}
+
+void BlockLocator::processLocateBlockMessage(const client_id receiver,
+ const block_id block) {
+ serialization::LocateBlockResponseMessage proto;
+
+ for (const block_id_domain domain : block_locations_[block]) {
+ proto.add_block_domains(domain);
+ }
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kLocateBlockResponseMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
+ CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ locator_client_id_,
+ receiver,
+ move(message)));
+}
+
+void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
+ const block_id block) {
+ serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+
+ for (const block_id_domain domain : block_locations_[block]) {
+ proto.add_domain_network_addresses(domain_network_addresses_[domain]);
+ }
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kGetPeerDomainNetworkAddressesResponseMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+ << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+ << kGetPeerDomainNetworkAddressesResponseMessage
+ << "') to StorageManager (id '" << receiver << "')";
+ CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ locator_client_id_,
+ receiver,
+ move(message)));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
new file mode 100644
index 0000000..a158294
--- /dev/null
+++ b/query_execution/BlockLocator.hpp
@@ -0,0 +1,124 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+
+#include <atomic>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class for keeping trace of blocks loaded in a Worker's buffer pool
+ * in the distributed version.
+ **/
+class BlockLocator : public Thread {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param bus A pointer to the TMB.
+ * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned.
+ *
+ * @note If cpu_id is not specified, BlockLocator thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ BlockLocator(tmb::MessageBus *bus,
+ const int cpu_id = -1)
+ : bus_(DCHECK_NOTNULL(bus)),
+ cpu_id_(cpu_id),
+ block_domain_(0) {
+ locator_client_id_ = bus_->Connect();
+
+ bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
+ bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+
+ bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
+
+ bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
+ bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
+
+ bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+ bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
+ }
+
+ ~BlockLocator() override {}
+
+ /**
+ * @brief Get the TMB client ID of BlockLocator thread.
+ *
+ * @return TMB client ID of BlockLocator thread.
+ **/
+ tmb::client_id getBusClientID() const {
+ return locator_client_id_;
+ }
+
+ protected:
+ void run() override;
+
+ private:
+ void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
+ void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+ void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+
+ tmb::MessageBus *bus_;
+
+ // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
+ const int cpu_id_;
+
+ std::atomic<block_id_domain> block_domain_;
+
+ // From a block domain to its network info in the ip:port format, i.e.,
+ // "0.0.0.0:0".
+ std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+
+ // From a block to its domains.
+ std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+
+ // From a block domain to all blocks loaded in its buffer pool.
+ std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
+
+ tmb::client_id locator_client_id_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlockLocator);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..d5152f3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -20,6 +20,9 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
QueryExecutionMessages.proto)
# Declare micro-libs:
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+endif()
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -40,6 +43,18 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
# Link dependencies:
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_BlockLocator
+ glog
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlockInfo
+ quickstep_threading_Thread
+ quickstep_threading_ThreadUtil
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_Foreman
glog
gtest
@@ -176,6 +191,11 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryexecution_WorkerSelectionPolicy)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution
+ quickstep_queryexecution_BlockLocator)
+endif()
+
# Tests:
add_executable(Foreman_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 8d2efd0..06972d1 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,10 @@ syntax = "proto2";
package quickstep.serialization;
+// Used for any messages that do not carry payloads.
+message EmptyMessage {
+}
+
// Used for both Normal WorkOrders and RebuildWorkOrders.
// NOTE(zuyu): we might need to seperate the completion messages to contain
// run-time information for Foreman to make better decisions on scheduling
@@ -42,3 +46,40 @@ message DataPipelineMessage {
message WorkOrdersAvailableMessage {
required uint64 operator_index = 1;
}
+
+// BlockLocator related messages.
+message BlockDomainRegistrationMessage {
+ // Format IP:Port, i.e., "0.0.0.0:0".
+ required string domain_network_address = 1;
+}
+
+message BlockDomainRegistrationResponseMessage {
+ required uint32 block_domain = 1;
+}
+
+// Used when StorageManager loads or evicts a block or a blob from its buffer
+// pool.
+message BlockLocationMessage {
+ required fixed64 block_id = 1;
+ required uint32 block_domain = 2;
+}
+
+message LocateBlockMessage {
+ required fixed64 block_id = 1;
+}
+
+message LocateBlockResponseMessage {
+ repeated uint32 block_domains = 1;
+}
+
+message GetPeerDomainNetworkAddressesMessage {
+ required fixed64 block_id = 1;
+}
+
+message GetPeerDomainNetworkAddressesResponseMessage {
+ repeated string domain_network_addresses = 1;
+}
+
+message BlockDomainUnregistrationMessage {
+ required uint32 block_domain = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 36cfd82..6f9a88f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
#include "threading/ThreadIDBasedMap.hpp"
#include "tmb/address.h"
@@ -66,6 +67,19 @@ enum QueryExecutionMessageType : message_type_id {
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers.
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ // BlockLocator related messages.
+ kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
+ kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kAddBlockLocationMessage, // From StorageManager to BlockLocator.
+ kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
+ kLocateBlockMessage, // From StorageManager to BlockLocator.
+ kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
+ kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
+ kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
+ kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
+#endif
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 115248c..233b9bb 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -126,6 +126,13 @@ QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
storage_StorageBlockLayout_proto_hdrs
StorageBlockLayout.proto)
+if (ENABLE_DISTRIBUTED)
+ GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
+ storage_DataExchange_proto_hdrs
+ .
+ DataExchange.proto)
+endif()
+
# Declare micro-libs:
add_library(quickstep_storage_AggregationOperationState
AggregationOperationState.cpp
@@ -171,6 +178,12 @@ add_library(quickstep_storage_CompressedTupleStorageSubBlock
CompressedTupleStorageSubBlock.hpp)
add_library(quickstep_storage_CountedReference ../empty_src.cpp CountedReference.hpp)
add_library(quickstep_storage_CSBTreeIndexSubBlock CSBTreeIndexSubBlock.cpp CSBTreeIndexSubBlock.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_storage_DataExchange_proto
+ ${storage_DataExchange_proto_srcs}
+ ${storage_DataExchange_proto_hdrs})
+ add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
+endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
@@ -573,6 +586,17 @@ target_link_libraries(quickstep_storage_CSBTreeIndexSubBlock
quickstep_utility_Macros
quickstep_utility_PtrVector
quickstep_utility_ScopedBuffer)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_storage_DataExchange_proto
+ ${PROTOBUF3_LIBRARY})
+ target_link_libraries(quickstep_storage_DataExchangerAsync
+ glog
+ quickstep_storage_DataExchange_proto
+ quickstep_storage_StorageManager
+ quickstep_threading_Thread
+ quickstep_utility_Macros
+ ${GRPCPLUSPLUS_LIBRARIES})
+endif()
target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageConstants
@@ -913,6 +937,7 @@ target_link_libraries(quickstep_storage_StorageManager
gflags_nothreads-static
glog
gtest
+ quickstep_catalog_CatalogTypedefs
quickstep_storage_CountedReference
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
@@ -929,7 +954,8 @@ target_link_libraries(quickstep_storage_StorageManager
quickstep_utility_Alignment
quickstep_utility_CalculateInstalledMemory
quickstep_utility_Macros
- quickstep_utility_ShardedLockManager)
+ quickstep_utility_ShardedLockManager
+ tmb)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage_StorageManager
quickstep_storage_FileManagerHdfs)
@@ -938,6 +964,14 @@ if (QUICKSTEP_HAVE_LIBNUMA)
target_link_libraries(quickstep_storage_StorageManager
${LIBNUMA_LIBRARY})
endif()
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_storage_StorageManager
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_DataExchange_proto
+ ${GRPCPLUSPLUS_LIBRARIES})
+endif()
target_link_libraries(quickstep_storage_SubBlockTypeRegistry
glog
quickstep_storage_StorageBlockLayout_proto
@@ -1051,6 +1085,11 @@ elseif (QUICKSTEP_HAVE_FILE_MANAGER_WINDOWS)
target_link_libraries(quickstep_storage
quickstep_storage_FileManagerWindows)
endif()
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_storage
+ quickstep_storage_DataExchange_proto
+ quickstep_storage_DataExchangerAsync)
+endif()
# CMAKE_VALIDATE_IGNORE_BEGIN
if(QUICKSTEP_HAVE_BITWEAVING)
target_link_libraries(quickstep_storage
@@ -1356,6 +1395,37 @@ target_link_libraries(CSBTreeIndexSubBlock_unittest
${LIBS})
add_test(CSBTreeIndexSubBlock_unittest CSBTreeIndexSubBlock_unittest)
+if (ENABLE_DISTRIBUTED)
+ add_executable(DataExchange_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataExchange_unittest.cpp")
+ target_link_libraries(DataExchange_unittest
+ gflags_nothreads-static
+ glog
+ gtest
+ quickstep_catalog_CatalogAttribute
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_CountedReference
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleStorageSubBlock
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_containers_Tuple
+ tmb
+ ${LIBS})
+ add_test(DataExchange_unittest DataExchange_unittest)
+endif()
+
add_executable(EvictionPolicy_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/EvictionPolicy_unittest.cpp")
target_link_libraries(EvictionPolicy_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchange.proto
----------------------------------------------------------------------
diff --git a/storage/DataExchange.proto b/storage/DataExchange.proto
new file mode 100644
index 0000000..a2636e5
--- /dev/null
+++ b/storage/DataExchange.proto
@@ -0,0 +1,31 @@
+// Copyright 2016 Pivotal Software, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package quickstep;
+
+service DataExchange {
+ rpc Pull (PullRequest) returns (PullResponse) {}
+}
+
+message PullRequest {
+ fixed64 block_id = 1;
+}
+
+message PullResponse {
+ bool is_valid = 1;
+ uint64 num_slots = 2;
+ bytes block = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
new file mode 100644
index 0000000..68230a7
--- /dev/null
+++ b/storage/DataExchangerAsync.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "storage/DataExchangerAsync.hpp"
+
+#include <grpc++/grpc++.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#include "storage/StorageManager.hpp"
+
+#include "glog/logging.h"
+
+using grpc::ServerCompletionQueue;
+
+namespace quickstep {
+namespace {
+
+/**
+ * @brief RPC Request Context Instance.
+ **/
+class CallContext {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param service The async service.
+ * @param queue The RPC request queue.
+ * @param storage_manager The StorageManager to use.
+ **/
+ CallContext(DataExchange::AsyncService *service,
+ ServerCompletionQueue *queue,
+ StorageManager *storage_manager)
+ : service_(service),
+ queue_(queue),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ responder_(&context_),
+ status_(CallStatus::CREATE) {
+ Proceed();
+ }
+
+ /**
+ * @brief Process the RPC request.
+ **/
+ void Proceed();
+
+ private:
+ DataExchange::AsyncService *service_;
+ ServerCompletionQueue *queue_;
+
+ StorageManager *storage_manager_;
+
+ grpc::ServerContext context_;
+
+ PullRequest request_;
+ PullResponse response_;
+
+ grpc::ServerAsyncResponseWriter<PullResponse> responder_;
+
+ enum class CallStatus {
+ CREATE = 0,
+ PROCESS,
+ FINISH,
+ };
+ CallStatus status_;
+};
+
+void CallContext::Proceed() {
+ switch (status_) {
+ case CallStatus::CREATE: {
+ // Change this instance progress to the PROCESS state.
+ status_ = CallStatus::PROCESS;
+
+ // As part of the initial CREATE state, we *request* that the system
+ // start processing Pull requests. In this request, "this" acts are
+ // the tag uniquely identifying the request (so that different CallContext
+ // instances can serve different requests concurrently), in this case
+ // the memory address of this CallContext instance.
+ service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this);
+ break;
+ }
+ case CallStatus::PROCESS: {
+ // Spawn a new CallContext instance to serve new clients while we process
+ // the one for this CallContext. The instance will deallocate itself as
+ // part of its FINISH state.
+ new CallContext(service_, queue_, storage_manager_);
+
+ // The actual processing.
+ storage_manager_->pullBlockOrBlob(request_.block_id(), &response_);
+
+ // And we are done! Let the gRPC runtime know we've finished, using the
+ // memory address of this instance as the uniquely identifying tag for
+ // the event.
+ status_ = CallStatus::FINISH;
+ responder_.Finish(response_, grpc::Status::OK, this);
+ break;
+ }
+ case CallStatus::FINISH: {
+ // Once in the FINISH state, deallocate ourselves (CallContext).
+ delete this;
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown call status.";
+ }
+}
+
+} // namespace
+
+const char *DataExchangerAsync::kLocalNetworkAddress = "0.0.0.0:";
+
+DataExchangerAsync::DataExchangerAsync() {
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort(kLocalNetworkAddress, grpc::InsecureServerCredentials(), &port_);
+ builder.RegisterService(&service_);
+
+ queue_ = builder.AddCompletionQueue();
+ server_ = builder.BuildAndStart();
+
+ DCHECK_GT(port_, 0);
+ server_address_ = kLocalNetworkAddress + std::to_string(port_);
+ LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_;
+}
+
+void DataExchangerAsync::run() {
+ // Self-destruct upon success.
+ new CallContext(&service_, queue_.get(), storage_manager_);
+
+ void *tag = nullptr; // Uniquely identify a request.
+ bool ok = false;
+
+ while (true) {
+ if (queue_->Next(&tag, &ok)) {
+ CallContext *call_context = static_cast<CallContext*>(tag);
+ if (ok) {
+ call_context->Proceed();
+ } else {
+ LOG(WARNING) << "Not ok\n";
+ delete call_context;
+ }
+ } else {
+ LOG(INFO) << "Shutdown\n";
+ return;
+ }
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchangerAsync.hpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.hpp b/storage/DataExchangerAsync.hpp
new file mode 100644
index 0000000..75a4e4d
--- /dev/null
+++ b/storage/DataExchangerAsync.hpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+#define QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+
+#include <grpc++/grpc++.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/**
+ * @brief A class which exchanges data from a StorageManager to its peer.
+ **/
+class DataExchangerAsync final : public Thread {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ DataExchangerAsync();
+
+ ~DataExchangerAsync() override {}
+
+ /**
+ * @brief Set the local StorageManager.
+ *
+ * @param storage_manager The StorageManager to use.
+ **/
+ void set_storage_manager(StorageManager *storage_manager) {
+ storage_manager_ = storage_manager;
+ }
+
+ /**
+ * @brief Return its network address for peers to connect.
+ *
+ * @return Its network address.
+ **/
+ std::string network_address() const {
+ DCHECK(!server_address_.empty());
+ return server_address_;
+ }
+
+ /**
+ * @brief Shutdown itself.
+ **/
+ void shutdown() {
+ server_->Shutdown();
+ // Always shutdown the completion queue after the server.
+ queue_->Shutdown();
+ }
+
+ protected:
+ void run() override;
+
+ private:
+ static const char *kLocalNetworkAddress;
+
+ DataExchange::AsyncService service_;
+
+ int port_ = -1;
+ // Format IP:port, i.e., "0.0.0.0:0".
+ std::string server_address_;
+
+ std::unique_ptr<grpc::ServerCompletionQueue> queue_;
+ std::unique_ptr<grpc::Server> server_;
+
+ StorageManager *storage_manager_ = nullptr;
+
+ DISALLOW_COPY_AND_ASSIGN(DataExchangerAsync);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index dfc95b8..ba9c7f2 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -18,6 +18,7 @@
// This is included before other files so that we can conditionally determine
// what else to include.
#include "catalog/CatalogConfig.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
#include "storage/StorageConfig.h"
// Define feature test macros to enable large page support for mmap.
@@ -29,6 +30,10 @@
#include "storage/StorageManager.hpp"
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <grpc++/grpc++.h>
+#endif
+
#if defined(QUICKSTEP_HAVE_MMAP_LINUX_HUGETLB) \
|| defined(QUICKSTEP_HAVE_MMAP_BSD_SUPERPAGE) \
|| defined(QUICKSTEP_HAVE_MMAP_PLAIN)
@@ -52,7 +57,21 @@
#include <unordered_map>
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#endif
+
#include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#endif
+
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManagerLocal.hpp"
#include "storage/StorageBlob.hpp"
@@ -74,6 +93,13 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+#endif
+
using std::free;
using std::int32_t;
using std::memset;
@@ -81,6 +107,15 @@ using std::size_t;
using std::string;
using std::vector;
+#ifdef QUICKSTEP_DISTRIBUTED
+using std::malloc;
+using std::move;
+using std::unique_ptr;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+#endif
+
namespace quickstep {
static bool ValidateBlockDomain(const char *flagname,
@@ -157,14 +192,21 @@ DEFINE_bool(use_hdfs, false, "Use HDFS as the persistent storage, instead of the
#endif
StorageManager::StorageManager(
- const std::string &storage_path,
- const block_id_domain block_domain,
- const size_t max_memory_usage,
- EvictionPolicy *eviction_policy)
+ const std::string &storage_path,
+ const block_id_domain block_domain,
+ const size_t max_memory_usage,
+ EvictionPolicy *eviction_policy,
+ const tmb::client_id block_locator_client_id,
+ tmb::MessageBus *bus)
: storage_path_(storage_path),
total_memory_usage_(0),
max_memory_usage_(max_memory_usage),
- eviction_policy_(eviction_policy) {
+ eviction_policy_(eviction_policy),
+#ifdef QUICKSTEP_DISTRIBUTED
+ block_domain_(block_domain),
+#endif
+ block_locator_client_id_(block_locator_client_id),
+ bus_(bus) {
#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
if (FLAGS_use_hdfs) {
file_manager_.reset(new FileManagerHdfs(storage_path));
@@ -175,10 +217,55 @@ StorageManager::StorageManager(
file_manager_.reset(new FileManagerLocal(storage_path));
#endif
+#ifdef QUICKSTEP_DISTRIBUTED
+ // NOTE(zuyu): The following if-condition is a workaround to bypass code for
+ // the distributed version in some unittests that does not use TMB. The
+ // end-to-end functional tests for the distributed version, however, would not
+ // be affected.
+ if (bus_) {
+ storage_manager_client_id_ = bus_->Connect();
+
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') starts with Domain " << block_domain;
+ }
+#endif
+
block_index_ = BlockIdUtil::GetBlockId(block_domain, file_manager_->getMaxUsedBlockCounter(block_domain));
}
StorageManager::~StorageManager() {
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ serialization::BlockDomainUnregistrationMessage proto;
+ proto.set_block_domain(block_domain_);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainUnregistrationMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage
+ << "') to BlockLocator";
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+ }
+#endif
+
for (std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.begin();
it != blocks_.end();
++it) {
@@ -221,6 +308,12 @@ block_id StorageManager::createBlock(const CatalogRelationSchema &relation,
// Make '*eviction_policy_' aware of the new block's existence.
eviction_policy_->blockCreated(new_block_id);
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+ }
+#endif
+
return new_block_id;
}
@@ -248,6 +341,12 @@ block_id StorageManager::createBlob(const std::size_t num_slots,
// Make '*eviction_policy_' aware of the new blob's existence.
eviction_policy_->blockCreated(new_block_id);
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+ }
+#endif
+
return new_block_id;
}
@@ -314,6 +413,12 @@ bool StorageManager::saveBlockOrBlob(const block_id block, const bool force) {
}
void StorageManager::evictBlockOrBlob(const block_id block) {
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(block, kDeleteBlockLocationMessage);
+ }
+#endif
+
BlockHandle handle;
{
SpinSharedMutexExclusiveLock<false> write_lock(blocks_shared_mutex_);
@@ -361,11 +466,187 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
return ++block_index_;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::pullBlockOrBlob(const block_id block,
+ PullResponse *response) const {
+ SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+ std::unordered_map<block_id, BlockHandle>::const_iterator cit = blocks_.find(block);
+ if (cit != blocks_.end()) {
+ response->set_is_valid(true);
+
+ const BlockHandle &block_handle = cit->second;
+ const std::size_t num_slots = block_handle.block_memory_size;
+
+ response->set_num_slots(num_slots);
+ response->set_block(block_handle.block_memory,
+ num_slots * kSlotSizeBytes);
+ } else {
+ response->set_is_valid(false);
+ }
+}
+
+StorageManager::DataExchangerClientAsync::DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+ StorageManager *storage_manager)
+ : stub_(DataExchange::NewStub(channel)),
+ storage_manager_(storage_manager) {
+}
+
+bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
+ const numa_node_id numa_node,
+ BlockHandle *block_handle) {
+ grpc::ClientContext context;
+
+ PullRequest request;
+ request.set_block_id(block);
+
+ grpc::CompletionQueue queue;
+
+ unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+ stub_->AsyncPull(&context, request, &queue));
+
+ PullResponse response;
+ grpc::Status status;
+
+ rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
+
+ void *got_tag;
+ bool ok = false;
+
+ queue.Next(&got_tag, &ok);
+ CHECK(got_tag == reinterpret_cast<void*>(1));
+ CHECK(ok);
+
+ if (!status.ok()) {
+ LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
+ return false;
+ }
+
+ if (!response.is_valid()) {
+ LOG(INFO) << "The pulling block not found in all the peers";
+ return false;
+ }
+
+ const size_t num_slots = response.num_slots();
+ DCHECK_NE(num_slots, 0u);
+
+ const string &block_content = response.block();
+ DCHECK_EQ(kSlotSizeBytes * num_slots, block_content.size());
+
+ void *block_buffer = storage_manager_->allocateSlots(num_slots, numa_node);
+
+ block_handle->block_memory =
+ std::memcpy(block_buffer, block_content.c_str(), block_content.size());
+ block_handle->block_memory_size = num_slots;
+
+ return true;
+}
+
+vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
+ serialization::GetPeerDomainNetworkAddressesMessage proto;
+ proto.set_block_id(block);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kGetPeerDomainNetworkAddressesMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+ << "') to BlockLocator";
+
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+
+ const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+
+ serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
+ CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ vector<string> domain_network_addresses;
+ for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) {
+ domain_network_addresses.push_back(response_proto.domain_network_addresses(i));
+ }
+
+ return domain_network_addresses;
+}
+
+void StorageManager::sendBlockLocationMessage(const block_id block,
+ const tmb::message_type_id message_type) {
+ switch (message_type) {
+ case kAddBlockLocationMessage:
+ LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ break;
+ case kDeleteBlockLocationMessage:
+ LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ break;
+ default:
+ LOG(FATAL) << "Unknown message type " << message_type;
+ }
+
+ serialization::BlockLocationMessage proto;
+ proto.set_block_id(block);
+ proto.set_block_domain(block_domain_);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ message_type);
+ free(proto_bytes);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockLocationMessage (typed '" << message_type
+ << "') to BlockLocator";
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+}
+#endif
+
StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
const block_id block, const int numa_node) {
// The caller of this function holds an exclusive lock on this block/blob's
// mutex in the lock manager. The caller has ensured that the block is not
// already loaded before this function gets called.
+ BlockHandle loaded_handle;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ // TODO(quickstep-team): Use a cost model to determine whether to load from
+ // a remote peer or the disk.
+ if (BlockIdUtil::Domain(block) != block_domain_) {
+ LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+ for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+ DataExchangerClientAsync client(
+ grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+ this);
+
+ if (client.Pull(block, numa_node, &loaded_handle)) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ return loaded_handle;
+ }
+ }
+
+ LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+ << " from remote peers, so try to load from disk.";
+ }
+#endif
+
size_t num_slots = file_manager_->numSlots(block);
DEBUG_ASSERT(num_slots != 0);
void *block_buffer = allocateSlots(num_slots, numa_node);
@@ -373,10 +654,15 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
CHECK(status) << "Failed to read block from persistent storage: " << block;
- BlockHandle loaded_handle;
loaded_handle.block_memory = block_buffer;
loaded_handle.block_memory_size = num_slots;
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ }
+#endif
+
return loaded_handle;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 52326c2..7d0661e 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -20,12 +20,21 @@
#include <atomic>
#include <chrono>
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
+
+#include "catalog/CatalogTypedefs.hpp"
#include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#endif
+
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManager.hpp"
#include "storage/StorageBlob.hpp"
@@ -40,6 +49,14 @@
#include "gflags/gflags.h"
#include "gtest/gtest_prod.h"
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace grpc { class Channel; }
+#endif
+
+namespace tmb { class MessageBus; }
+
namespace quickstep {
DECLARE_int32(block_domain);
@@ -50,6 +67,11 @@ DECLARE_bool(use_hdfs);
#endif
class CatalogRelationSchema;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+class PullResponse;
+#endif
+
class StorageBlockLayout;
/** \addtogroup Storage
@@ -104,6 +126,33 @@ class StorageManager {
std::chrono::milliseconds(200))) {
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Constructor.
+ * @param storage_path The filesystem directory where blocks have persistent
+ * storage.
+ * @param block_domain The unique block domain.
+ * @param block_locator_client_id The TMB client ID of the block locator.
+ * @param bus A pointer to the TMB.
+ *
+ * @exception CorruptPersistentStorage The storage directory layout is not
+ * in the expected format.
+ **/
+ StorageManager(const std::string &storage_path,
+ const block_id_domain block_domain,
+ const tmb::client_id block_locator_client_id,
+ tmb::MessageBus *bus)
+ : StorageManager(storage_path,
+ block_domain,
+ FLAGS_buffer_pool_slots,
+ LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+ 2,
+ std::chrono::milliseconds(200)),
+ block_locator_client_id,
+ bus) {
+ }
+#endif
+
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
@@ -121,13 +170,17 @@ class StorageManager {
* @param eviction_policy The eviction policy that the storage manager should
* use to manage the cache. The storage manager takes
* ownership of *eviction_policy.
+ * @param block_locator_client_id The TMB client ID of the block locator.
+ * @param bus A pointer to the TMB.
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
**/
StorageManager(const std::string &storage_path,
const block_id_domain block_domain,
const size_t max_memory_usage,
- EvictionPolicy *eviction_policy);
+ EvictionPolicy *eviction_policy,
+ const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
+ tmb::MessageBus *bus = nullptr);
/**
* @brief Destructor which also destroys all managed blocks.
@@ -325,6 +378,16 @@ class StorageManager {
**/
bool blockOrBlobIsLoadedAndDirty(const block_id block);
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Pull a block or a blob.
+ *
+ * @param block The id of the block or blob.
+ * @param response Where to store the pulled block content.
+ **/
+ void pullBlockOrBlob(const block_id block, PullResponse *response) const;
+#endif
+
private:
struct BlockHandle {
void *block_memory;
@@ -332,6 +395,63 @@ class StorageManager {
StorageBlockBase *block;
};
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief A class which connects to DataExchangerAsync to exchange data from
+ * remote peers.
+ **/
+ class DataExchangerClientAsync {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param channel The RPC channel to connect DataExchangerAsync.
+ * @param storage_manager The StorageManager to use.
+ */
+ DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+ StorageManager *storage_manager);
+
+ /**
+ * @brief Pull a block or blob from a remote StorageManager.
+ *
+ * @param block The block or blob to pull.
+ * @param numa_node The NUMA node for placing this block.
+ * @param block_handle Where the pulled block or blob stores.
+ *
+ * @return Whether the pull operation is successful or not.
+ */
+ bool Pull(const block_id block,
+ const numa_node_id numa_node,
+ BlockHandle *block_handle);
+
+ private:
+ std::unique_ptr<DataExchange::Stub> stub_;
+
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
+ };
+
+ /**
+ * @brief Get the network info of all the remote StorageManagers which may
+ * load the given block in the buffer pool.
+ *
+ * @param block The block or blob to pull.
+ *
+ * @return The network info of all the possible peers to pull.
+ **/
+ std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+
+ /**
+ * @brief Update the block location info in BlockLocator.
+ *
+ * @param block The given block or blob.
+ * @param message_type Indicate whether to add or delete a block location.
+ **/
+ void sendBlockLocationMessage(const block_id block,
+ const tmb::message_type_id message_type);
+#endif
+
// Helper for createBlock() and createBlob(). Allocates a block ID and memory
// slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
// writes the allocated slot range into 'handle->slot_index_low' and
@@ -459,6 +579,15 @@ class StorageManager {
std::unique_ptr<EvictionPolicy> eviction_policy_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ const block_id_domain block_domain_;
+
+ tmb::client_id storage_manager_client_id_;
+#endif
+
+ const tmb::client_id block_locator_client_id_;
+ tmb::MessageBus *bus_;
+
std::unique_ptr<FileManager> file_manager_;
// Used to generate unique IDs in allocateNewBlockOrBlob().
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
new file mode 100644
index 0000000..83309b1
--- /dev/null
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/CountedReference.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class DataExchangeTest : public ::testing::Test {
+ protected:
+ static const char kStoragePath[];
+ static const char kCheckedDomainNetworkAddress[];
+
+ ~DataExchangeTest() {
+ data_exchanger_expected_.join();
+ locator_->join();
+ }
+
+ virtual void SetUp() {
+ bus_.Initialize();
+
+ locator_.reset(new BlockLocator(&bus_));
+ locator_client_id_ = locator_->getBusClientID();
+ locator_->start();
+
+ worker_client_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+ bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+ storage_manager_expected_.reset(new StorageManager(
+ kStoragePath,
+ getBlockDomain(data_exchanger_expected_.network_address()),
+ locator_client_id_,
+ &bus_));
+
+ data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
+ data_exchanger_expected_.start();
+
+ storage_manager_checked_.reset(new StorageManager(
+ kStoragePath,
+ getBlockDomain(kCheckedDomainNetworkAddress),
+ locator_client_id_,
+ &bus_));
+ }
+
+ virtual void TearDown() {
+ storage_manager_checked_.reset();
+
+ data_exchanger_expected_.shutdown();
+ storage_manager_expected_.reset();
+
+ serialization::EmptyMessage proto;
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kPoisonMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "Worker (id '" << worker_client_id_
+ << "') sent PoisonMessage (typed '" << kPoisonMessage
+ << "') to BlockLocator (id '" << locator_client_id_ << "')";
+ CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ worker_client_id_,
+ locator_client_id_,
+ move(message)));
+ }
+
+ unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
+
+ private:
+ block_id_domain getBlockDomain(const string &network_address) {
+ serialization::BlockDomainRegistrationMessage proto;
+ proto.set_domain_network_address(network_address);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainRegistrationMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "Worker (id '" << worker_client_id_
+ << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+ << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+ CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ worker_client_id_,
+ locator_client_id_,
+ move(message)));
+
+ const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ EXPECT_EQ(locator_client_id_, annotated_message.sender);
+ EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+ LOG(INFO) << "Worker (id '" << worker_client_id_
+ << "') received BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') from BlockLocator";
+
+ quickstep::serialization::BlockDomainRegistrationResponseMessage response_proto;
+ CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ return static_cast<block_id_domain>(response_proto.block_domain());
+ }
+
+ MessageBusImpl bus_;
+
+ unique_ptr<BlockLocator> locator_;
+ tmb::client_id locator_client_id_;
+
+ tmb::client_id worker_client_id_;
+
+ DataExchangerAsync data_exchanger_expected_;
+};
+
+const char DataExchangeTest::kStoragePath[] = "./data_exchange_test_data/";
+const char DataExchangeTest::kCheckedDomainNetworkAddress[] = "0.0.0.0:0";
+
+TEST_F(DataExchangeTest, BlockPull) {
+ CatalogRelation relation(nullptr, "rel");
+ const attribute_id attr_id =
+ relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
+
+ const block_id block =
+ storage_manager_expected_->createBlock(relation, relation.getDefaultStorageBlockLayout());
+
+ {
+ MutableBlockReference block_expected = storage_manager_expected_->getBlockMutable(block, relation);
+
+ // Insert a tuple.
+ const int value_expected = -1;
+ {
+ vector<TypedValue> attrs(1, TypedValue(value_expected));
+ const Tuple tuple(move(attrs));
+
+ EXPECT_TRUE(block_expected->insertTuple(tuple));
+ }
+
+ const BlockReference block_checked = storage_manager_checked_->getBlock(block, relation);
+ EXPECT_FALSE(block_checked->isBlob());
+
+ const TupleStorageSubBlock &tuple_store_checked = block_checked->getTupleStorageSubBlock();
+
+ EXPECT_EQ(1, tuple_store_checked.numTuples());
+ EXPECT_EQ(value_expected, tuple_store_checked.getAttributeValueTyped(0 /* tuple_id */, attr_id).getLiteral<int>());
+ }
+
+ storage_manager_checked_->deleteBlockOrBlobFile(block);
+ storage_manager_expected_->deleteBlockOrBlobFile(block);
+}
+
+TEST_F(DataExchangeTest, BlobPull) {
+ const block_id blob = storage_manager_expected_->createBlob(kDefaultBlockSizeInSlots);
+ {
+ const BlobReference blob_expected = storage_manager_expected_->getBlob(blob);
+ const BlobReference blob_checked = storage_manager_checked_->getBlob(blob);
+ EXPECT_TRUE(blob_checked->isBlob());
+ EXPECT_EQ(blob, blob_checked->getID());
+ }
+
+ storage_manager_checked_->deleteBlockOrBlobFile(blob);
+ storage_manager_expected_->deleteBlockOrBlobFile(blob);
+}
+
+} // namespace quickstep
+
+int main(int argc, char **argv) {
+ google::InitGoogleLogging(argv[0]);
+ // Honor FLAGS_buffer_pool_slots in StorageManager.
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ ::testing::InitGoogleTest(&argc, argv);
+
+ return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/third_party/iwyu/iwyu_helper.py
----------------------------------------------------------------------
diff --git a/third_party/iwyu/iwyu_helper.py b/third_party/iwyu/iwyu_helper.py
index a204c50..1d3a76f 100755
--- a/third_party/iwyu/iwyu_helper.py
+++ b/third_party/iwyu/iwyu_helper.py
@@ -21,6 +21,7 @@ QUICKSTEP_INCLUDES = [ '.',
'./build/third_party',
'./build/third_party/protobuf/include',
'./build/third_party/gflags/include',
+ './build/third_party/tmb/include',
'./third_party/benchmark/include',
'./third_party/glog/src',
'./third_party/googletest/googletest/include',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index c7b5883..7dd6fc5 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -17,7 +17,7 @@ TODO List / Known Issues & Limitations:
"""
# Copyright 2011-2015 Quickstep Technologies LLC.
-# Copyright 2015 Pivotal Software, Inc.
+# Copyright 2015-2016 Pivotal Software, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
# Explicitly ignored dependencies (special headers with no other quickstep
# dependencies).
IGNORED_DEPENDENCIES = frozenset(
- ["quickstep_threading_WinThreadsAPI",
+ ["quickstep_storage_DataExchange.grpc_proto",
+ "quickstep_threading_WinThreadsAPI",
"quickstep_utility_textbasedtest_TextBasedTest",
"quickstep_utility_textbasedtest_TextBasedTestDriver",
"quickstep_storage_bitweaving_BitWeavingHIndexSubBlock",