You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/06/04 22:28:16 UTC

incubator-quickstep git commit: Data Provider thread added.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 835281ae8 -> dfefe6293


Data Provider thread added.

- Thread used for receiving block requests from clients and sending the
contents of a block back to the client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dfefe629
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dfefe629
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dfefe629

Branch: refs/heads/master
Commit: dfefe62934c5c890650606782b50714cb2eb0366
Parents: 835281a
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Apr 6 16:00:54 2018 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Jun 4 16:45:31 2018 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                |   6 +-
 query_execution/QueryExecutionTypedefs.hpp    |   4 +
 storage/BlockWire.proto                       |  34 +++++
 storage/CMakeLists.txt                        |  69 ++++++++-
 storage/DataProviderThread.cpp                |  92 +++++++++++
 storage/DataProviderThread.hpp                | 103 +++++++++++++
 storage/StorageManager.cpp                    |  23 +++
 storage/StorageManager.hpp                    |  17 ++-
 storage/tests/DataProviderThread_unittest.cpp | 169 +++++++++++++++++++++
 9 files changed, 512 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3f1c6cd..434019e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -702,7 +702,9 @@ set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
 set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
 
 # The distributed version requires to use the NativeNet implementation.
-if (NOT ENABLE_DISTRIBUTED)
+if (ENABLE_DISTRIBUTED OR ENABLE_NETWORK_CLI)
+  set(ENABLE_NATIVENET ON CACHE BOOL "Enable NativeNet TMB")
+else()
   set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
 endif()
 
@@ -713,7 +715,7 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
 add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
 include_directories(${TMB_INCLUDE_DIRS})
 
