You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/07 08:44:52 UTC
[1/2] incubator-quickstep git commit: A workaround to remove query
result relation in the distributed version. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/hdfs_text_scan c946735f7 -> ae9ac342f (forced update)
A workaround to remove query result relation in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aef1c358
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aef1c358
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aef1c358
Branch: refs/heads/hdfs_text_scan
Commit: aef1c3586580cfa72eb031fafe08700f6d5d9a86
Parents: 27a8055
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 7 00:41:45 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Feb 7 00:41:45 2017 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aef1c358/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 4d95f16..e6f22ec 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -201,8 +201,12 @@ void ForemanDistributed::run() {
// TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
- processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ const relation_id result_relation_id = proto.relation_id();
+ processSaveQueryResultResponseMessage(proto.cli_id(), result_relation_id);
query_result_saved_shiftbosses_.erase(query_id);
+
+ // TODO(zuyu): Refactor to clean-up blocks in Shiftbosses.
+ catalog_database_->dropRelationById(result_relation_id);
}
break;
}
[2/2] incubator-quickstep git commit: Added HDFS Support For
TextScanWorkOrder.
Posted by zu...@apache.org.
Added HDFS Support For TextScanWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ae9ac342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ae9ac342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ae9ac342
Branch: refs/heads/hdfs_text_scan
Commit: ae9ac342f37d395ce0c654e846fe3735df113471
Parents: aef1c35
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Feb 7 00:44:42 2017 -0800
----------------------------------------------------------------------
cli/distributed/Executor.cpp | 2 +-
query_execution/CMakeLists.txt | 1 +
query_execution/Shiftboss.cpp | 3 +-
query_execution/Shiftboss.hpp | 14 ++++
relational_operators/CMakeLists.txt | 5 ++
relational_operators/TextScanOperator.cpp | 102 +++++++++++++++++++++----
relational_operators/TextScanOperator.hpp | 14 +++-
relational_operators/WorkOrderFactory.cpp | 6 +-
relational_operators/WorkOrderFactory.hpp | 4 +-
storage/FileManagerHdfs.hpp | 9 +++
storage/StorageManager.cpp | 9 +++
storage/StorageManager.hpp | 8 +-
12 files changed, 153 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
data_exchanger_.start();
shiftboss_ =
- make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+ make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
shiftboss_->start();
for (const auto &worker : workers_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e26bde0..f251825 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -293,6 +293,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_WorkerMessage
quickstep_relationaloperators_RebuildWorkOrder
quickstep_relationaloperators_WorkOrderFactory
+ quickstep_storage_Flags
quickstep_storage_InsertDestination
quickstep_storage_StorageBlock
quickstep_storage_StorageManager
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
query_contexts_[query_id].get(),
storage_manager_,
shiftboss_client_id_,
- bus_);
+ bus_,
+ hdfs_);
unique_ptr<WorkerMessage> worker_message(
WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..c48bd59 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"
@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
* @param bus A pointer to the TMB.
* @param storage_manager The StorageManager to use.
* @param workers A pointer to the WorkerDirectory.
+ * @param hdfs The HDFS connector via libhdfs3.
* @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
*
* @note If cpu_id is not specified, Shiftboss thread can be possibly moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
Shiftboss(tmb::MessageBus *bus,
StorageManager *storage_manager,
WorkerDirectory *workers,
+ void *hdfs = nullptr,
const int cpu_id = -1)
: bus_(DCHECK_NOTNULL(bus)),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
workers_(DCHECK_NOTNULL(workers)),
+ hdfs_(hdfs),
cpu_id_(cpu_id),
shiftboss_client_id_(tmb::kClientIdNone),
foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
// Check to have at least one Worker.
DCHECK_GT(workers->getNumWorkers(), 0u);
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (FLAGS_use_hdfs) {
+ CHECK(hdfs_);
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
shiftboss_client_id_ = bus_->Connect();
LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
StorageManager *storage_manager_;
WorkerDirectory *workers_;
+ // Not owned.
+ void *hdfs_;
+
// The ID of the CPU that the Shiftboss thread can optionally be pinned to.
const int cpu_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c18dc77..89ad0b0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -449,6 +449,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_RelationalOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_Flags
quickstep_storage_InsertDestination
quickstep_types_Type
quickstep_types_TypedValue
@@ -458,6 +459,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
quickstep_utility_Glob
quickstep_utility_Macros
tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+ target_link_libraries(quickstep_relationaloperators_TextScanOperator
+ ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_relationaloperators_UpdateOperator
glog
quickstep_catalog_CatalogRelation
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 6650319..d8d856e 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include <hdfs/hdfs.h>
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
@@ -209,12 +216,50 @@ void TextScanWorkOrder::execute() {
constexpr std::size_t kSmallBufferSize = 0x4000;
char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
- // Read text segment into buffer.
- FILE *file = std::fopen(filename_.c_str(), "rb");
- std::fseek(file, text_offset_, SEEK_SET);
- std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
- if (bytes_read != text_segment_size_) {
- throw TextScanReadError(filename_);
+ bool use_hdfs = false;
+ std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ hdfsFS hdfs = nullptr;
+ hdfsFile file_handle = nullptr;
+
+ if (FLAGS_use_hdfs) {
+ use_hdfs = true;
+ hdfs = static_cast<hdfsFS>(hdfs_);
+
+ file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, 0 /* default buffer size */,
+ 0 /* default replication */, 0 /* default block size */);
+ if (file_handle == nullptr) {
+ LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+ return;
+ }
+
+ if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+ LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+ hdfsCloseFile(hdfs, file_handle);
+ return;
+ }
+
+ bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+ if (bytes_read != text_segment_size_) {
+ hdfsCloseFile(hdfs, file_handle);
+ throw TextScanReadError(filename_);
+ }
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+ FILE *file = nullptr;
+ if (!use_hdfs) {
+ // Read text segment into buffer.
+ file = std::fopen(filename_.c_str(), "rb");
+ std::fseek(file, text_offset_, SEEK_SET);
+ bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+ if (bytes_read != text_segment_size_) {
+ std::fclose(file);
+ throw TextScanReadError(filename_);
+ }
}
// Locate the first newline character.
@@ -255,11 +300,11 @@ void TextScanWorkOrder::execute() {
} else {
vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
if (is_faulty) {
- // Skip faulty rows
- LOG(INFO) << "Faulty row found. Hence switching to next row.";
+ // Skip faulty rows
+ LOG(INFO) << "Faulty row found. Hence switching to next row.";
} else {
- // Convert vector returned to tuple only when a valid row is encountered.
- tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+ // Convert vector returned to tuple only when a valid row is encountered.
+ tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
}
}
}
@@ -268,10 +313,31 @@ void TextScanWorkOrder::execute() {
// that the last tuple is very small / very large.
std::size_t dynamic_read_size = 1024;
std::string row_string;
- std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+ const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+ LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+ hdfsCloseFile(hdfs, file_handle);
+ return;
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ std::fseek(file, dynamic_read_offset, SEEK_SET);
+ }
+
bool has_reached_end = false;
do {
- bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+ }
+
std::size_t bytes_to_copy = bytes_read;
for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -305,7 +371,14 @@ void TextScanWorkOrder::execute() {
}
}
- std::fclose(file);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ hdfsCloseFile(hdfs, file_handle);
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ std::fclose(file);
+ }
+
free(buffer);
// Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -336,7 +409,8 @@ void TextScanWorkOrder::execute() {
}
std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation, bool *is_faulty) const {
+ const CatalogRelationSchema &relation,
+ bool *is_faulty) const {
std::vector<TypedValue> attribute_values;
// Always assume current row is not faulty initially.
*is_faulty = false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 65863b3..f6c4c2a 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
* @param output_destination The InsertDestination to insert tuples.
+ * @param hdfs The HDFS connector via libhdfs3.
**/
TextScanWorkOrder(
const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
const std::size_t text_segment_size,
const char field_terminator,
const bool process_escape_sequences,
- InsertDestination *output_destination)
+ InsertDestination *output_destination,
+ void *hdfs = nullptr)
: WorkOrder(query_id),
filename_(filename),
text_offset_(text_offset),
text_segment_size_(text_segment_size),
field_terminator_(field_terminator),
process_escape_sequences_(process_escape_sequences),
- output_destination_(DCHECK_NOTNULL(output_destination)) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ hdfs_(hdfs) {}
~TextScanWorkOrder() override {}
@@ -265,8 +268,8 @@ class TextScanWorkOrder : public WorkOrder {
* @return The tuple parsed from the char stream.
*/
std::vector<TypedValue> parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation,
- bool *is_faulty) const;
+ const CatalogRelationSchema &relation,
+ bool *is_faulty) const;
/**
* @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
@@ -332,6 +335,9 @@ std::vector<TypedValue> parseRow(const char **row_ptr,
InsertDestination *output_destination_;
+ // Not owned.
+ void *hdfs_;
+
DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5e8d03d..f3571fd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -74,7 +74,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
QueryContext *query_context,
StorageManager *storage_manager,
const tmb::client_id shiftboss_client_id,
- tmb::MessageBus *bus) {
+ tmb::MessageBus *bus,
+ void *hdfs) {
DCHECK(query_context != nullptr);
DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
<< "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -456,7 +457,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
query_context->getInsertDestination(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+ proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+ hdfs);
}
case serialization::UPDATE: {
LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
* @param storage_manager The StorageManager to use.
* @param shiftboss_client_id The TMB client id of Shiftboss.
* @param bus A pointer to the TMB.
+ * @param hdfs The HDFS connector via libhdfs3.
*
* @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
**/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
QueryContext *query_context,
StorageManager *storage_manager,
const tmb::client_id shiftboss_client_id,
- tmb::MessageBus *bus);
+ tmb::MessageBus *bus,
+ void *hdfs);
/**
* @brief Check whether a serialization::WorkOrder is fully-formed and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {
block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;
+ /**
+ * @brief Get the HDFS connector via libhdfs3.
+ *
+ * @return The HDFS connector.
+ **/
+ void* hdfs() {
+ return static_cast<void*>(hdfs_);
+ }
+
private:
// libhdfs3 has an API to release this pointer.
hdfsFS hdfs_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 6f7d38b..872e8cc 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
return true;
}
+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (FLAGS_use_hdfs) {
+ return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ return nullptr;
+}
+
vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
serialization::BlockMessage proto;
proto.set_block_id(block);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ae9ac342/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
#include "storage/StorageConstants.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
void pullBlockOrBlob(const block_id block, PullResponse *response) const;
#endif
+ /**
+ * @brief Get the HDFS connector via libhdfs3.
+ *
+ * @return The HDFS connector.
+ **/
+ void* hdfs();
+
private:
struct BlockHandle {
void *block_memory;