You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/06/04 22:28:16 UTC
incubator-quickstep git commit: Data Provider thread added.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 835281ae8 -> dfefe6293
Data Provider thread added.
- Thread used for receiving block requests from clients and sending the
contents of a block back to the client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dfefe629
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dfefe629
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dfefe629
Branch: refs/heads/master
Commit: dfefe62934c5c890650606782b50714cb2eb0366
Parents: 835281a
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Apr 6 16:00:54 2018 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Jun 4 16:45:31 2018 -0500
----------------------------------------------------------------------
CMakeLists.txt | 6 +-
query_execution/QueryExecutionTypedefs.hpp | 4 +
storage/BlockWire.proto | 34 +++++
storage/CMakeLists.txt | 69 ++++++++-
storage/DataProviderThread.cpp | 92 +++++++++++
storage/DataProviderThread.hpp | 103 +++++++++++++
storage/StorageManager.cpp | 23 +++
storage/StorageManager.hpp | 17 ++-
storage/tests/DataProviderThread_unittest.cpp | 169 +++++++++++++++++++++
9 files changed, 512 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3f1c6cd..434019e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -702,7 +702,9 @@ set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
# The distributed version requires to use the NativeNet implementation.
-if (NOT ENABLE_DISTRIBUTED)
+if (ENABLE_DISTRIBUTED OR ENABLE_NETWORK_CLI)
+ set(ENABLE_NATIVENET ON CACHE BOOL "Enable NativeNet TMB")
+else()
set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
endif()
@@ -713,7 +715,7 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
include_directories(${TMB_INCLUDE_DIRS})
-if (ENABLE_DISTRIBUTED)
+if (ENABLE_DISTRIBUTED OR ENABLE_NETWORK_CLI)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
endif()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 80da7c5..ea4f8d2 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,10 @@ enum QueryExecutionMessageType : message_type_id {
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
kWorkloadCompletionMessage, // From Foreman to main thread.
+ kBlockRequestMessage, // To DataProviderThread.
+ kBlockResponseMessage, // From DataProviderThread.
+ kFinishReadingRelationMessage, // To DataProviderThread.
+
kPoisonMessage, // From the main thread to Foreman and Workers.
#ifdef QUICKSTEP_DISTRIBUTED
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/BlockWire.proto
----------------------------------------------------------------------
diff --git a/storage/BlockWire.proto b/storage/BlockWire.proto
new file mode 100644
index 0000000..c1a1763
--- /dev/null
+++ b/storage/BlockWire.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep;
+
+message BlockResponse {  // Reply payload for a block request; sent with message type kBlockResponseMessage.
+ required bool is_valid = 1;  // True when the requested block was found by StorageManager::sendBlockContents().
+ required bytes block = 2;  // Raw bytes of the block's memory; only populated when is_valid is true.
+}
+
+message BlockRequest {  // Request payload sent to DataProviderThread with message type kBlockRequestMessage.
+ required uint64 block_id = 1;  // ID of the block whose contents are requested.
+ required int32 relation_id = 2;  // ID of the relation the block belongs to; the request handler does not currently read it.
+}
+
+message FinishReadingRelationMessage {  // Sent to DataProviderThread (kFinishReadingRelationMessage); its handler drops a relation and saves the catalog.
+ required int32 relation_id = 1;  // ID of the relation the client has finished reading.
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 832e667..2bb9a7d 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -120,6 +120,11 @@ configure_file (
QS_PROTOBUF_GENERATE_CPP(storage_AggregationOperationState_proto_srcs
storage_AggregationOperationState_proto_hdrs
AggregationOperationState.proto)
+if (ENABLE_NETWORK_CLI)  # BlockWire.proto is only compiled to C++ for network-CLI builds.
+ QS_PROTOBUF_GENERATE_CPP(storage_BlockWire_proto_srcs
+ storage_BlockWire_proto_hdrs
+ BlockWire.proto)
+endif()
QS_PROTOBUF_GENERATE_CPP(storage_HashTable_proto_srcs
storage_HashTable_proto_hdrs
HashTable.proto)
@@ -256,7 +261,10 @@ if (ENABLE_DISTRIBUTED)
${storage_DataExchange_proto_hdrs})
add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
endif()
-
+if (ENABLE_NETWORK_CLI)  # Library targets that exist only in network-CLI builds.
+ add_library(quickstep_storage_BlockWire_proto ${storage_BlockWire_proto_srcs})  # Generated protobuf sources.
+ add_library(quickstep_storage_DataProviderThread DataProviderThread.cpp DataProviderThread.hpp)
+endif()
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
endif()
@@ -352,6 +360,10 @@ target_link_libraries(quickstep_storage_BasicColumnStoreValueAccessor
quickstep_utility_BitVector
quickstep_utility_Macros
quickstep_utility_PtrVector)
+if (ENABLE_NETWORK_CLI)  # The generated BlockWire code links against the protobuf runtime.
+ target_link_libraries(quickstep_storage_BlockWire_proto
+ ${PROTOBUF_LIBRARY})
+endif()
target_link_libraries(quickstep_storage_BloomFilterIndexSubBlock
glog
quickstep_catalog_CatalogAttribute
@@ -1089,6 +1101,29 @@ if (ENABLE_DISTRIBUTED)
quickstep_storage_DataExchange_proto)
endif()
+if (ENABLE_NETWORK_CLI)  # Link dependencies of DataProviderThread, plus the extra StorageManager dependencies for sendBlockContents().
+ target_link_libraries(quickstep_storage_DataProviderThread
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_DropRelation
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryoptimizer_QueryProcessor
+ quickstep_storage_BlockWire_proto
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_threading_Thread
+ quickstep_threading_ThreadUtil
+ quickstep_utility_Macros
+ tmb
+ ${GRPCPLUSPLUS_LIBRARIES})
+ target_link_libraries(quickstep_storage_StorageManager
+ ${GRPCPLUSPLUS_LIBRARIES}
+ quickstep_storage_BlockWire_proto)  # StorageManager.cpp includes BlockWire.pb.h under QUICKSTEP_ENABLE_NETWORK_CLI.
+endif()
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage_FileManagerHdfs
${LIBHDFS3_LIBRARIES}
@@ -1215,6 +1250,12 @@ if (ENABLE_DISTRIBUTED)
quickstep_storage_DataExchangerAsync)
endif()
+if (ENABLE_NETWORK_CLI)  # Add the network-CLI-only libraries to the quickstep_storage target.
+ target_link_libraries(quickstep_storage
+ quickstep_storage_BlockWire_proto
+ quickstep_storage_DataProviderThread)
+endif()
+
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage
quickstep_storage_FileManagerHdfs)
@@ -1522,6 +1563,32 @@ if (ENABLE_DISTRIBUTED)
add_test(DataExchange_unittest DataExchange_unittest)
endif(ENABLE_DISTRIBUTED)
+if (ENABLE_NETWORK_CLI)  # Unit test for DataProviderThread; built and registered only for network-CLI builds.
+ add_executable(DataProviderThread_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataProviderThread_unittest.cpp")
+ target_link_libraries(DataProviderThread_unittest
+ ${GFLAGS_LIB_NAME}
+ ${LIBS}
+ glog
+ gtest
+ gtest_main
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_queryexecution_ForemanSingleNode
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryoptimizer_QueryProcessor
+ quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_storage_DataProviderThread
+ quickstep_storage_StorageConstants
+ quickstep_storage_BlockWire_proto
+ tmb)
+ add_test(DataProviderThread_unittest DataProviderThread_unittest)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/data_provider_thread_test_data)  # Directory named by kStoragePath in the test.
+endif()
+
add_executable(CSBTreeIndexSubBlock_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/CSBTreeIndexSubBlock_unittest.cpp")
target_link_libraries(CSBTreeIndexSubBlock_unittest
${LIBS}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/DataProviderThread.cpp
----------------------------------------------------------------------
diff --git a/storage/DataProviderThread.cpp b/storage/DataProviderThread.cpp
new file mode 100644
index 0000000..0afb596
--- /dev/null
+++ b/storage/DataProviderThread.cpp
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/DataProviderThread.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/DropRelation.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/BlockWire.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+/**
+ * Message loop of the data provider thread.
+ *
+ * Optionally binds the thread to cpu_id_, then receives messages from the bus
+ * until a kPoisonMessage arrives:
+ *  - kBlockRequestMessage: looks the requested block up through the storage
+ *    manager and, if it was found, sends its contents back to the sender as
+ *    a kBlockResponseMessage. No reply is sent when the block is not found.
+ *  - kFinishReadingRelationMessage: drops the relation named in the request
+ *    and saves the catalog.
+ * Any other message type is ignored.
+ **/
+void DataProviderThread::run() {
+  // A negative CPU ID means no affinity was requested.
+  if (cpu_id_ >= 0) {
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+  for (;;) {
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(dpt_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const message_type_id message_type = tagged_message.message_type();
+    switch (message_type) {
+      case kPoisonMessage: {
+        return;
+      }
+      case kBlockRequestMessage: {
+        BlockRequest request_proto;
+        CHECK(request_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        // Only the block ID from the request is needed for the lookup.
+        BlockResponse proto;
+        storage_manager_->sendBlockContents(request_proto.block_id(), &proto);
+        if (proto.is_valid()) {
+          // Serialize into a scoped buffer (instead of an unchecked
+          // malloc/free pair) and send the response back to the client.
+          const int proto_length = proto.ByteSize();
+          std::vector<char> proto_bytes(proto_length);
+          CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                                proto_length,
+                                kBlockResponseMessage);
+          QueryExecutionUtil::SendTMBMessage(
+              bus_, dpt_client_id_, annotated_message.sender, std::move(message));
+        }
+        break;
+      }
+      case kFinishReadingRelationMessage: {
+        FinishReadingRelationMessage request_proto;
+        // The payload must be parsed before relation_id() is read; without
+        // this the field would always hold its default value.
+        CHECK(request_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        const CatalogRelation *relation =
+            database_->getRelationById(request_proto.relation_id());
+        if (relation == nullptr) {
+          // Nothing to drop; do not dereference a missing relation.
+          LOG(WARNING) << "DataProviderThread was asked to drop unknown relation ID "
+                       << request_proto.relation_id();
+          break;
+        }
+        DropRelation::Drop(*relation, database_, storage_manager_);
+        // TODO(harshad) - Check if the call below is thread safe?
+        query_processor_->saveCatalog();
+        break;
+      }
+    }
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/DataProviderThread.hpp
----------------------------------------------------------------------
diff --git a/storage/DataProviderThread.hpp b/storage/DataProviderThread.hpp
new file mode 100644
index 0000000..5b22db9
--- /dev/null
+++ b/storage/DataProviderThread.hpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_
+#define QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_
+
+#include <cstddef>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageConfig.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+class CatalogDatabase;
+class CatalogRelation;
+class QueryProcessor;
+class StorageManager;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief A thread that provides access to query results (e.g. storage blocks),
+ * to Quickstep clients.
+ **/
+class DataProviderThread : public Thread {
+ public:
+  /**
+   * @brief Constructor. Connects to the bus and registers the message types
+   *        this thread sends and receives.
+   *
+   * @param database The CatalogDatabase.
+   * @param query_processor The QueryProcessor object.
+   * @param storage_manager The storage manager.
+   * @param bus The TMB.
+   * @param cpu_id The ID of the CPU to affinitize this thread to, or -1 to
+   *        indicate no affinity.
+   **/
+  DataProviderThread(CatalogDatabase *database,
+                     QueryProcessor *query_processor,
+                     StorageManager *storage_manager,
+                     tmb::MessageBus *bus,
+                     const int cpu_id = -1)
+      : database_(database),
+        query_processor_(query_processor),
+        storage_manager_(storage_manager),
+        bus_(bus),
+        cpu_id_(cpu_id),
+        dpt_client_id_(bus->Connect()) {
+    // The only message type this thread sends.
+    bus_->RegisterClientAsSender(dpt_client_id_, kBlockResponseMessage);
+
+    // Message types this thread receives.
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kPoisonMessage);
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kBlockRequestMessage);
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kFinishReadingRelationMessage);
+  }
+
+  ~DataProviderThread() override = default;
+
+  /**
+   * @brief Get the TMB client ID obtained when this thread connected to the bus.
+   *
+   * @return The TMB client ID of this thread.
+   **/
+  tmb::client_id getTMBClientID() const {
+    return dpt_client_id_;
+  }
+
+ protected:
+  /**
+   * @brief Body of the thread; defined in DataProviderThread.cpp.
+   **/
+  void run() override;
+
+ private:
+  // Collaborators passed to the constructor and stored as raw pointers.
+  CatalogDatabase *database_;
+  QueryProcessor *query_processor_;
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  // CPU to bind to, or a negative value for no affinity.
+  const int cpu_id_;
+
+  // Declared after bus_: its initializer calls Connect() on the bus.
+  tmb::client_id dpt_client_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(DataProviderThread);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index e2efebf..26fc2d5 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -75,6 +75,10 @@
#include "storage/DataExchange.pb.h"
#endif
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+#include "storage/BlockWire.pb.h"
+#endif
+
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManagerLocal.hpp"
#include "storage/Flags.hpp"
@@ -665,6 +669,25 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
}
#endif
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+// Fills 'response' for the given block ID: is_valid reports whether the block
+// was found in blocks_, and when it was, the block field receives a copy of
+// the block's memory.
+void StorageManager::sendBlockContents(const block_id block,
+                                       BlockResponse *response) const {
+  // Held for the duration of the lookup and the copy below.
+  SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+
+  const std::unordered_map<block_id, BlockHandle>::const_iterator handle_it = blocks_.find(block);
+  if (handle_it == blocks_.end()) {
+    response->set_is_valid(false);
+    return;
+  }
+
+  response->set_is_valid(true);
+
+  // block_memory_size counts slots, so scale it to a byte length.
+  const BlockHandle &handle = handle_it->second;
+  const std::size_t slot_count = handle.block_memory_size;
+  response->set_block(handle.block_memory,
+                      slot_count * kSlotSizeBytes);
+}
+#endif
+
StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
const block_id block, const int numa_node) {
// The caller of this function holds an exclusive lock on this block/blob's
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index eb40891..66dfdfe 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -62,12 +62,15 @@ namespace quickstep {
DECLARE_int32(block_domain);
DECLARE_uint64(buffer_pool_slots);
-class CatalogRelationSchema;
-
#ifdef QUICKSTEP_DISTRIBUTED
class PullResponse;
#endif
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+class BlockResponse;
+#endif
+
+class CatalogRelationSchema;
class StorageBlockLayout;
/** \addtogroup Storage
@@ -375,6 +378,16 @@ class StorageManager {
**/
bool blockOrBlobIsLoadedAndDirty(const block_id block);
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+ /**
+ * @brief Send the contents of the given block wrapped in a response message.
+ *
+ * @param block The ID of the requested block.
+ * @param response The message used to send the contents of the block.
+ **/
+ void sendBlockContents(const block_id block, BlockResponse *response) const;
+#endif
+
#ifdef QUICKSTEP_DISTRIBUTED
/**
* @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/tests/DataProviderThread_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataProviderThread_unittest.cpp b/storage/tests/DataProviderThread_unittest.cpp
new file mode 100644
index 0000000..7b91250
--- /dev/null
+++ b/storage/tests/DataProviderThread_unittest.cpp
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/BlockWire.pb.h"
+#include "storage/DataProviderThread.hpp"
+#include "storage/StorageConstants.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc++/security/server_credentials.h"
+#include "grpc++/server.h"
+#include "grpc++/server_builder.h"
+#include "grpc/grpc.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/internal/net_service_impl.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+/**
+ * Test fixture that starts a Worker, a ForemanSingleNode and a
+ * DataProviderThread on an in-memory bus, backed by a freshly created catalog
+ * and the test relation loaded by TestDatabaseLoader.
+ **/
+class DataProviderThreadTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+
+  /**
+   * Sets up the bus client used by the test body, writes a catalog holding a
+   * single "default" database, loads the test relation and starts all threads.
+   **/
+  virtual void SetUp() {
+    quickstep::ClientIDMap::Instance();
+
+    bus_.Initialize();
+
+    client_id_ = bus_.Connect();
+
+    // Each message type is registered once for this client.
+    bus_.RegisterClientAsSender(client_id_, kPoisonMessage);
+    bus_.RegisterClientAsSender(client_id_, kBlockRequestMessage);
+    bus_.RegisterClientAsSender(client_id_, kFinishReadingRelationMessage);
+    bus_.RegisterClientAsReceiver(client_id_, kBlockResponseMessage);
+    bus_.RegisterClientAsReceiver(client_id_, kDataPipelineMessage);
+    bus_.RegisterClientAsSender(client_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsReceiver(client_id_, kWorkloadCompletionMessage);
+    bus_.RegisterClientAsSender(client_id_, kCatalogRelationNewBlockMessage);
+
+    std::string catalog_path(kStoragePath);
+    catalog_path.append(kCatalogFilename);
+
+    // Write a catalog file for QueryProcessor to load.
+    std::ofstream catalog_file(catalog_path.c_str());
+    Catalog catalog;
+    catalog.addDatabase(new CatalogDatabase(nullptr, "default"));
+    catalog.getProto().SerializeToOstream(&catalog_file);
+    catalog_file.close();
+
+    query_processor_.reset(new QueryProcessor(std::move(catalog_path)));
+
+    database_loader_.reset(new optimizer::TestDatabaseLoader(kStoragePath));
+    database_loader_->createTestRelation(false);
+    database_loader_->loadTestRelation();
+
+    // The thread keeps raw pointers to the loader's catalog database and
+    // storage manager, so it must not outlive database_loader_.
+    data_provider_thread_.reset(new DataProviderThread(database_loader_->catalog_database(),
+                                                       query_processor_.get(),
+                                                       database_loader_->storage_manager(),
+                                                       &bus_));
+
+    worker_.reset(new Worker(0, &bus_));
+    worker_directory_.reset(
+        new WorkerDirectory(1, {worker_->getBusClientID()}, {-1}));
+
+    foreman_.reset(
+        new ForemanSingleNode(client_id_,
+                              worker_directory_.get(),
+                              &bus_,
+                              query_processor_->getDefaultDatabase(),
+                              database_loader_->storage_manager()));
+
+    worker_->start();
+    foreman_->start();
+    data_provider_thread_->start();
+  }
+
+  /**
+   * Poisons and joins every thread, then releases the test database.
+   **/
+  virtual void TearDown() {
+    QueryExecutionUtil::BroadcastPoisonMessage(client_id_, &bus_);
+    worker_->join();
+    foreman_->join();
+    // Join the data provider thread here rather than in the destructor, so it
+    // has finished before the loader it points into is destroyed.
+    data_provider_thread_->join();
+    database_loader_.reset(nullptr);
+  }
+
+  std::unique_ptr<optimizer::TestDatabaseLoader> database_loader_;
+
+  std::unique_ptr<DataProviderThread> data_provider_thread_;
+
+  // Bus client used by the test body to talk to the threads.
+  tmb::client_id client_id_;
+
+  tmb::PureMemoryMessageBus<true> bus_;
+
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  std::unique_ptr<ForemanSingleNode> foreman_;
+
+ private:
+  std::unique_ptr<QueryProcessor> query_processor_;
+};
+
+const char DataProviderThreadTest::kStoragePath[] = "./data_provider_thread_test_data/";
+
+// Requests every block of the test relation from the data provider thread and
+// checks that each request is answered with a valid, non-empty block response.
+TEST_F(DataProviderThreadTest, BlockAccessTest) {
+  // Get the relation ID and block IDs.
+  const relation_id rid = database_loader_->test_relation()->getID();
+  const std::vector<block_id> relation_block_ids = database_loader_->test_relation()->getBlocksSnapshot();
+  // Guard against the loop below passing without checking anything.
+  ASSERT_FALSE(relation_block_ids.empty());
+
+  for (const block_id b : relation_block_ids) {
+    BlockRequest proto;
+    proto.set_block_id(b);
+    proto.set_relation_id(rid);
+
+    // Serialize into a scoped buffer instead of an unchecked malloc/free pair.
+    const int proto_length = proto.ByteSize();
+    std::vector<char> proto_bytes(proto_length);
+    CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                          proto_length,
+                          kBlockRequestMessage);
+    QueryExecutionUtil::SendTMBMessage(
+        &bus_, client_id_, data_provider_thread_->getTMBClientID(), std::move(message));
+
+    const AnnotatedMessage annotated_message = bus_.Receive(client_id_, 0, true);
+    const TaggedMessage &response_message = annotated_message.tagged_message;
+    ASSERT_EQ(kBlockResponseMessage, response_message.message_type());
+
+    // Check the payload as well as the message type.
+    BlockResponse response;
+    ASSERT_TRUE(response.ParseFromArray(response_message.message(),
+                                        response_message.message_bytes()));
+    EXPECT_TRUE(response.is_valid());
+    EXPECT_FALSE(response.block().empty());
+  }
+}
+
+// NOTE(harshad)- We could write a test case for kFinishReadingRelationMessage,
+// but there may be race conditions involved in checking whether the relation
+// has been dropped or not. An alternative could be to send an acknowledgement
+// once the relation has been dropped. This alternative does not seem a high
+// priority right now.
+} // namespace quickstep