You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/23 09:07:00 UTC
[1/2] incubator-quickstep git commit: Provided more info for the
OutOfMemory exception. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/hdfs_text_scan b21c85b6f -> c575048c1 (forced update)
Provided more info for the OutOfMemory exception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4be8e91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4be8e91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4be8e91a
Branch: refs/heads/hdfs_text_scan
Commit: 4be8e91a4acddab22f4d71f62b4247863ddc766f
Parents: f5c063a
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 23 01:06:37 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 01:06:37 2017 -0800
----------------------------------------------------------------------
storage/StorageErrors.cpp | 6 ++++++
storage/StorageErrors.hpp | 12 +++++++++++-
storage/StorageManager.cpp | 2 +-
3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageErrors.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageErrors.cpp b/storage/StorageErrors.cpp
index 310ee4c..81bd997 100644
--- a/storage/StorageErrors.cpp
+++ b/storage/StorageErrors.cpp
@@ -41,6 +41,12 @@ FileWriteError::FileWriteError(const std::string &filename)
message_.append(filename);
}
+OutOfMemory::OutOfMemory(const std::size_t num_slots)
+ : message_("OutOfMemory: The system has run out of memory when allocating ") {
+ message_.append(std::to_string(num_slots));
+ message_.append(" slots");
+}
+
TupleTooLargeForBlock::TupleTooLargeForBlock(const std::size_t tuple_size)
: tuple_size_(tuple_size),
message_("TupleTooLargeForBlock: ") {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageErrors.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageErrors.hpp b/storage/StorageErrors.hpp
index 67a3e42..b87f04e 100644
--- a/storage/StorageErrors.hpp
+++ b/storage/StorageErrors.hpp
@@ -141,9 +141,19 @@ class MalformedBlock : public std::exception {
**/
class OutOfMemory : public std::exception {
public:
+ /**
+ * @brief Constructor.
+ *
+ * @param num_slots The number of slots to allocate.
+ **/
+ explicit OutOfMemory(const std::size_t num_slots);
+
virtual const char* what() const throw() {
- return "OutOfMemory: The system has run out of memory";
+ return message_.c_str();
}
+
+ private:
+ std::string message_;
};
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 6f7d38b..783ccfe 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -757,7 +757,7 @@ void* StorageManager::allocateSlots(const std::size_t num_slots,
#endif
if (slots == nullptr) {
- throw OutOfMemory();
+ throw OutOfMemory(num_slots);
}
#if defined(QUICKSTEP_HAVE_LIBNUMA)
[2/2] incubator-quickstep git commit: Added HDFS Support For
TextScanWorkOrder.
Posted by zu...@apache.org.
Added HDFS Support For TextScanWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c575048c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c575048c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c575048c
Branch: refs/heads/hdfs_text_scan
Commit: c575048c1f47a574ecfd603ac7c856c4364c54a4
Parents: 4be8e91
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 01:06:53 2017 -0800
----------------------------------------------------------------------
cli/distributed/Executor.cpp | 2 +-
query_execution/CMakeLists.txt | 1 +
query_execution/Shiftboss.cpp | 3 +-
query_execution/Shiftboss.hpp | 14 +++
.../DistributedExecutionGeneratorTestRunner.cpp | 3 +-
relational_operators/CMakeLists.txt | 5 +
relational_operators/TextScanOperator.cpp | 107 ++++++++++++++++---
relational_operators/TextScanOperator.hpp | 10 +-
relational_operators/WorkOrderFactory.cpp | 6 +-
relational_operators/WorkOrderFactory.hpp | 4 +-
storage/FileManagerHdfs.hpp | 9 ++
storage/StorageManager.cpp | 9 ++
storage/StorageManager.hpp | 8 +-
13 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
data_exchanger_.start();
shiftboss_ =
- make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+ make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
shiftboss_->start();
for (const auto &worker : workers_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 50bf694..12d6be0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -295,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_WorkerMessage
quickstep_relationaloperators_RebuildWorkOrder
quickstep_relationaloperators_WorkOrderFactory
+ quickstep_storage_Flags
quickstep_storage_InsertDestination
quickstep_storage_StorageBlock
quickstep_storage_StorageManager
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
query_contexts_[query_id].get(),
storage_manager_,
shiftboss_client_id_,
- bus_);
+ bus_,
+ hdfs_);
unique_ptr<WorkerMessage> worker_message(
WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..e0b4312 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"
@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
* @param bus A pointer to the TMB.
* @param storage_manager The StorageManager to use.
* @param workers A pointer to the WorkerDirectory.
+ * @param hdfs The HDFS connector via libhdfs3.
* @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
*
* @note If cpu_id is not specified, Shiftboss thread can be possibly moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
Shiftboss(tmb::MessageBus *bus,
StorageManager *storage_manager,
WorkerDirectory *workers,
+ void *hdfs,
const int cpu_id = -1)
: bus_(DCHECK_NOTNULL(bus)),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
workers_(DCHECK_NOTNULL(workers)),
+ hdfs_(hdfs),
cpu_id_(cpu_id),
shiftboss_client_id_(tmb::kClientIdNone),
foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
// Check to have at least one Worker.
DCHECK_GT(workers->getNumWorkers(), 0u);
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (FLAGS_use_hdfs) {
+ CHECK(hdfs_);
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
shiftboss_client_id_ = bus_->Connect();
LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
StorageManager *storage_manager_;
WorkerDirectory *workers_;
+ // Not owned.
+ void *hdfs_;
+
// The ID of the CPU that the Shiftboss thread can optionally be pinned to.
const int cpu_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..c9f5a10 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -128,7 +128,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
data_exchangers_[i].set_storage_manager(storage_manager.get());
shiftbosses_.push_back(
- make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+ make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+ storage_manager->hdfs()));
storage_managers_.push_back(move(storage_manager));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 457d58a..1693ec2 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -491,6 +491,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_RelationalOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_Flags
quickstep_storage_InsertDestination
quickstep_types_Type
quickstep_types_TypedValue
@@ -500,6 +501,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
quickstep_utility_Glob
quickstep_utility_Macros
tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+ target_link_libraries(quickstep_relationaloperators_TextScanOperator
+ ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_relationaloperators_UpdateOperator
glog
quickstep_catalog_CatalogRelation
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 0a83a85..6333813 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h" // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include <hdfs/hdfs.h>
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
@@ -205,14 +212,56 @@ void TextScanWorkOrder::execute() {
std::vector<TypedValue> vector_tuple_returned;
constexpr std::size_t kSmallBufferSize = 0x4000;
- char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
-
- // Read text segment into buffer.
- FILE *file = std::fopen(filename_.c_str(), "rb");
- std::fseek(file, text_offset_, SEEK_SET);
- std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
- if (bytes_read != text_segment_size_) {
- throw TextScanReadError(filename_);
+ const size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize);
+ char *buffer = reinterpret_cast<char *>(malloc(buffer_size));
+
+ bool use_hdfs = false;
+ std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ hdfsFS hdfs = nullptr;
+ hdfsFile file_handle = nullptr;
+
+ if (FLAGS_use_hdfs) {
+ use_hdfs = true;
+ hdfs = static_cast<hdfsFS>(hdfs_);
+
+ file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
+ 0 /* default replication */, 0 /* default block size */);
+ if (file_handle == nullptr) {
+ LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+ return;
+ }
+
+ if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+ LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+ hdfsCloseFile(hdfs, file_handle);
+ return;
+ }
+
+ bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+ if (bytes_read != text_segment_size_) {
+ hdfsCloseFile(hdfs, file_handle);
+ throw TextScanReadError(filename_);
+ }
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+ FILE *file = nullptr;
+ if (!use_hdfs) {
+ // Avoid unused-private-field warning.
+ (void) hdfs_;
+
+ // Read text segment into buffer.
+ file = std::fopen(filename_.c_str(), "rb");
+ std::fseek(file, text_offset_, SEEK_SET);
+ bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+ if (bytes_read != text_segment_size_) {
+ std::fclose(file);
+ throw TextScanReadError(filename_);
+ }
}
// Locate the first newline character.
@@ -266,10 +315,36 @@ void TextScanWorkOrder::execute() {
// that the last tuple is very small / very large.
std::size_t dynamic_read_size = 1024;
std::string row_string;
- std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+ const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+ LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+ hdfsCloseFile(hdfs, file_handle);
+ return;
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ std::fseek(file, dynamic_read_offset, SEEK_SET);
+ }
+
bool has_reached_end = false;
do {
- bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+
+ // Read again when crossing the HDFS block boundary.
+ if (bytes_read != dynamic_read_size) {
+ bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+ }
+
std::size_t bytes_to_copy = bytes_read;
for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -303,7 +378,14 @@ void TextScanWorkOrder::execute() {
}
}
- std::fclose(file);
+ if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ hdfsCloseFile(hdfs, file_handle);
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ } else {
+ std::fclose(file);
+ }
+
free(buffer);
// Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -334,7 +416,8 @@ void TextScanWorkOrder::execute() {
}
std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation, bool *is_faulty) const {
+ const CatalogRelationSchema &relation,
+ bool *is_faulty) const {
std::vector<TypedValue> attribute_values;
// Always assume current row is not faulty initially.
*is_faulty = false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index eada190..59821fc 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
* @param output_destination The InsertDestination to insert tuples.
+ * @param hdfs The HDFS connector via libhdfs3.
**/
TextScanWorkOrder(
const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
const std::size_t text_segment_size,
const char field_terminator,
const bool process_escape_sequences,
- InsertDestination *output_destination)
+ InsertDestination *output_destination,
+ void *hdfs = nullptr)
: WorkOrder(query_id),
filename_(filename),
text_offset_(text_offset),
text_segment_size_(text_segment_size),
field_terminator_(field_terminator),
process_escape_sequences_(process_escape_sequences),
- output_destination_(DCHECK_NOTNULL(output_destination)) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ hdfs_(hdfs) {}
~TextScanWorkOrder() override {}
@@ -332,6 +335,9 @@ class TextScanWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
+ // Not owned.
+ void *hdfs_;
+
DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d2c8251..cf0ee74 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -75,7 +75,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
QueryContext *query_context,
StorageManager *storage_manager,
const tmb::client_id shiftboss_client_id,
- tmb::MessageBus *bus) {
+ tmb::MessageBus *bus,
+ void *hdfs) {
DCHECK(query_context != nullptr);
DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
<< "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -473,7 +474,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
query_context->getInsertDestination(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+ proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+ hdfs);
}
case serialization::UPDATE: {
LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
* @param storage_manager The StorageManager to use.
* @param shiftboss_client_id The TMB client id of Shiftboss.
* @param bus A pointer to the TMB.
+ * @param hdfs The HDFS connector via libhdfs3.
*
* @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
**/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
QueryContext *query_context,
StorageManager *storage_manager,
const tmb::client_id shiftboss_client_id,
- tmb::MessageBus *bus);
+ tmb::MessageBus *bus,
+ void *hdfs);
/**
* @brief Check whether a serialization::WorkOrder is fully-formed and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {
block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;
+ /**
+ * @brief Get the HDFS connector via libhdfs3.
+ *
+ * @return The HDFS connector.
+ **/
+ void* hdfs() {
+ return static_cast<void*>(hdfs_);
+ }
+
private:
// libhdfs3 has an API to release this pointer.
hdfsFS hdfs_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 783ccfe..94e1b67 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
return true;
}
+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ if (FLAGS_use_hdfs) {
+ return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+ }
+#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+ return nullptr;
+}
+
vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
serialization::BlockMessage proto;
proto.set_block_id(block);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c575048c/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
#include "storage/StorageConstants.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
void pullBlockOrBlob(const block_id block, PullResponse *response) const;
#endif
+ /**
+ * @brief Get the HDFS connector via libhdfs3.
+ *
+ * @return The HDFS connector.
+ **/
+ void* hdfs();
+
private:
struct BlockHandle {
void *block_memory;