You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/30 23:19:23 UTC
[48/50] [abbrv] incubator-quickstep git commit: Added Async
DataExchange Service.
Added Async DataExchange Service.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/50b4e55a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/50b4e55a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/50b4e55a
Branch: refs/heads/work-order-serialization
Commit: 50b4e55a48c97fd5ce9800d8db92d64ee59ad2e3
Parents: 2221b7e
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Sat May 28 22:55:05 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Mon May 30 15:47:54 2016 -0700
----------------------------------------------------------------------
CMakeLists.txt | 11 +-
storage/CMakeLists.txt | 69 +++++++-
storage/DataExchange.proto | 31 ++++
storage/DataExchangerAsync.cpp | 165 ++++++++++++++++++
storage/DataExchangerAsync.hpp | 97 +++++++++++
storage/StorageManager.cpp | 113 ++++++++++++-
storage/StorageManager.hpp | 61 ++++++-
storage/tests/DataExchange_unittest.cpp | 240 +++++++++++++++++++++++++++
third_party/iwyu/iwyu_helper.py | 3 +-
validate_cmakelists.py | 5 +-
10 files changed, 787 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dc51ca6..ef7fd50 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -662,7 +662,20 @@ set(ENABLE_PUREMEMORY ON CACHE BOOL "Enable PureMemory TMB")
set(ENABLE_LEVELDB OFF CACHE BOOL "Enable LevelDB TMB")
set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
-set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+
+# The distributed version requires the NativeNet TMB implementation. For
+# distributed builds, default it to ON explicitly and reject a cached OFF value
+# (e.g. left over from an earlier non-distributed configure) with a clear error
+# instead of letting the build proceed without it.
+if (ENABLE_DISTRIBUTED)
+  set(ENABLE_NATIVENET ON CACHE BOOL "Enable NativeNet TMB")
+  if (NOT ENABLE_NATIVENET)
+    message(FATAL_ERROR "ENABLE_DISTRIBUTED requires ENABLE_NATIVENET to be ON.")
+  endif()
+else()
+  set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+endif()
+
-set(ENABLE ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
+set(ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
set(ENABLE_VOLTDB OFF CACHE BOOL "Enable VoltDB TMB")
set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
@@ -670,6 +683,11 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
include_directories(${TMB_INCLUDE_DIRS})
+if (ENABLE_DISTRIBUTED)
+  # Distributed builds also need the headers TMB places under its build tree.
+  include_directories("${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include")
+endif()
+
# Add all of the module subdirectories. CMakeLists.txt in each of the subdirectories
# defines how to build that module's libraries.
add_subdirectory(catalog)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 4da16ea..a77976a 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -126,6 +126,13 @@ QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
storage_StorageBlockLayout_proto_hdrs
StorageBlockLayout.proto)
+# Generate the DataExchange protobuf messages and gRPC service stubs
+# (the `.` argument is presumably the proto search/output directory; confirm
+# against the GRPC_GENERATE_CPP macro definition).
+if (ENABLE_DISTRIBUTED)
+  GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs storage_DataExchange_proto_hdrs
+                    . DataExchange.proto)
+endif()
+
# Declare micro-libs:
add_library(quickstep_storage_AggregationOperationState
AggregationOperationState.cpp
@@ -171,6 +178,14 @@ add_library(quickstep_storage_CompressedTupleStorageSubBlock
CompressedTupleStorageSubBlock.hpp)
add_library(quickstep_storage_CountedReference ../empty_src.cpp CountedReference.hpp)
add_library(quickstep_storage_CSBTreeIndexSubBlock CSBTreeIndexSubBlock.cpp CSBTreeIndexSubBlock.hpp)
+
+# Distributed-only libraries: the generated DataExchange protobuf/gRPC code and
+# the DataExchangerAsync server thread built on top of it.
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_storage_DataExchange_proto
+              ${storage_DataExchange_proto_srcs}
+              ${storage_DataExchange_proto_hdrs})
+  add_library(quickstep_storage_DataExchangerAsync
+              DataExchangerAsync.cpp
+              DataExchangerAsync.hpp)
+endif()
+
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
@@ -575,6 +590,19 @@ target_link_libraries(quickstep_storage_CSBTreeIndexSubBlock
quickstep_utility_Macros
quickstep_utility_PtrVector
quickstep_utility_ScopedBuffer)
+
+if (ENABLE_DISTRIBUTED)
+  # storage_DataExchange_proto_srcs presumably include the generated gRPC
+  # service code (DataExchange.grpc.pb.*), which calls into gRPC++. Link gRPC++
+  # on the proto library itself instead of relying on every consumer to list it.
+  # gRPC++ is listed before protobuf so static link order resolves correctly.
+  target_link_libraries(quickstep_storage_DataExchange_proto
+                        ${GRPCPLUSPLUS_LIBRARIES}
+                        ${PROTOBUF3_LIBRARY})
+  target_link_libraries(quickstep_storage_DataExchangerAsync
+                        glog
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_utility_Macros
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif()
+
target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageConstants
@@ -925,6 +953,7 @@ target_link_libraries(quickstep_storage_StorageManager
gflags_nothreads-static
glog
gtest
+ quickstep_catalog_CatalogTypedefs
quickstep_storage_CountedReference
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
@@ -955,7 +984,9 @@ if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_storage_StorageManager
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
- quickstep_queryexecution_QueryExecutionUtil)
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_DataExchange_proto
+ ${GRPCPLUSPLUS_LIBRARIES})
endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_storage_SubBlockTypeRegistry
glog
@@ -1071,6 +1102,11 @@ elseif (QUICKSTEP_HAVE_FILE_MANAGER_WINDOWS)
target_link_libraries(quickstep_storage
quickstep_storage_FileManagerWindows)
endif()
+# Distributed builds add the DataExchange targets to quickstep_storage.
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_DataExchangerAsync)
+endif()
# CMAKE_VALIDATE_IGNORE_BEGIN
if(QUICKSTEP_HAVE_BITWEAVING)
target_link_libraries(quickstep_storage
@@ -1340,6 +1376,37 @@ target_link_libraries(CompressedPackedRowStoreTupleStorageSubBlock_unittest
${LIBS})
add_test(CompressedPackedRowStoreTupleStorageSubBlock_unittest CompressedPackedRowStoreTupleStorageSubBlock_unittest)
+# The DataExchange test uses DataExchangerAsync, which is only built in
+# distributed builds.
+if (ENABLE_DISTRIBUTED)
+  add_executable(DataExchange_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataExchange_unittest.cpp")
+  target_link_libraries(DataExchange_unittest
+                        gflags_nothreads-static
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogAttribute
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_CountedReference
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlob
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypeFactory
+                        quickstep_types_TypeID
+                        quickstep_types_TypedValue
+                        quickstep_types_containers_Tuple
+                        tmb
+                        ${LIBS})
+  add_test(DataExchange_unittest DataExchange_unittest)
+endif()
+
add_executable(CSBTreeIndexSubBlock_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/CSBTreeIndexSubBlock_unittest.cpp")
target_link_libraries(CSBTreeIndexSubBlock_unittest
gtest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/DataExchange.proto
----------------------------------------------------------------------
diff --git a/storage/DataExchange.proto b/storage/DataExchange.proto
new file mode 100644
index 0000000..a2636e5
--- /dev/null
+++ b/storage/DataExchange.proto
@@ -0,0 +1,31 @@
+// Copyright 2016 Pivotal Software, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package quickstep;
+
+// Service run by DataExchangerAsync so that peer StorageManagers can fetch
+// blocks and blobs from one another.
+service DataExchange {
+  // Fetch a single block or blob by id.
+  rpc Pull(PullRequest) returns (PullResponse);
+}
+
+// Identifies the block or blob to pull.
+message PullRequest {
+  fixed64 block_id = 1;
+}
+
+// Result of a Pull.
+message PullResponse {
+  // False when the serving StorageManager did not find the block.
+  bool is_valid = 1;
+  // Size of the block in slots; only set when is_valid is true.
+  uint64 num_slots = 2;
+  // Raw block bytes (num_slots * kSlotSizeBytes); only set when is_valid is true.
+  bytes block = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
new file mode 100644
index 0000000..78c6565
--- /dev/null
+++ b/storage/DataExchangerAsync.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "storage/DataExchangerAsync.hpp"
+
+#include <grpc++/grpc++.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#include "storage/StorageManager.hpp"
+
+#include "glog/logging.h"
+
+using grpc::ServerCompletionQueue;
+
+namespace quickstep {
+namespace {
+
+/**
+ * @brief Per-call state for one asynchronous Pull RPC.
+ *
+ * The instance's own address is the completion-queue tag for its events. It
+ * advances CREATE -> PROCESS -> FINISH through Proceed() and deletes itself
+ * once the response has been sent.
+ **/
+class CallContext {
+ public:
+  /**
+   * @brief Constructor. Immediately registers for the next Pull request.
+   *
+   * @param service The async DataExchange service.
+   * @param queue The server completion queue delivering RPC events.
+   * @param storage_manager The StorageManager whose blocks are served; must
+   *        not be null.
+   **/
+  CallContext(DataExchange::AsyncService *service,
+              ServerCompletionQueue *queue,
+              StorageManager *storage_manager)
+      : service_(service),
+        queue_(queue),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        responder_(&context_),
+        status_(CallStatus::CREATE) {
+    Proceed();
+  }
+
+  /**
+   * @brief Advance this call's state machine by one step.
+   **/
+  void Proceed();
+
+ private:
+  // Lifecycle of a single RPC.
+  enum class CallStatus {
+    CREATE = 0,  // Not yet registered for an incoming request.
+    PROCESS,     // A request has arrived and must be answered.
+    FINISH,      // The response was handed to gRPC; the instance can go.
+  };
+
+  DataExchange::AsyncService *service_;
+  ServerCompletionQueue *queue_;
+
+  StorageManager *storage_manager_;
+
+  grpc::ServerContext context_;
+
+  PullRequest request_;
+  PullResponse response_;
+
+  grpc::ServerAsyncResponseWriter<PullResponse> responder_;
+
+  CallStatus status_;
+};
+
+void CallContext::Proceed() {
+  if (status_ == CallStatus::CREATE) {
+    // First step: move on to PROCESS and ask the service to route the next
+    // Pull request to this instance. "this" is the tag identifying the
+    // request, so several CallContexts can serve requests concurrently.
+    status_ = CallStatus::PROCESS;
+    service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this);
+  } else if (status_ == CallStatus::PROCESS) {
+    // A request arrived. Register a fresh instance for the next client before
+    // serving this one; that instance frees itself in its FINISH step.
+    new CallContext(service_, queue_, storage_manager_);
+
+    storage_manager_->pullBlockOrBlob(request_.block_id(), &response_);
+
+    // Mark the call finished before handing the reply to gRPC, which reports
+    // completion using this instance's address as the tag.
+    status_ = CallStatus::FINISH;
+    responder_.Finish(response_, grpc::Status::OK, this);
+  } else if (status_ == CallStatus::FINISH) {
+    // The reply has been delivered; release this instance.
+    delete this;
+  } else {
+    LOG(FATAL) << "Unknown call status.";
+  }
+}
+
+} // namespace
+
+const char *DataExchangerAsync::kLocalNetworkAddress = "0.0.0.0:";
+
+DataExchangerAsync::DataExchangerAsync() {
+  grpc::ServerBuilder builder;
+  // The address carries an empty port, expecting gRPC to bind an ephemeral
+  // one (NOTE(review): confirm). BuildAndStart() stores the bound port in
+  // port_, or 0 if binding failed.
+  builder.AddListeningPort(kLocalNetworkAddress, grpc::InsecureServerCredentials(), &port_);
+  builder.RegisterService(&service_);
+
+  queue_ = builder.AddCompletionQueue();
+  server_ = builder.BuildAndStart();
+
+  // Check in every build mode: a null server_ would crash shutdown(), and a
+  // non-positive port would publish an unusable address to peers.
+  CHECK(server_ != nullptr) << "DataExchangerAsync failed to start its gRPC server";
+  CHECK_GT(port_, 0) << "DataExchangerAsync failed to bind a listening port";
+
+  // NOTE(review): peers are given "0.0.0.0:<port>", which is presumably only
+  // reachable from the same host.
+  server_address_ = kLocalNetworkAddress + std::to_string(port_);
+  LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_;
+}
+
+void DataExchangerAsync::run() {
+  // Seed the request state machine; every CallContext deletes itself when done.
+  new CallContext(&service_, queue_.get(), storage_manager_);
+
+  void *tag = nullptr;  // The CallContext an event belongs to.
+  bool ok = false;
+
+  // Next() returns false only once the queue is shut down and fully drained.
+  while (queue_->Next(&tag, &ok)) {
+    CallContext *call = static_cast<CallContext*>(tag);
+    if (!ok) {
+      LOG(WARNING) << "Not ok\n";
+      delete call;
+      continue;
+    }
+    call->Proceed();
+  }
+
+  LOG(INFO) << "Shutdown\n";
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/DataExchangerAsync.hpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.hpp b/storage/DataExchangerAsync.hpp
new file mode 100644
index 0000000..75a4e4d
--- /dev/null
+++ b/storage/DataExchangerAsync.hpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+#define QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+
+#include <grpc++/grpc++.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/**
+ * @brief A thread running the DataExchange gRPC service, through which remote
+ *        peers pull blocks and blobs from the local StorageManager.
+ **/
+class DataExchangerAsync final : public Thread {
+ public:
+  /**
+   * @brief Constructor. Builds and starts the gRPC server; requests are
+   *        handled by run() once the thread is started.
+   **/
+  DataExchangerAsync();
+
+  ~DataExchangerAsync() override {}
+
+  /**
+   * @brief Get the address peers should connect to.
+   *
+   * @return The server address in "IP:port" form.
+   **/
+  std::string network_address() const {
+    DCHECK(!server_address_.empty());
+    return server_address_;
+  }
+
+  /**
+   * @brief Set the StorageManager whose blocks are served. Call this before
+   *        starting the thread, since run() hands it to each request handler.
+   *
+   * @param storage_manager The local StorageManager.
+   **/
+  void set_storage_manager(StorageManager *storage_manager) {
+    storage_manager_ = storage_manager;
+  }
+
+  /**
+   * @brief Stop serving: shut down the server first and the completion queue
+   *        second, which lets run() drain pending events and return.
+   **/
+  void shutdown() {
+    server_->Shutdown();
+    queue_->Shutdown();
+  }
+
+ protected:
+  void run() override;
+
+ private:
+  static const char *kLocalNetworkAddress;
+
+  DataExchange::AsyncService service_;
+
+  int port_ = -1;
+  // "IP:port", e.g. "0.0.0.0:12345".
+  std::string server_address_;
+
+  std::unique_ptr<grpc::ServerCompletionQueue> queue_;
+  std::unique_ptr<grpc::Server> server_;
+
+  StorageManager *storage_manager_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(DataExchangerAsync);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 15e2503..8fc1224 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -41,6 +41,10 @@
#include <numaif.h>
#endif
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <grpc++/grpc++.h>
+#endif
+
#include <atomic>
#include <cerrno>
#include <cstddef>
@@ -53,6 +57,8 @@
#include <unordered_map>
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
+
#ifdef QUICKSTEP_DISTRIBUTED
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
@@ -60,6 +66,12 @@
#endif
#include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#endif
+
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManagerLocal.hpp"
#include "storage/StorageBlob.hpp"
@@ -456,6 +468,80 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
}
#ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::pullBlockOrBlob(const block_id block,
+                                     PullResponse *response) const {
+  // Hold blocks_shared_mutex_ in shared (read) mode while consulting blocks_.
+  SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+
+  const auto found = blocks_.find(block);
+  if (found == blocks_.end()) {
+    // Not held here; the requesting peer will try elsewhere.
+    response->set_is_valid(false);
+    return;
+  }
+
+  const BlockHandle &handle = found->second;
+  const std::size_t slot_count = handle.block_memory_size;  // Size in slots.
+
+  response->set_is_valid(true);
+  response->set_num_slots(slot_count);
+  response->set_block(handle.block_memory, slot_count * kSlotSizeBytes);
+}
+
+// Create a DataExchange stub on the given channel; pulled blocks are allocated
+// through storage_manager.
+StorageManager::DataExchangerClientAsync::DataExchangerClientAsync(
+    const std::shared_ptr<grpc::Channel> &channel,
+    StorageManager *storage_manager)
+    : stub_(DataExchange::NewStub(channel)),
+      storage_manager_(storage_manager) {}
+
+bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
+                                                    const numa_node_id numa_node,
+                                                    BlockHandle *block_handle) {
+  // Issue one asynchronous Pull RPC and block on a private completion queue
+  // until it completes.
+  // NOTE(review): no deadline is set on the context, so an unresponsive peer
+  // blocks this call indefinitely; consider context.set_deadline(...).
+  grpc::ClientContext context;
+
+  PullRequest request;
+  request.set_block_id(block);
+
+  grpc::CompletionQueue queue;
+
+  std::unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+      stub_->AsyncPull(&context, request, &queue));
+
+  PullResponse response;
+  grpc::Status status;
+
+  // Only one call is outstanding on this queue, so any fixed non-null tag works.
+  void *const kPullTag = reinterpret_cast<void*>(1);
+  rpc->Finish(&response, &status, kPullTag);
+
+  void *got_tag = nullptr;
+  bool ok = false;
+
+  // Next() returns false only if the queue was shut down, which never happens
+  // here before the call completes.
+  CHECK(queue.Next(&got_tag, &ok));
+  CHECK(got_tag == kPullTag);
+  CHECK(ok);
+
+  if (!status.ok()) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
+    return false;
+  }
+
+  if (!response.is_valid()) {
+    LOG(INFO) << "The pulling block not found in all the peers";
+    return false;
+  }
+
+  // The reply comes off the network, so validate it in every build mode before
+  // allocating and copying; a mismatch would otherwise overflow block_buffer.
+  // Dividing (instead of multiplying num_slots) avoids overflow on bogus sizes.
+  const size_t num_slots = response.num_slots();
+  const std::string &block_content = response.block();
+  if (num_slots == 0 ||
+      block_content.size() % kSlotSizeBytes != 0 ||
+      block_content.size() / kSlotSizeBytes != num_slots) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: malformed response for Block "
+               << BlockIdUtil::ToString(block);
+    return false;
+  }
+
+  void *block_buffer = storage_manager_->allocateSlots(num_slots, numa_node);
+  std::memcpy(block_buffer, block_content.data(), block_content.size());
+
+  block_handle->block_memory = block_buffer;
+  block_handle->block_memory_size = num_slots;
+
+  return true;
+}
+
vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
serialization::BlockMessage proto;
proto.set_block_id(block);
@@ -541,14 +627,37 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// The caller of this function holds an exclusive lock on this block/blob's
// mutex in the lock manager. The caller has ensured that the block is not
// already loaded before this function gets called.
- size_t num_slots = file_manager_->numSlots(block);
+ BlockHandle loaded_handle;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ // TODO(quickstep-team): Use a cost model to determine whether to load from
+ // a remote peer or the disk.
+ if (BlockIdUtil::Domain(block) != block_domain_) {
+ LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+ for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+ DataExchangerClientAsync client(
+ grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+ this);
+
+ if (client.Pull(block, numa_node, &loaded_handle)) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ return loaded_handle;
+ }
+ }
+
+ LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+ << " from remote peers, so try to load from disk.";
+ }
+#endif
+
+ const size_t num_slots = file_manager_->numSlots(block);
DEBUG_ASSERT(num_slots != 0);
void *block_buffer = allocateSlots(num_slots, numa_node);
const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
CHECK(status) << "Failed to read block from persistent storage: " << block;
- BlockHandle loaded_handle;
loaded_handle.block_memory = block_buffer;
loaded_handle.block_memory_size = num_slots;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 55a011e..50ddb0f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -26,9 +26,14 @@
#include <unordered_map>
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
-
#include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#endif
+
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManager.hpp"
#include "storage/StorageBlob.hpp"
@@ -45,6 +50,10 @@
#include "tmb/id_typedefs.h"
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace grpc { class Channel; }
+#endif
+
namespace tmb { class MessageBus; }
namespace quickstep {
@@ -58,6 +67,10 @@ DECLARE_bool(use_hdfs);
class CatalogRelationSchema;
+#ifdef QUICKSTEP_DISTRIBUTED
+class PullResponse;
+#endif
+
class StorageBlockLayout;
/** \addtogroup Storage
@@ -365,6 +378,16 @@ class StorageManager {
**/
bool blockOrBlobIsLoadedAndDirty(const block_id block);
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Pull a block or a blob. Used by DataExchangerAsync.
+ *
+ * Looks the block up among the blocks currently held by this StorageManager
+ * (under a shared lock on the block map). If found, sets is_valid to true and
+ * copies its size in slots and its raw content into the response; otherwise
+ * sets is_valid to false.
+ *
+ * @param block The id of the block or blob.
+ * @param response Where to store the pulled block content.
+ **/
+ void pullBlockOrBlob(const block_id block, PullResponse *response) const;
+#endif
+
private:
struct BlockHandle {
void *block_memory;
@@ -374,6 +397,42 @@ class StorageManager {
#ifdef QUICKSTEP_DISTRIBUTED
/**
+ * @brief A class which connects to DataExchangerAsync to exchange data from
+ * remote peers.
+ **/
+ class DataExchangerClientAsync {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param channel The RPC channel to connect DataExchangerAsync.
+ * @param storage_manager The StorageManager to use.
+ */
+ DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+ StorageManager *storage_manager);
+
+ /**
+ * @brief Pull a block or blob from a remote StorageManager.
+ *
+ * @param block The block or blob to pull.
+ * @param numa_node The NUMA node for placing this block.
+ * @param block_handle Where the pulled block or blob stores.
+ *
+ * @return Whether the pull operation is successful or not.
+ */
+ bool Pull(const block_id block,
+ const numa_node_id numa_node,
+ BlockHandle *block_handle);
+
+ private:
+ std::unique_ptr<DataExchange::Stub> stub_;
+
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
+ };
+
+ /**
* @brief Get the network info of all the remote StorageManagers which may
* load the given block in the buffer pool.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
new file mode 100644
index 0000000..38d12f6
--- /dev/null
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -0,0 +1,240 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/CountedReference.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class DataExchangeTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+  static const char kCheckedDomainNetworkAddress[];
+
+  // Join the exchanger and locator threads, which TearDown() told to stop.
+  ~DataExchangeTest() {
+    data_exchanger_expected_.join();
+    locator_->join();
+  }
+
+  // Start a BlockLocator, then create the "expected" StorageManager (served to
+  // peers by data_exchanger_expected_) and the "checked" StorageManager, each
+  // with a block domain obtained by registering an address with the locator.
+  virtual void SetUp() {
+    bus_.Initialize();
+
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+
+    worker_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+    const block_id_domain expected_domain =
+        getBlockDomain(data_exchanger_expected_.network_address());
+    storage_manager_expected_.reset(
+        new StorageManager(kStoragePath, expected_domain, locator_client_id_, &bus_));
+
+    data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
+    data_exchanger_expected_.start();
+
+    const block_id_domain checked_domain = getBlockDomain(kCheckedDomainNetworkAddress);
+    storage_manager_checked_.reset(
+        new StorageManager(kStoragePath, checked_domain, locator_client_id_, &bus_));
+  }
+
+  // Release both StorageManagers, stop the exchanger's server, and send the
+  // BlockLocator a PoisonMessage so its thread can be joined.
+  virtual void TearDown() {
+    storage_manager_checked_.reset();
+
+    data_exchanger_expected_.shutdown();
+    storage_manager_expected_.reset();
+
+    TaggedMessage message =
+        SerializeToTaggedMessage(serialization::EmptyMessage(), kPoisonMessage);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+  }
+
+  unique_ptr<StorageManager> storage_manager_expected_;
+  unique_ptr<StorageManager> storage_manager_checked_;
+
+ private:
+  // Serialize `proto` into a freshly built TaggedMessage of `message_type`.
+  template <typename ProtoMessage>
+  static TaggedMessage SerializeToTaggedMessage(const ProtoMessage &proto,
+                                                const tmb::message_type_id message_type) {
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, message_type);
+    free(proto_bytes);
+    return message;
+  }
+
+  // Register `network_address` with the BlockLocator and return the block
+  // domain it sends back.
+  block_id_domain getBlockDomain(const string &network_address) {
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    TaggedMessage message = SerializeToTaggedMessage(proto, kBlockDomainRegistrationMessage);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+
+    const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage (typed '"
+              << kBlockDomainRegistrationResponseMessage
+              << "') from BlockLocator";
+
+    serialization::BlockDomainMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  MessageBusImpl bus_;
+
+  unique_ptr<BlockLocator> locator_;
+  tmb::client_id locator_client_id_;
+
+  tmb::client_id worker_client_id_;
+
+  DataExchangerAsync data_exchanger_expected_;
+};
+
+const char DataExchangeTest::kStoragePath[] = "./data_exchange_test_data/";
+const char DataExchangeTest::kCheckedDomainNetworkAddress[] = "ip:port";
+
+// A block created and filled through the "expected" StorageManager must be
+// readable, with the same contents, through the "checked" StorageManager.
+TEST_F(DataExchangeTest, BlockPull) {
+  CatalogRelation rel(nullptr, "rel");
+  const attribute_id int_attr =
+      rel.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
+
+  const block_id bid =
+      storage_manager_expected_->createBlock(rel, rel.getDefaultStorageBlockLayout());
+
+  {
+    MutableBlockReference source = storage_manager_expected_->getBlockMutable(bid, rel);
+
+    // Insert one single-attribute tuple into the source block.
+    const int kValue = -1;
+    {
+      vector<TypedValue> values(1, TypedValue(kValue));
+      const Tuple row(move(values));
+      EXPECT_TRUE(source->insertTuple(row));
+    }
+
+    // The checked StorageManager belongs to another block domain, so this read
+    // is expected to pull the block from the expected side.
+    const BlockReference pulled = storage_manager_checked_->getBlock(bid, rel);
+    EXPECT_FALSE(pulled->isBlob());
+
+    const TupleStorageSubBlock &pulled_store = pulled->getTupleStorageSubBlock();
+    EXPECT_EQ(1, pulled_store.numTuples());
+    EXPECT_EQ(kValue, pulled_store.getAttributeValueTyped(0 /* tuple_id */, int_attr).getLiteral<int>());
+  }
+
+  // References are released above; now remove the files on both sides.
+  storage_manager_checked_->deleteBlockOrBlobFile(bid);
+  storage_manager_expected_->deleteBlockOrBlobFile(bid);
+}
+
+// A blob created through the "expected" StorageManager must be obtainable,
+// with the same id, through the "checked" StorageManager.
+TEST_F(DataExchangeTest, BlobPull) {
+  const block_id blob_id = storage_manager_expected_->createBlob(kDefaultBlockSizeInSlots);
+
+  {
+    // Hold a reference on the source blob (presumably keeping it loaded)
+    // while the checked side fetches it.
+    const BlobReference source = storage_manager_expected_->getBlob(blob_id);
+    const BlobReference pulled = storage_manager_checked_->getBlob(blob_id);
+    EXPECT_TRUE(pulled->isBlob());
+    EXPECT_EQ(blob_id, pulled->getID());
+  }
+
+  storage_manager_checked_->deleteBlockOrBlobFile(blob_id);
+  storage_manager_expected_->deleteBlockOrBlobFile(blob_id);
+}
+
+} // namespace quickstep
+
+// Test entry point: set up logging and flags, then run every registered test.
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Parse flags so StorageManager honors FLAGS_buffer_pool_slots.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/third_party/iwyu/iwyu_helper.py
----------------------------------------------------------------------
diff --git a/third_party/iwyu/iwyu_helper.py b/third_party/iwyu/iwyu_helper.py
index a204c50..13697be 100755
--- a/third_party/iwyu/iwyu_helper.py
+++ b/third_party/iwyu/iwyu_helper.py
@@ -19,8 +19,9 @@ import sys
QUICKSTEP_INCLUDES = [ '.',
'./build',
'./build/third_party',
- './build/third_party/protobuf/include',
'./build/third_party/gflags/include',
+ './build/third_party/protobuf/include',
+ './build/third_party/tmb/include',
'./third_party/benchmark/include',
'./third_party/glog/src',
'./third_party/googletest/googletest/include',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/50b4e55a/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index c7b5883..7dd6fc5 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -17,7 +17,7 @@ TODO List / Known Issues & Limitations:
"""
# Copyright 2011-2015 Quickstep Technologies LLC.
-# Copyright 2015 Pivotal Software, Inc.
+# Copyright 2015-2016 Pivotal Software, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
# Explicitly ignored dependencies (special headers with no other quickstep
# dependencies).
IGNORED_DEPENDENCIES = frozenset(
- ["quickstep_threading_WinThreadsAPI",
+ ["quickstep_storage_DataExchange.grpc_proto",
+ "quickstep_threading_WinThreadsAPI",
"quickstep_utility_textbasedtest_TextBasedTest",
"quickstep_utility_textbasedtest_TextBasedTestDriver",
"quickstep_storage_bitweaving_BitWeavingHIndexSubBlock",