-if (ENABLE_DISTRIBUTED)
+if (ENABLE_DISTRIBUTED OR ENABLE_NETWORK_CLI)
   include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
 endif()
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 80da7c5..ea4f8d2 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,10 @@ enum QueryExecutionMessageType : message_type_id {
   kRebuildWorkOrderMessage,  // From Foreman to Worker.
   kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
   kWorkloadCompletionMessage,  // From Foreman to main thread.
+  kBlockRequestMessage,  // To DataProviderThread.
+  kBlockResponseMessage,  // From DataProviderThread.
+  kFinishReadingRelationMessage,  // To DataProviderThread.
+
   kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/BlockWire.proto
----------------------------------------------------------------------
diff --git a/storage/BlockWire.proto b/storage/BlockWire.proto
new file mode 100644
index 0000000..c1a1763
--- /dev/null
+++ b/storage/BlockWire.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep;
+
+// Reply to a BlockRequest, carrying the raw contents of one storage block.
+message BlockResponse {
+  // Set to true by StorageManager::sendBlockContents() when the requested
+  // block was found in the StorageManager's block map, false otherwise.
+  required bool is_valid = 1;
+  // Raw bytes of the block's memory. Only populated when is_valid is true.
+  required bytes block = 2;
+}
+
+// Request sent by a client to the DataProviderThread for one block.
+message BlockRequest {
+  // ID of the block whose contents are requested.
+  required uint64 block_id = 1;
+  // ID of the relation the block belongs to. NOTE(review): the handler in
+  // DataProviderThread::run() only reads block_id; this field is not used there.
+  required int32 relation_id = 2;
+}
+
+// Sent to the DataProviderThread (as kFinishReadingRelationMessage) to ask it
+// to drop a relation and save the catalog.
+message FinishReadingRelationMessage {
+  // ID of the relation to drop.
+  required int32 relation_id = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 832e667..2bb9a7d 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -120,6 +120,11 @@ configure_file (
 QS_PROTOBUF_GENERATE_CPP(storage_AggregationOperationState_proto_srcs
                          storage_AggregationOperationState_proto_hdrs
                          AggregationOperationState.proto)
+if (ENABLE_NETWORK_CLI)
+  QS_PROTOBUF_GENERATE_CPP(storage_BlockWire_proto_srcs
+                           storage_BlockWire_proto_hdrs
+                           BlockWire.proto)
+endif()
 QS_PROTOBUF_GENERATE_CPP(storage_HashTable_proto_srcs
                          storage_HashTable_proto_hdrs
                          HashTable.proto)
@@ -256,7 +261,10 @@ if (ENABLE_DISTRIBUTED)
               ${storage_DataExchange_proto_hdrs})
   add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
 endif()
-
+if (ENABLE_NETWORK_CLI)
+  add_library(quickstep_storage_BlockWire_proto ${storage_BlockWire_proto_srcs})
+  add_library(quickstep_storage_DataProviderThread DataProviderThread.cpp DataProviderThread.hpp)
+endif()
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
 endif()
@@ -352,6 +360,10 @@ target_link_libraries(quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_utility_BitVector
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector)
+if (ENABLE_NETWORK_CLI)
+  target_link_libraries(quickstep_storage_BlockWire_proto
+                        ${PROTOBUF_LIBRARY})
+endif()
 target_link_libraries(quickstep_storage_BloomFilterIndexSubBlock
                       glog
                       quickstep_catalog_CatalogAttribute
@@ -1089,6 +1101,29 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_DataExchange_proto)
 endif()
 
+if (ENABLE_NETWORK_CLI)
+  target_link_libraries(quickstep_storage_DataProviderThread
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_DropRelation
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryoptimizer_QueryProcessor
+                        quickstep_storage_BlockWire_proto
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_Macros
+                        tmb
+                        ${GRPCPLUSPLUS_LIBRARIES})
+  target_link_libraries(quickstep_storage_StorageManager
+                        ${GRPCPLUSPLUS_LIBRARIES}
+                        quickstep_storage_BlockWire_proto)
+endif()
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   target_link_libraries(quickstep_storage_FileManagerHdfs
                         ${LIBHDFS3_LIBRARIES}
@@ -1215,6 +1250,12 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_DataExchangerAsync)
 endif()
 
+if (ENABLE_NETWORK_CLI)
+  target_link_libraries(quickstep_storage
+                        quickstep_storage_BlockWire_proto
+                        quickstep_storage_DataProviderThread)
+endif()
+
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerHdfs)
@@ -1522,6 +1563,32 @@ if (ENABLE_DISTRIBUTED)
   add_test(DataExchange_unittest DataExchange_unittest)
 endif(ENABLE_DISTRIBUTED)
 
+if (ENABLE_NETWORK_CLI)
+  add_executable(DataProviderThread_unittest
+                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataProviderThread_unittest.cpp")
+  target_link_libraries(DataProviderThread_unittest
+                        ${GFLAGS_LIB_NAME}
+                        ${LIBS}
+                        glog
+                        gtest
+                        gtest_main
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_queryexecution_ForemanSingleNode
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryoptimizer_QueryProcessor
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataProviderThread
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_BlockWire_proto 
+                        tmb)
+  add_test(DataProviderThread_unittest DataProviderThread_unittest)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/data_provider_thread_test_data)
+endif()
+
 add_executable(CSBTreeIndexSubBlock_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/CSBTreeIndexSubBlock_unittest.cpp")
 target_link_libraries(CSBTreeIndexSubBlock_unittest
                       ${LIBS}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/DataProviderThread.cpp
----------------------------------------------------------------------
diff --git a/storage/DataProviderThread.cpp b/storage/DataProviderThread.cpp
new file mode 100644
index 0000000..0afb596
--- /dev/null
+++ b/storage/DataProviderThread.cpp
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/DataProviderThread.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/DropRelation.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/BlockWire.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+// Message loop of the data provider thread. Optionally pins the thread to
+// 'cpu_id_', then serves messages received on the TMB until a kPoisonMessage
+// arrives:
+//   - kBlockRequestMessage: replies to the sender with a kBlockResponseMessage
+//     holding the contents of the requested block.
+//   - kFinishReadingRelationMessage: drops the named relation and saves the
+//     catalog.
+void DataProviderThread::run() {
+  if (cpu_id_ >= 0) {
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+  for (;;) {
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(dpt_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const message_type_id message_type = tagged_message.message_type();
+    switch (message_type) {
+      case kPoisonMessage: {
+        return;
+      }
+      case kBlockRequestMessage: {
+        // Extract the block ID from the message.
+        BlockRequest request_proto;
+        CHECK(request_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        BlockResponse proto;
+        storage_manager_->sendBlockContents(request_proto.block_id(), &proto);
+        // NOTE: when the block is not found, no reply is sent at all, so the
+        // requester must not wait unconditionally for a response.
+        if (proto.is_valid()) {
+          // Send the response back to the client. A std::vector owns the
+          // serialization buffer, so there is no unchecked malloc() and no
+          // manual free().
+          const int proto_length = proto.ByteSize();
+          std::vector<char> proto_bytes(proto_length);
+          CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                                proto_length,
+                                kBlockResponseMessage);
+          QueryExecutionUtil::SendTMBMessage(
+              bus_, dpt_client_id_, annotated_message.sender, std::move(message));
+        }
+        break;
+      }
+      case kFinishReadingRelationMessage: {
+        // The payload must be parsed first: without this, relation_id() would
+        // return the proto default instead of the relation the client named.
+        FinishReadingRelationMessage request_proto;
+        CHECK(request_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const auto *relation = database_->getRelationById(request_proto.relation_id());
+        if (relation == nullptr) {
+          // Unknown (or already dropped) relation: nothing to drop.
+          LOG(WARNING) << "DataProviderThread: ignoring request to drop unknown relation ID "
+                       << request_proto.relation_id();
+          break;
+        }
+        DropRelation::Drop(*relation, database_, storage_manager_);
+        // TODO(harshad) - Check if the call below is thread safe?
+        query_processor_->saveCatalog();
+        break;
+      }
+    }
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/DataProviderThread.hpp
----------------------------------------------------------------------
diff --git a/storage/DataProviderThread.hpp b/storage/DataProviderThread.hpp
new file mode 100644
index 0000000..5b22db9
--- /dev/null
+++ b/storage/DataProviderThread.hpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_
+#define QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_
+
+#include <cstddef>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageConfig.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+class CatalogDatabase;
+class CatalogRelation;
+class QueryProcessor;
+class StorageManager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A thread that provides access to query results (e.g. storage blocks),
+ *        to Quickstep clients.
+ *
+ * The thread connects to the TMB on construction and registers itself as a
+ * receiver of kPoisonMessage, kBlockRequestMessage and
+ * kFinishReadingRelationMessage, and as a sender of kBlockResponseMessage.
+ **/
+class DataProviderThread : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note The pointers are stored as-is; the destructor releases nothing.
+   *
+   * @param database The CatalogDatabase.
+   * @param query_processor The QueryProcessor object.
+   * @param storage_manager The storage manager.
+   * @param bus The TMB.
+   * @param cpu_id The ID of the CPU to affinitize this thread to, or -1 to
+   *        indicate no affinity.
+   **/
+  DataProviderThread(CatalogDatabase *database,
+                     QueryProcessor *query_processor,
+                     StorageManager *storage_manager,
+                     tmb::MessageBus *bus,
+                     const int cpu_id = -1)
+      : database_(database),
+        query_processor_(query_processor),
+        storage_manager_(storage_manager),
+        bus_(bus),
+        cpu_id_(cpu_id),
+        dpt_client_id_(bus_->Connect()) {
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kPoisonMessage);
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kBlockRequestMessage);
+    bus_->RegisterClientAsReceiver(dpt_client_id_, kFinishReadingRelationMessage);
+    bus_->RegisterClientAsSender(dpt_client_id_, kBlockResponseMessage);
+  }
+
+  /**
+   * @brief Destructor. Does not join the thread or disconnect from the TMB.
+   **/
+  ~DataProviderThread() override {
+  }
+
+  /**
+   * @brief Get the TMB client ID of this thread.
+   *
+   * @return The client ID obtained from the bus in the constructor; requests
+   *         for this thread are addressed to it.
+   **/
+  tmb::client_id getTMBClientID() const {
+    return dpt_client_id_;
+  }
+
+ protected:
+  /**
+   * @brief Thread body: serves messages from the TMB (see
+   *        DataProviderThread.cpp) until a kPoisonMessage is received.
+   **/
+  void run() override;
+
+ private:
+  // Collaborators supplied by the caller of the constructor.
+  CatalogDatabase *database_;
+  QueryProcessor *query_processor_;
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  // CPU to bind to in run(); a negative value means no binding.
+  const int cpu_id_;
+
+  // This thread's identity on the TMB. Initialized from 'bus_', so it must be
+  // declared after 'bus_'.
+  tmb::client_id dpt_client_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(DataProviderThread);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_DATA_PROVIDER_THREAD_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index e2efebf..26fc2d5 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -75,6 +75,10 @@
 #include "storage/DataExchange.pb.h"
 #endif
 
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+#include "storage/BlockWire.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManagerLocal.hpp"
 #include "storage/Flags.hpp"
@@ -665,6 +669,25 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
 }
 #endif
 
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+// Fills 'response' for the block with ID 'block'. If the block has an entry
+// in 'blocks_', the response is marked valid and receives a copy of the
+// block's memory; otherwise it is marked invalid and carries no data. The
+// shared lock on 'blocks_shared_mutex_' is held for the whole call.
+void StorageManager::sendBlockContents(const block_id block,
+                                       BlockResponse *response) const {
+  SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+
+  const auto handle_it = blocks_.find(block);
+  if (handle_it == blocks_.end()) {
+    response->set_is_valid(false);
+    return;
+  }
+
+  // The handle records its size in slots; convert to bytes for the copy.
+  const BlockHandle &handle = handle_it->second;
+  const std::size_t slot_count = handle.block_memory_size;
+  const std::size_t byte_count = slot_count * kSlotSizeBytes;
+
+  response->set_is_valid(true);
+  response->set_block(handle.block_memory, byte_count);
+}
+#endif
+
 StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
     const block_id block, const int numa_node) {
   // The caller of this function holds an exclusive lock on this block/blob's

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index eb40891..66dfdfe 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -62,12 +62,15 @@ namespace quickstep {
 DECLARE_int32(block_domain);
 DECLARE_uint64(buffer_pool_slots);
 
-class CatalogRelationSchema;
-
 #ifdef QUICKSTEP_DISTRIBUTED
 class PullResponse;
 #endif
 
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+class BlockResponse;
+#endif
+
+class CatalogRelationSchema;
 class StorageBlockLayout;
 
 /** \addtogroup Storage
@@ -375,6 +378,16 @@ class StorageManager {
    **/
   bool blockOrBlobIsLoadedAndDirty(const block_id block);
 
+#ifdef QUICKSTEP_ENABLE_NETWORK_CLI
+  /**
+   * @brief Copy the contents of the given block into a response message.
+   *        Nothing is sent by this method; transmitting the response is up
+   *        to the caller.
+   *
+   * @param block The ID of the requested block.
+   * @param response The message to fill. Its is_valid field is set to true
+   *        and its block field to the block's bytes if the block is found;
+   *        otherwise is_valid is set to false and block is left untouched.
+   **/
+  void sendBlockContents(const block_id block, BlockResponse *response) const;
+#endif
+
 #ifdef QUICKSTEP_DISTRIBUTED
   /**
    * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dfefe629/storage/tests/DataProviderThread_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataProviderThread_unittest.cpp b/storage/tests/DataProviderThread_unittest.cpp
new file mode 100644
index 0000000..7b91250
--- /dev/null
+++ b/storage/tests/DataProviderThread_unittest.cpp
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/BlockWire.pb.h"
+#include "storage/DataProviderThread.hpp"
+#include "storage/StorageConstants.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc++/security/server_credentials.h"
+#include "grpc++/server.h"
+#include "grpc++/server_builder.h"
+#include "grpc/grpc.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/internal/net_service_impl.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+/**
+ * @brief Test fixture that creates and loads a test relation, then starts a
+ *        Worker, a ForemanSingleNode and a DataProviderThread on a shared
+ *        in-memory TMB. 'client_id_' plays the role of the client/main thread.
+ **/
+class DataProviderThreadTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+
+  virtual void SetUp() {
+    quickstep::ClientIDMap::Instance();
+
+    bus_.Initialize();
+
+    client_id_ = bus_.Connect();
+
+    // Messages exchanged with the DataProviderThread.
+    bus_.RegisterClientAsSender(client_id_, kPoisonMessage);
+    bus_.RegisterClientAsSender(client_id_, kBlockRequestMessage);
+    bus_.RegisterClientAsSender(client_id_, kFinishReadingRelationMessage);
+    bus_.RegisterClientAsReceiver(client_id_, kBlockResponseMessage);
+    // Remaining registrations for 'client_id_', which is also handed to the
+    // ForemanSingleNode below.
+    bus_.RegisterClientAsReceiver(client_id_, kDataPipelineMessage);
+    bus_.RegisterClientAsSender(client_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsReceiver(client_id_, kWorkloadCompletionMessage);
+    bus_.RegisterClientAsSender(client_id_, kCatalogRelationNewBlockMessage);
+
+    // Write a catalog containing only a "default" database, so that a
+    // QueryProcessor can be constructed from it.
+    std::string catalog_path(kStoragePath);
+    catalog_path.append(kCatalogFilename);
+
+    std::ofstream catalog_file(catalog_path.c_str());
+    Catalog catalog;
+    catalog.addDatabase(new CatalogDatabase(nullptr, "default"));
+    catalog.getProto().SerializeToOstream(&catalog_file);
+    catalog_file.close();
+
+    query_processor_.reset(new QueryProcessor(std::move(catalog_path)));
+
+    database_loader_.reset(new optimizer::TestDatabaseLoader(kStoragePath));
+    database_loader_->createTestRelation(false);
+    database_loader_->loadTestRelation();
+
+    data_provider_thread_.reset(new DataProviderThread(database_loader_->catalog_database(),
+                                                       query_processor_.get(),
+                                                       database_loader_->storage_manager(),
+                                                       &bus_));
+
+    worker_.reset(new Worker(0, &bus_));
+    worker_directory_.reset(
+        new WorkerDirectory(1, {worker_->getBusClientID()}, {-1}));
+
+    foreman_.reset(
+        new ForemanSingleNode(client_id_,
+                              worker_directory_.get(),
+                              &bus_,
+                              query_processor_->getDefaultDatabase(),
+                              database_loader_->storage_manager()));
+
+    worker_->start();
+    foreman_->start();
+    data_provider_thread_->start();
+  }
+
+  virtual void TearDown() {
+    QueryExecutionUtil::BroadcastPoisonMessage(client_id_, &bus_);
+    worker_->join();
+    foreman_->join();
+    // Join the data provider thread BEFORE destroying the database loader:
+    // the thread holds raw pointers to the loader's catalog database and
+    // storage manager, so it must have exited by the time they go away.
+    data_provider_thread_->join();
+    database_loader_.reset(nullptr);
+  }
+
+  std::unique_ptr<optimizer::TestDatabaseLoader> database_loader_;
+
+  std::unique_ptr<DataProviderThread> data_provider_thread_;
+
+  tmb::client_id client_id_;
+
+  tmb::PureMemoryMessageBus<true> bus_;
+
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  std::unique_ptr<ForemanSingleNode> foreman_;
+
+ private:
+  std::unique_ptr<QueryProcessor> query_processor_;
+};
+
+const char DataProviderThreadTest::kStoragePath[] = "./data_provider_thread_test_data/";
+
+// Requests every block of the test relation from the DataProviderThread and
+// verifies that each request is answered with a valid, non-empty
+// BlockResponse.
+TEST_F(DataProviderThreadTest, BlockAccessTest) {
+  // Get the relation ID and block IDs.
+  const relation_id rid = database_loader_->test_relation()->getID();
+  const std::vector<block_id> relation_block_ids = database_loader_->test_relation()->getBlocksSnapshot();
+  for (const block_id b : relation_block_ids) {
+    BlockRequest proto;
+    proto.set_block_id(b);
+    proto.set_relation_id(rid);
+
+    // Serialize into a buffer owned by a std::vector (no unchecked malloc()).
+    const int proto_length = proto.ByteSize();
+    std::vector<char> proto_bytes(proto_length);
+    ASSERT_TRUE(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                          proto_length,
+                          kBlockRequestMessage);
+    QueryExecutionUtil::SendTMBMessage(
+        &bus_, client_id_, data_provider_thread_->getTMBClientID(), std::move(message));
+
+    const AnnotatedMessage annotated_message = bus_.Receive(client_id_, 0, true);
+    const TaggedMessage &response_message = annotated_message.tagged_message;
+    ASSERT_EQ(kBlockResponseMessage, response_message.message_type());
+
+    // Check the payload, not just the message type.
+    BlockResponse response;
+    ASSERT_TRUE(response.ParseFromArray(response_message.message(), response_message.message_bytes()));
+    EXPECT_TRUE(response.is_valid());
+    EXPECT_FALSE(response.block().empty());
+  }
+}
+
+// NOTE(harshad)- We could write a test case for kFinishReadingRelationMessage,
+// but there may be race conditions involved in checking whether the relation
+// has been dropped or not. An alternative could be to send an acknowledgement
+// once the relation has been dropped. This alternative does not seem a high
+// priority right now.
+}   // namespace quickstep