Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/24 19:10:45 UTC

[1/2] incubator-quickstep git commit: Use partitioned aggregation for single-function DISTINCT aggregation. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/hdfs_text_scan c575048c1 -> c9d1f22e7 (forced update)


Use partitioned aggregation for single-function DISTINCT aggregation.
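
In outline: rows are hash-partitioned on the group-by key, each partition
deduplicates (key, argument) pairs in its own distinctify hash table, and
finalization then aggregates every partition independently, with no
cross-thread synchronization on a shared table. A minimal standalone sketch
of that scheme for COUNT(DISTINCT v) ... GROUP BY k (types and names below
are illustrative, not Quickstep's actual classes):

#include <cstddef>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

struct Row { std::string k; int v; };

std::map<std::string, std::size_t> CountDistinctPartitioned(
    const std::vector<Row> &rows, const std::size_t num_partitions) {
  // Phase 1: route each (k, v) pair to a partition by hash(k). Each
  // partition's set deduplicates independently, so with one thread per
  // partition the distinctify tables need no locking.
  std::vector<std::set<std::pair<std::string, int>>> distinctify(num_partitions);
  for (const Row &row : rows) {
    const std::size_t p = std::hash<std::string>()(row.k) % num_partitions;
    distinctify[p].emplace(row.k, row.v);
  }

  // Phase 2: finalize. All rows of a group hash to the same partition, so
  // each partition can be aggregated and emitted without a merge step.
  std::map<std::string, std::size_t> result;
  for (const auto &partition : distinctify) {
    for (const auto &key_and_arg : partition) {
      ++result[key_and_arg.first];
    }
  }
  return result;
}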


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/17477f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/17477f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/17477f57

Branch: refs/heads/hdfs_text_scan
Commit: 17477f5756e599b4276d6d366c3144cad0be536f
Parents: 4be8e91
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Feb 20 20:05:08 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Feb 24 11:10:56 2017 -0600

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 158 ++++++++++++++++++++---------
 storage/AggregationOperationState.hpp |   3 +
 storage/CMakeLists.txt                |   3 +-
 storage/PackedPayloadHashTable.cpp    |  33 +++---
 storage/PackedPayloadHashTable.hpp    |  32 ++++--
 utility/TemplateUtil.hpp              |  74 +++++++++++++-
 6 files changed, 228 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..eef2c9d 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -19,8 +19,11 @@
 
 #include "storage/AggregationOperationState.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <utility>
@@ -87,6 +89,8 @@ AggregationOperationState::AggregationOperationState(
       is_aggregate_partitioned_(false),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
+      all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
+                                    !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
     if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
@@ -163,11 +167,6 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
     if (!group_by_key_ids_.empty()) {
-      // Aggregation with GROUP BY: combined payload is partially updated in
-      // the presence of DISTINCT.
-      if (*is_distinct_it) {
-        handles_.back()->blockUpdate();
-      }
       group_by_handles.emplace_back(handles_.back().get());
     } else {
       // Aggregation without GROUP BY: create a single global state.
@@ -180,17 +179,32 @@ AggregationOperationState::AggregationOperationState(
       std::vector<const Type *> key_types(group_by_types_);
       key_types.insert(
           key_types.end(), argument_types.begin(), argument_types.end());
+
       // TODO(jianqiao): estimated_num_entries is quite inaccurate for
       // estimating the number of entries in the distinctify hash table.
       // We need to estimate for each distinct aggregation an
       // estimated_num_distinct_keys value during query optimization.
-      distinctify_hashtables_.emplace_back(
-          AggregationStateHashTableFactory::CreateResizable(
-              *distinctify_hash_table_impl_types_it,
-              key_types,
-              estimated_num_entries,
-              {} /* handles */,
-              storage_manager));
+      if (is_aggregate_partitioned_) {
+        DCHECK(partitioned_group_by_hashtable_pool_ == nullptr);
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         FLAGS_num_aggregation_partitions,
+                                         *distinctify_hash_table_impl_types_it,
+                                         key_types,
+                                         {},
+                                         storage_manager));
+      } else {
+        distinctify_hashtables_.emplace_back(
+            AggregationStateHashTableFactory::CreateResizable(
+                *distinctify_hash_table_impl_types_it,
+                key_types,
+                estimated_num_entries,
+                {} /* handles */,
+                storage_manager));
+
+        // Combined payload is partially updated in the presence of DISTINCT.
+        handles_.back()->blockUpdate();
+      }
       ++distinctify_hash_table_impl_types_it;
     } else {
       distinctify_hashtables_.emplace_back(nullptr);
@@ -208,13 +222,24 @@ AggregationOperationState::AggregationOperationState(
               group_by_handles,
               storage_manager));
     } else if (is_aggregate_partitioned_) {
-      partitioned_group_by_hashtable_pool_.reset(
-          new PartitionedHashTablePool(estimated_num_entries,
-                                       FLAGS_num_aggregation_partitions,
-                                       hash_table_impl_type,
-                                       group_by_types_,
-                                       group_by_handles,
-                                       storage_manager));
+      if (all_distinct_) {
+        DCHECK_EQ(1u, group_by_handles.size());
+        DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+        group_by_hashtable_pool_.reset(
+            new HashTablePool(estimated_num_entries,
+                              hash_table_impl_type,
+                              group_by_types_,
+                              group_by_handles,
+                              storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         FLAGS_num_aggregation_partitions,
+                                         hash_table_impl_type,
+                                         group_by_types_,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     } else {
       group_by_hashtable_pool_.reset(
           new HashTablePool(estimated_num_entries,
@@ -362,11 +387,13 @@ bool AggregationOperationState::checkAggregatePartitioned(
   if (aggregate_functions.empty()) {
     return false;
   }
-  // Check if there's a distinct operation involved in any aggregate, if so
-  // the aggregate can't be partitioned.
-  for (auto distinct : is_distinct) {
-    if (distinct) {
-      return false;
+  // If there is only one aggregate function, we allow distinct aggregation.
+  // Otherwise, it can't be partitioned with distinct aggregation.
+  if (aggregate_functions.size() > 1) {
+    for (auto distinct : is_distinct) {
+      if (distinct) {
+        return false;
+      }
     }
   }
   // There's no distinct aggregation involved. Check if there's at least one
@@ -384,12 +411,17 @@ bool AggregationOperationState::checkAggregatePartitioned(
     }
   }
 
+  // Currently we always use partitioned aggregation to parallelize distinct
+  // aggregation.
+  if (all_distinct_) {
+    return true;
+  }
+
   // There are GROUP BYs without DISTINCT. Check if the estimated number of
   // groups is large enough to warrant a partitioned aggregation.
   return estimated_num_groups >
          static_cast<std::size_t>(
              FLAGS_partition_aggregation_num_groups_threshold);
-  return false;
 }
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -599,10 +631,19 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
       }
 
       ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
-      partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKey(argument_ids_,
-                                            group_by_key_ids_,
-                                            local_mux);
+      if (all_distinct_) {
+        DCHECK_EQ(1u, handles_.size());
+        handles_.front()->insertValueAccessorIntoDistinctifyHashTable(
+            argument_ids_.front(),
+            group_by_key_ids_,
+            local_mux,
+            partitioned_group_by_hashtable_pool_->getHashTable(partition));
+      } else {
+        partitioned_group_by_hashtable_pool_->getHashTable(partition)
+            ->upsertValueAccessorCompositeKey(argument_ids_,
+                                              group_by_key_ids_,
+                                              local_mux);
+      }
     }
   });
 }
@@ -621,13 +662,15 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
     }
   }
 
-  AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTable();
+  if (!all_distinct_) {
+    AggregationStateHashTableBase *agg_hash_table =
+        group_by_hashtable_pool_->getHashTable();
 
-  agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
-                                                  group_by_key_ids_,
-                                                  accessor_mux);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+                                                    group_by_key_ids_,
+                                                    accessor_mux);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  }
 }
 
 void AggregationOperationState::finalizeAggregate(
@@ -711,10 +754,24 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  PackedPayloadHashTable *hash_table =
+  PackedPayloadHashTable *partitioned_hash_table =
       static_cast<PackedPayloadHashTable *>(
           partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
 
+  PackedPayloadHashTable *hash_table;
+  if (all_distinct_) {
+    DCHECK_EQ(1u, handles_.size());
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+
+    hash_table = static_cast<PackedPayloadHashTable *>(
+        group_by_hashtable_pool_->getHashTable());
+    handles_.front()->aggregateOnDistinctifyHashTableForGroupBy(
+        *partitioned_hash_table, 0, hash_table);
+    partitioned_hash_table->destroyPayload();
+  } else {
+    hash_table = partitioned_hash_table;
+  }
+
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
@@ -790,19 +847,24 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
 
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  DCHECK(hash_tables != nullptr);
-  if (hash_tables->empty()) {
-    return;
-  }
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr;
 
-  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
-      hash_tables->back().release());
-  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
-    std::unique_ptr<AggregationStateHashTableBase> hash_table(
-        hash_tables->at(i).release());
-    mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
-    hash_table->destroyPayload();
+  if (all_distinct_) {
+    final_hash_table_ptr.reset(group_by_hashtable_pool_->getHashTable());
+  } else {
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+    DCHECK(hash_tables != nullptr);
+    if (hash_tables->empty()) {
+      return;
+    }
+
+    final_hash_table_ptr.reset(hash_tables->back().release());
+    for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+      std::unique_ptr<AggregationStateHashTableBase> hash_table(
+          hash_tables->at(i).release());
+      mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+      hash_table->destroyPayload();
+    }
   }
 
   PackedPayloadHashTable *final_hash_table =
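
A note on the all_distinct_ initializer in the first hunk above:
std::accumulate with std::logical_and<bool> folds is_distinct_ down to
"every flag is set", seeded with !is_distinct_.empty() so that an empty
vector yields false rather than a vacuously true result. The idiom in
isolation (std::accumulate is declared in <numeric>):

#include <functional>
#include <numeric>
#include <vector>

// Returns true iff `flags` is non-empty and every element is true.
bool AllTrue(const std::vector<bool> &flags) {
  return std::accumulate(flags.begin(), flags.end(),
                         !flags.empty(), std::logical_and<bool>());
}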

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..6c9690a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -273,6 +273,9 @@ class AggregationOperationState {
   // arguments.
   std::vector<bool> is_distinct_;
 
+  // A flag indicating whether all aggregate functions are DISTINCT aggregations.
+  const bool all_distinct_;
+
   // Non-trivial group-by/argument expressions that need to be evaluated.
   std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 293be17..8b68150 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -817,7 +817,8 @@ target_link_libraries(quickstep_storage_PackedPayloadHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_HashPair
                       quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
+                      quickstep_utility_PrimeNumber
+                      quickstep_utility_TemplateUtil)
 target_link_libraries(quickstep_storage_PartitionedHashTablePool
                       glog
                       quickstep_storage_HashTableBase

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..3d672f2 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -40,6 +40,7 @@
 #include "utility/Alignment.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PrimeNumber.hpp"
+#include "utility/TemplateUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -234,23 +235,31 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
   ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
   ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
 
+  const bool has_derived_accessor = (derived_accessor != nullptr);
+
   base_accessor->beginIterationVirtual();
-  if (derived_accessor == nullptr) {
-    return upsertValueAccessorCompositeKeyInternal<false>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        nullptr);
-  } else {
+  if (has_derived_accessor) {
     DCHECK(derived_accessor->getImplementationType()
                == ValueAccessor::Implementation::kColumnVectors);
     derived_accessor->beginIterationVirtual();
-    return upsertValueAccessorCompositeKeyInternal<true>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
   }
+
+  return InvokeOnBools(
+      has_derived_accessor,
+      handles_.empty(),
+      !all_keys_inline_,
+      [&](auto use_two_accessors,  // NOLINT(build/c++11)
+          auto key_only,
+          auto has_variable_size) -> bool {
+    return upsertValueAccessorCompositeKeyInternal<
+        decltype(use_two_accessors)::value,
+        decltype(key_only)::value,
+        decltype(has_variable_size)::value>(
+            argument_ids,
+            key_attr_ids,
+            base_accessor,
+            static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
+  });
 }
 
 void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
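
The InvokeOnBools call above turns three per-tuple conditions
(use_two_accessors, key_only, has_variable_size) into template parameters,
instantiating upsertValueAccessorCompositeKeyInternal once for each of the
eight combinations so the flags cost nothing inside the upsert loop. A
reduced sketch of the same technique for a single flag (names below are
hypothetical, not part of the Quickstep sources):

#include <cstddef>
#include <vector>

// The flag is a compile-time constant here, so the branch inside the loop
// is folded away in each instantiation.
template <bool has_variable_size>
std::size_t SumSizes(const std::vector<std::size_t> &fixed,
                     const std::vector<std::size_t> &variable) {
  std::size_t total = 0;
  for (std::size_t i = 0; i < fixed.size(); ++i) {
    total += fixed[i];
    if (has_variable_size) {
      total += variable[i];
    }
  }
  return total;
}

std::size_t SumSizesDispatch(const bool has_variable_size,
                             const std::vector<std::size_t> &fixed,
                             const std::vector<std::size_t> &variable) {
  // One runtime branch selects a fully specialized instantiation.
  return has_variable_size ? SumSizes<true>(fixed, variable)
                           : SumSizes<false>(fixed, variable);
}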

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..c49bdb4 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -20,10 +20,13 @@
 #ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 #define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 
+#include <algorithm>
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <functional>
 #include <limits>
+#include <numeric>
 #include <vector>
 
@@ -336,11 +338,12 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
                                        const std::uint8_t **value,
                                        std::size_t *entry_num) const;
 
+  template <bool key_only = false>
   inline std::uint8_t* upsertCompositeKeyInternal(
       const std::vector<TypedValue> &key,
       const std::size_t variable_key_size);
 
-  template <bool use_two_accessors>
+  template <bool use_two_accessors, bool key_only, bool has_variable_size>
   inline bool upsertValueAccessorCompositeKeyInternal(
       const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
       const std::vector<MultiSourceAttributeId> &key_ids,
@@ -355,8 +358,9 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   // comes from a HashTableKeyManager, and is set by the constructor of a
   // subclass of HashTable.
   inline void setKeyInline(const std::vector<bool> *key_inline) {
-    scalar_key_inline_ = key_inline->front();
     key_inline_ = key_inline;
+    all_keys_inline_ = std::accumulate(key_inline_->begin(), key_inline_->end(),
+                                       true, std::logical_and<bool>());
   }
 
   inline static std::size_t ComputeTotalPayloadSize(
@@ -407,7 +411,7 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   // Information about whether key components are stored inline or in a
   // separate variable-length storage region. This is usually determined by a
   // HashTableKeyManager and set by calling setKeyInline().
-  bool scalar_key_inline_;
+  bool all_keys_inline_;
   const std::vector<bool> *key_inline_;
 
   const std::size_t num_handles_;
@@ -763,7 +767,7 @@ inline bool PackedPayloadHashTable::upsertCompositeKey(
   }
 }
 
-
+template <bool key_only>
 inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
     const std::vector<TypedValue> &key,
     const std::size_t variable_key_size) {
@@ -809,7 +813,9 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
   writeCompositeKeyToBucket(key, hash_code, bucket);
 
   std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
+  if (!key_only) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  }
 
   // Update the previous chaining pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -819,13 +825,13 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
   return value;
 }
 
-template <bool use_two_accessors>
+template <bool use_two_accessors, bool key_only, bool has_variable_size>
 inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
     const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
     const std::vector<MultiSourceAttributeId> &key_ids,
     ValueAccessor *base_accessor,
     ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
+  std::size_t variable_size = 0;
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_ids.size());
 
@@ -848,13 +854,17 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
                   &key_vector)) {
             continue;
           }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
+          if (has_variable_size) {
+            variable_size =
+                this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          }
+          std::uint8_t *value =
+              this->template upsertCompositeKeyInternal<key_only>(
+                  key_vector, variable_size);
           if (value == nullptr) {
             continuing = true;
             break;
-          } else {
+          } else if (!key_only) {
             SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
             for (unsigned int k = 0; k < num_handles_; ++k) {
               const auto &ids = argument_ids[k];

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/utility/TemplateUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/TemplateUtil.hpp b/utility/TemplateUtil.hpp
index 33e4f42..dfae8e4 100644
--- a/utility/TemplateUtil.hpp
+++ b/utility/TemplateUtil.hpp
@@ -30,6 +30,8 @@ namespace quickstep {
  *  @{
  */
 
+namespace template_util_inner {
+
 /**
  * @brief Represents a compile-time sequence of integers.
  *
@@ -58,7 +60,6 @@ struct MakeSequence<0, S...> {
   typedef Sequence<S...> type;
 };
 
-
 /**
  * @brief Final step of CreateBoolInstantiatedInstance. Now all bool_values are
  *        ready. Instantiate the template and create (i.e. new) an instance.
@@ -72,6 +73,42 @@ inline ReturnT* CreateBoolInstantiatedInstanceInner(Tuple &&args,
 }
 
 /**
+ * @brief Invoke the functor with the compile-time bool values wrapped as
+ *        integral_constant types.
+ */
+template <typename FunctorT, bool ...bool_values>
+inline auto InvokeOnBoolsInner(const FunctorT &functor) {
+  return functor(std::integral_constant<bool, bool_values>()...);
+}
+
+/**
+ * @brief Recursive dispatching.
+ */
+template <typename FunctorT, bool ...bool_values, typename ...Bools>
+inline auto InvokeOnBoolsInner(const FunctorT &functor,
+                               const bool tparam,
+                               const Bools ...rest_params) {
+  if (tparam) {
+    return InvokeOnBoolsInner<FunctorT, bool_values..., true>(
+        functor, rest_params...);
+  } else {
+    return InvokeOnBoolsInner<FunctorT, bool_values..., false>(
+        functor, rest_params...);
+  }
+}
+
+/**
+ * @brief Move the functor to the first position in the argument list.
+ */
+template <std::size_t last, std::size_t ...i, typename TupleT>
+inline auto InvokeOnBoolsInner(TupleT &&args, Sequence<i...> &&indices) {
+  return InvokeOnBoolsInner(std::get<last>(std::forward<TupleT>(args)),
+                            std::get<i>(std::forward<TupleT>(args))...);
+}
+
+}  // namespace template_util_inner
+
+/**
  * @brief Edge case of the recursive CreateBoolInstantiatedInstance function
  *        when all bool variables have been branched and replaced with compile-time
  *        bool constants.
@@ -85,8 +122,10 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args) {
   // for the tuple, so that the tuple can be unpacked as a sequence of constructor
   // parameters in CreateBoolInstantiatedInstanceInner.
   constexpr std::size_t n_args = std::tuple_size<Tuple>::value;
-  return CreateBoolInstantiatedInstanceInner<T, ReturnT, bool_values...>(
-      std::forward<Tuple>(args), typename MakeSequence<n_args>::type());
+  return template_util_inner::CreateBoolInstantiatedInstanceInner<
+      T, ReturnT, bool_values...>(
+          std::forward<Tuple>(args),
+          typename template_util_inner::MakeSequence<n_args>::type());
 }
 
 /**
@@ -160,6 +199,35 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args,
   }
 }
 
+/**
+ * @brief A helper function for bool branched template specialization.
+ *
+ * Usage example:
+ * --
+ * bool c1 = true, c2 = false;
+ *
+ * InvokeOnBools(
+ *     c1, c2,
+ *     [&](auto c1, auto c2) -> SomeBaseClass* {
+ *   using T1 = decltype(c1);  // T1 == std::true_type
+ *   using T2 = decltype(c2);  // T2 == std::false_type
+ *
+ *   constexpr bool cv1 = T1::value;  // cv1 == true
+ *   constexpr bool cv2 = T2::value;  // cv2 == false
+ *
+ *   SomeFunction<cv1, cv2>(...);
+ *   return new SomeClass<cv1, cv2>(...);
+ * });
+ * --
+ */
+template <typename ...ArgTypes>
+inline auto InvokeOnBools(ArgTypes ...args) {
+  constexpr std::size_t last = sizeof...(args) - 1;
+  return template_util_inner::InvokeOnBoolsInner<last>(
+      std::forward_as_tuple(args...),
+      typename template_util_inner::MakeSequence<last>::type());
+}
+
 /** @} */
 
 }  // namespace quickstep
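
For a self-contained picture of the mechanism, here is a simplified,
fixed-arity restatement of InvokeOnBools for exactly two flags (the real
utility generalizes the recursion to any number of bools; all names below
are illustrative):

#include <iostream>
#include <type_traits>

template <typename FunctorT, bool b1, bool b2>
auto Invoke(const FunctorT &functor) {
  // Both flags resolved: hand them to the functor as integral_constant types.
  return functor(std::integral_constant<bool, b1>(),
                 std::integral_constant<bool, b2>());
}

template <typename FunctorT, bool b1>
auto Invoke(const FunctorT &functor, const bool c2) {
  // Branch the remaining runtime bool into a template parameter.
  return c2 ? Invoke<FunctorT, b1, true>(functor)
            : Invoke<FunctorT, b1, false>(functor);
}

template <typename FunctorT>
auto InvokeOnTwoBools(const bool c1, const bool c2, const FunctorT &functor) {
  return c1 ? Invoke<FunctorT, true>(functor, c2)
            : Invoke<FunctorT, false>(functor, c2);
}

int main() {
  const int r = InvokeOnTwoBools(true, false, [](auto c1, auto c2) -> int {
    // decltype(c1)::value and decltype(c2)::value are compile-time bools,
    // usable as template arguments inside this generic lambda.
    return (decltype(c1)::value ? 2 : 0) + (decltype(c2)::value ? 1 : 0);
  });
  std::cout << r << "\n";  // Prints 2.
  return 0;
}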


[2/2] incubator-quickstep git commit: Added HDFS Support For TextScanWorkOrder.

Posted by zu...@apache.org.
Added HDFS Support For TextScanWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c9d1f22e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c9d1f22e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c9d1f22e

Branch: refs/heads/hdfs_text_scan
Commit: c9d1f22e75a016f47d3f241a1b78efcbce0e5d52
Parents: 17477f5
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Feb 24 11:10:40 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   2 +-
 query_execution/CMakeLists.txt                  |   1 +
 query_execution/Shiftboss.cpp                   |   3 +-
 query_execution/Shiftboss.hpp                   |  14 +++
 .../DistributedExecutionGeneratorTestRunner.cpp |   3 +-
 relational_operators/CMakeLists.txt             |   5 +
 relational_operators/TextScanOperator.cpp       | 107 ++++++++++++++++---
 relational_operators/TextScanOperator.hpp       |  10 +-
 relational_operators/WorkOrderFactory.cpp       |   6 +-
 relational_operators/WorkOrderFactory.hpp       |   4 +-
 storage/FileManagerHdfs.hpp                     |   9 ++
 storage/StorageManager.cpp                      |   9 ++
 storage/StorageManager.hpp                      |   8 +-
 13 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 50bf694..12d6be0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -295,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkerMessage
                         quickstep_relationaloperators_RebuildWorkOrder
                         quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_Flags
                         quickstep_storage_InsertDestination
                         quickstep_storage_StorageBlock
                         quickstep_storage_StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
                                                                        query_contexts_[query_id].get(),
                                                                        storage_manager_,
                                                                        shiftboss_client_id_,
-                                                                       bus_);
+                                                                       bus_,
+                                                                       hdfs_);
 
         unique_ptr<WorkerMessage> worker_message(
             WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..e0b4312 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
    * @param bus A pointer to the TMB.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
+   * @param hdfs The HDFS connector via libhdfs3.
    * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
    *
   * @note If cpu_id is not specified, the Shiftboss thread may be moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
   Shiftboss(tmb::MessageBus *bus,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
+            void *hdfs,
             const int cpu_id = -1)
       : bus_(DCHECK_NOTNULL(bus)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         workers_(DCHECK_NOTNULL(workers)),
+        hdfs_(hdfs),
         cpu_id_(cpu_id),
         shiftboss_client_id_(tmb::kClientIdNone),
         foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
     // Check that we have at least one Worker.
     DCHECK_GT(workers->getNumWorkers(), 0u);
 
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (FLAGS_use_hdfs) {
+      CHECK(hdfs_);
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
     shiftboss_client_id_ = bus_->Connect();
     LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
     DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
   StorageManager *storage_manager_;
   WorkerDirectory *workers_;
 
+  // Not owned.
+  void *hdfs_;
+
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
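
Passing the connector around as an opaque void * keeps hdfs/hdfs.h out of
headers that must also compile in non-HDFS builds; only translation units
guarded by QUICKSTEP_HAVE_FILE_MANAGER_HDFS cast it back to the typed
handle. The pattern in miniature (class name below is illustrative):

// Header side: no libhdfs3 types appear, so every build can include this.
class HdfsUser {
 public:
  // `hdfs` is not owned; it is an opaque hdfsFS handle (may be nullptr
  // when HDFS support is compiled out or disabled).
  explicit HdfsUser(void *hdfs) : hdfs_(hdfs) {}

 private:
  void *hdfs_;
};

// Implementation side, compiled only when HDFS support is enabled:
//   #include <hdfs/hdfs.h>
//   hdfsFS fs = static_cast<hdfsFS>(hdfs_);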
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..c9f5a10 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -128,7 +128,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+                               storage_manager->hdfs()));
 
     storage_managers_.push_back(move(storage_manager));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 457d58a..1693ec2 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -491,6 +491,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_Flags
                       quickstep_storage_InsertDestination
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -500,6 +501,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+  target_link_libraries(quickstep_relationaloperators_TextScanOperator
+                        ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       glog
                       quickstep_catalog_CatalogRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 0a83a85..6333813 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include <hdfs/hdfs.h>
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -205,14 +212,56 @@ void TextScanWorkOrder::execute() {
 
   std::vector<TypedValue> vector_tuple_returned;
   constexpr std::size_t kSmallBufferSize = 0x4000;
-  char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
-
-  // Read text segment into buffer.
-  FILE *file = std::fopen(filename_.c_str(), "rb");
-  std::fseek(file, text_offset_, SEEK_SET);
-  std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
-  if (bytes_read != text_segment_size_) {
-    throw TextScanReadError(filename_);
+  const std::size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize);
+  char *buffer = reinterpret_cast<char *>(malloc(buffer_size));
+
+  bool use_hdfs = false;
+  std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  hdfsFS hdfs = nullptr;
+  hdfsFile file_handle = nullptr;
+
+  if (FLAGS_use_hdfs) {
+    use_hdfs = true;
+    hdfs = static_cast<hdfsFS>(hdfs_);
+
+    file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
+                               0 /* default replication */, 0 /* default block size */);
+    if (file_handle == nullptr) {
+      LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+      return;
+    }
+
+    if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+
+    bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+    if (bytes_read != text_segment_size_) {
+      hdfsCloseFile(hdfs, file_handle);
+      throw TextScanReadError(filename_);
+    }
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  FILE *file = nullptr;
+  if (!use_hdfs) {
+    // Avoid unused-private-field warning.
+    (void) hdfs_;
+
+    // Read text segment into buffer.
+    file = std::fopen(filename_.c_str(), "rb");
+    std::fseek(file, text_offset_, SEEK_SET);
+    bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+    if (bytes_read != text_segment_size_) {
+      std::fclose(file);
+      throw TextScanReadError(filename_);
+    }
   }
 
   // Locate the first newline character.
@@ -266,10 +315,36 @@ void TextScanWorkOrder::execute() {
   // that the last tuple is very small / very large.
   std::size_t dynamic_read_size = 1024;
   std::string row_string;
-  std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+  const std::size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fseek(file, dynamic_read_offset, SEEK_SET);
+  }
+
   bool has_reached_end = false;
   do {
-    bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+      bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+
+      // Read again when crossing an HDFS block boundary.
+      if (bytes_read != dynamic_read_size) {
+        bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+      }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    } else {
+      bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    }
+
     std::size_t bytes_to_copy = bytes_read;
 
     for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -303,7 +378,14 @@ void TextScanWorkOrder::execute() {
     }
   }
 
-  std::fclose(file);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    hdfsCloseFile(hdfs, file_handle);
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fclose(file);
+  }
+
   free(buffer);
 
   // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -334,7 +416,8 @@ void TextScanWorkOrder::execute() {
 }
 
 std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
-                                  const CatalogRelationSchema &relation, bool *is_faulty) const {
+                                                    const CatalogRelationSchema &relation,
+                                                    bool *is_faulty) const {
   std::vector<TypedValue> attribute_values;
   // Always assume current row is not faulty initially.
   *is_faulty = false;
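
One subtlety in the hunks above: unlike std::fread, hdfsRead may return
fewer bytes than requested when a read crosses an HDFS block boundary,
which is why the dynamic-read loop issues a second read. A self-contained
sketch of the general read-fully pattern against the libhdfs3 C API
(hdfsSeek/hdfsRead and the tSize/tOffset typedefs come from hdfs/hdfs.h;
ReadFully itself is hypothetical):

#include <cstddef>

#include <hdfs/hdfs.h>

// Read up to `size` bytes starting at `offset`, tolerating the short reads
// that hdfsRead may return at block boundaries. Returns the byte count
// actually read (less than `size` only at end-of-file or on error).
std::size_t ReadFully(hdfsFS fs, hdfsFile file, const tOffset offset,
                      char *buffer, const std::size_t size) {
  if (hdfsSeek(fs, file, offset) != 0) {
    return 0;
  }
  std::size_t total = 0;
  while (total < size) {
    const tSize bytes = hdfsRead(fs, file, buffer + total,
                                 static_cast<tSize>(size - total));
    if (bytes <= 0) {
      break;  // 0 means end-of-file; -1 means error (errno is set).
    }
    total += static_cast<std::size_t>(bytes);
  }
  return total;
}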

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index eada190..59821fc 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
    * @param output_destination The InsertDestination to insert tuples.
+   * @param hdfs The HDFS connector via libhdfs3.
    **/
   TextScanWorkOrder(
       const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
-      InsertDestination *output_destination)
+      InsertDestination *output_destination,
+      void *hdfs = nullptr)
       : WorkOrder(query_id),
         filename_(filename),
         text_offset_(text_offset),
         text_segment_size_(text_segment_size),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        hdfs_(hdfs) {}
 
   ~TextScanWorkOrder() override {}
 
@@ -332,6 +335,9 @@ class TextScanWorkOrder : public WorkOrder {
 
   InsertDestination *output_destination_;
 
+  // Not owned.
+  void *hdfs_;
+
   DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d2c8251..cf0ee74 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -75,7 +75,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                                                   QueryContext *query_context,
                                                   StorageManager *storage_manager,
                                                   const tmb::client_id shiftboss_client_id,
-                                                  tmb::MessageBus *bus) {
+                                                  tmb::MessageBus *bus,
+                                                  void *hdfs) {
   DCHECK(query_context != nullptr);
   DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
       << "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -473,7 +474,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+          hdfs);
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
    * @param storage_manager The StorageManager to use.
    * @param shiftboss_client_id The TMB client id of Shiftboss.
    * @param bus A pointer to the TMB.
+   * @param hdfs The HDFS connector via libhdfs3.
    *
    * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
    **/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
                                          QueryContext *query_context,
                                          StorageManager *storage_manager,
                                          const tmb::client_id shiftboss_client_id,
-                                         tmb::MessageBus *bus);
+                                         tmb::MessageBus *bus,
+                                         void *hdfs);
 
   /**
    * @brief Check whether a serialization::WorkOrder is fully-formed and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {
 
   block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs() {
+    return static_cast<void*>(hdfs_);
+  }
+
  private:
   // libhdfs3 has an API to release this pointer.
   hdfsFS hdfs_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 783ccfe..94e1b67 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
   return true;
 }
 
+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  return nullptr;
+}
+
 vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
 #include "storage/StorageConstants.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
   void pullBlockOrBlob(const block_id block, PullResponse *response) const;
 #endif
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs();
+
  private:
   struct BlockHandle {
     void *block_memory;