You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/04/01 21:36:55 UTC

incubator-quickstep git commit: Measure memory consumption of a running query

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 9d2e0b6ba -> 4b04fb841


Measure memory consumption of a running query

- Methods to get memory footprints of join hash tables (for all of their
  implementations)
- Method to get the memory consumed by the active temporary relations
  during the query execution.
- AggregationOperationState can provide total memory used for a single
  aggregation node.
- QueryContext can aggregate the memory consumed by all temporary data
  structures (e.g. join hash tables, aggregation hash tables).
- QueryManagerBase provides an API to get the total memory footprint of
  a query (only implemented for single node version).
- Mutex protection for hash join tables' list, insert destinations' list and
  aggregation operation states' list in the QueryContext class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4b04fb84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4b04fb84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4b04fb84

Branch: refs/heads/master
Commit: 4b04fb841d7b6763b6b2de0a42e20c75aebf6999
Parents: 9d2e0b6
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Mar 21 14:53:38 2017 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Fri Mar 31 22:34:55 2017 -0500

----------------------------------------------------------------------
 catalog/CatalogRelation.hpp                     | 14 ++++
 query_execution/CMakeLists.txt                  |  2 +
 query_execution/QueryContext.cpp                | 42 ++++++++++++
 query_execution/QueryContext.hpp                | 67 +++++++++++++++++++-
 query_execution/QueryManagerBase.hpp            |  9 +++
 query_execution/QueryManagerSingleNode.cpp      | 24 ++++++-
 query_execution/QueryManagerSingleNode.hpp      | 11 ++++
 storage/AggregationOperationState.cpp           | 21 ++++++
 storage/AggregationOperationState.hpp           |  9 +++
 storage/CollisionFreeVectorTable.hpp            |  4 ++
 storage/HashTable.hpp                           |  5 ++
 storage/HashTableBase.hpp                       |  2 +
 storage/HashTablePool.hpp                       | 13 ++++
 storage/LinearOpenAddressingHashTable.hpp       |  4 ++
 storage/PackedPayloadHashTable.hpp              |  4 ++
 storage/PartitionedHashTablePool.hpp            | 13 ++++
 storage/SeparateChainingHashTable.hpp           |  4 ++
 .../SimpleScalarSeparateChainingHashTable.hpp   |  4 ++
 18 files changed, 249 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/catalog/CatalogRelation.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index e1fd79a..af7f273 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -396,6 +396,20 @@ class CatalogRelation : public CatalogRelationSchema {
     return statistics_.get();
   }
 
+  /**
+   * @brief Get the size of this relation in bytes.
+   *
+   * @note The output signifies the amount of memory allocated for this
+   *       relation. The true memory footprint of this relation could be lower
+   *       than the output of this method.
+   **/
+  inline std::size_t getRelationSizeBytes() const {
+    SpinSharedMutexSharedLock<false> lock(blocks_mutex_);
+    return blocks_.size() *
+           getDefaultStorageBlockLayout().getDescription().num_slots() *
+           kSlotSizeBytes;
+  }
+
  private:
   // A list of blocks belonged to the relation.
   std::vector<block_id> blocks_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index d81ab44..eeed791 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -216,6 +216,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_storage_InsertDestination
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_WindowAggregationOperationState
+                      quickstep_threading_SpinSharedMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
@@ -281,6 +282,7 @@ if (ENABLE_DISTRIBUTED)
                         tmb)
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
+                      quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionState

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 71839a7..8ba77ab 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -269,4 +269,46 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
   return proto.IsInitialized();
 }
 
+std::size_t QueryContext::getJoinHashTablesMemoryBytes() const {
+  SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
+  std::size_t memory = 0;
+  for (std::size_t hashtable_id = 0;
+       hashtable_id < join_hash_tables_.size();
+       ++hashtable_id) {
+    for (std::size_t partition_num = 0;
+         partition_num < join_hash_tables_[hashtable_id].size();
+         ++partition_num) {
+      if (join_hash_tables_[hashtable_id][partition_num] != nullptr) {
+        memory += join_hash_tables_[hashtable_id][partition_num]
+                      ->getHashTableMemorySizeBytes();
+      }
+    }
+  }
+  return memory;
+}
+
+std::size_t QueryContext::getAggregationStatesMemoryBytes() const {
+  SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
+  std::size_t memory = 0;
+  for (std::size_t agg_state_id = 0;
+       agg_state_id < aggregation_states_.size();
+       ++agg_state_id) {
+    if (aggregation_states_[agg_state_id] != nullptr) {
+      memory += aggregation_states_[agg_state_id]->getMemoryConsumptionBytes();
+    }
+  }
+  return memory;
+}
+
+void QueryContext::getTempRelationIDs(
+    std::vector<relation_id> *temp_relation_ids) const {
+  SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
+  DCHECK(temp_relation_ids != nullptr);
+  for (std::size_t id = 0; id < insert_destinations_.size(); ++id) {
+    InsertDestination *curr_insert_dest = insert_destinations_[id].get();
+    DCHECK(curr_insert_dest != nullptr);
+    temp_relation_ids->emplace_back(curr_insert_dest->getRelation().getID());
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index ed0f99c..ebc9506 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -34,6 +34,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/WindowAggregationOperationState.hpp"
+#include "threading/SpinSharedMutex.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/Macros.hpp"
 #include "utility/SortConfiguration.hpp"
@@ -172,6 +173,7 @@ class QueryContext {
    * @return True if valid, otherwise false.
    **/
   bool isValidAggregationStateId(const aggregation_state_id id) const {
+    SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
     return id < aggregation_states_.size();
   }
 
@@ -183,6 +185,7 @@ class QueryContext {
    * @return The AggregationOperationState, alreadly created in the constructor.
    **/
   inline AggregationOperationState* getAggregationState(const aggregation_state_id id) {
+    SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
     DCHECK_LT(id, aggregation_states_.size());
     DCHECK(aggregation_states_[id]);
     return aggregation_states_[id].get();
@@ -194,6 +197,7 @@ class QueryContext {
    * @param id The ID of the AggregationOperationState to destroy.
    **/
   inline void destroyAggregationState(const aggregation_state_id id) {
+    SpinSharedMutexExclusiveLock<false> lock(aggregation_states_mutex_);
     DCHECK_LT(id, aggregation_states_.size());
     DCHECK(aggregation_states_[id]);
     aggregation_states_[id].reset(nullptr);
@@ -231,6 +235,7 @@ class QueryContext {
    * @return True if valid, otherwise false.
    **/
   bool isValidInsertDestinationId(const insert_destination_id id) const {
+    SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
     return id != kInvalidInsertDestinationId
         && id >= 0
         && static_cast<std::size_t>(id) < insert_destinations_.size();
@@ -244,6 +249,7 @@ class QueryContext {
    * @return The InsertDestination, alreadly created in the constructor.
    **/
   inline InsertDestination* getInsertDestination(const insert_destination_id id) {
+    SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
     DCHECK_GE(id, 0);
     DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size());
     return insert_destinations_[id].get();
@@ -255,6 +261,7 @@ class QueryContext {
    * @param id The id of the InsertDestination to destroy.
    **/
   inline void destroyInsertDestination(const insert_destination_id id) {
+    SpinSharedMutexExclusiveLock<false> lock(insert_destinations_mutex_);
     DCHECK_GE(id, 0);
     DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size());
     insert_destinations_[id].reset();
@@ -263,12 +270,16 @@ class QueryContext {
   /**
    * @brief Whether the given JoinHashTable id is valid.
    *
+   * @note This is a thread-safe function. Check isValidJoinHashTableIdUnsafe
+   *       for the thread-unsafe version.
+   *
    * @param id The JoinHashTable id.
    * @param part_id The partition id.
    *
    * @return True if valid, otherwise false.
    **/
   bool isValidJoinHashTableId(const join_hash_table_id id, const partition_id part_id) const {
+    SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
     return id < join_hash_tables_.size() &&
            part_id < join_hash_tables_[id].size();
   }
@@ -282,7 +293,8 @@ class QueryContext {
    * @return The JoinHashTable, already created in the constructor.
    **/
   inline JoinHashTable* getJoinHashTable(const join_hash_table_id id, const partition_id part_id) {
-    DCHECK(isValidJoinHashTableId(id, part_id));
+    SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
+    DCHECK(isValidJoinHashTableIdUnsafe(id, part_id));
     return join_hash_tables_[id][part_id].get();
   }
 
@@ -293,7 +305,8 @@ class QueryContext {
    * @param part_id The partition id.
    **/
   inline void destroyJoinHashTable(const join_hash_table_id id, const partition_id part_id) {
-    DCHECK(isValidJoinHashTableId(id, part_id));
+    SpinSharedMutexExclusiveLock<false> lock(hash_tables_mutex_);
+    DCHECK(isValidJoinHashTableIdUnsafe(id, part_id));
     join_hash_tables_[id][part_id].reset();
   }
 
@@ -551,7 +564,53 @@ class QueryContext {
     return window_aggregation_states_[id].release();
   }
 
+  /**
+   * @brief Get the total memory footprint of the temporary data structures
+   *        used for query execution (e.g. join hash tables, aggregation hash
+   *        tables) in bytes.
+   **/
+  std::size_t getTempStructuresMemoryBytes() const {
+    return getJoinHashTablesMemoryBytes() + getAggregationStatesMemoryBytes();
+  }
+
+  /**
+   * @brief Get the total memory footprint in bytes of the join hash tables
+   *        used for query execution.
+   **/
+  std::size_t getJoinHashTablesMemoryBytes() const;
+
+  /**
+   * @brief Get the total memory footprint in bytes of the aggregation hash
+   *        tables used for query execution.
+   **/
+  std::size_t getAggregationStatesMemoryBytes() const;
+
+  /**
+   * @brief Get the list of IDs of temporary relations in this query.
+   *
+   * @param temp_relation_ids A pointer to the vector that will store the
+   *        relation IDs.
+   **/
+  void getTempRelationIDs(std::vector<relation_id> *temp_relation_ids) const;
+
  private:
+  /**
+   * @brief Whether the given JoinHashTable id is valid.
+   *
+   * @note This is a thread-unsafe function. Check isValidJoinHashTableId
+   *       for the thread-safe version.
+   *
+   * @param id The JoinHashTable id.
+   * @param part_id The partition id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidJoinHashTableIdUnsafe(const join_hash_table_id id,
+                                    const partition_id part_id) const {
+    return id < join_hash_tables_.size() &&
+           part_id < join_hash_tables_[id].size();
+  }
+
   // Per hash join, the index is the partition id.
   typedef std::vector<std::unique_ptr<JoinHashTable>> PartitionedJoinHashTables;
 
@@ -568,6 +627,10 @@ class QueryContext {
   std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
   std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_;
 
+  mutable SpinSharedMutex<false> hash_tables_mutex_;
+  mutable SpinSharedMutex<false> aggregation_states_mutex_;
+  mutable SpinSharedMutex<false> insert_destinations_mutex_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryContext);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 27fa6dc..b6ed247 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -271,6 +271,15 @@ class QueryManagerBase {
     return query_exec_state_->hasRebuildInitiated(index);
   }
 
+  /**
+   * @brief Get the query's current memory consumption in bytes.
+   *
+   * @note This method returns a best guess consumption, at the time of the call.
+   **/
+  virtual std::size_t getQueryMemoryConsumptionBytes() const {
+    return 0;
+  }
+
   std::unique_ptr<QueryHandle> query_handle_;
 
   const std::size_t query_id_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 237796f..e3f349f 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/WorkerMessage.hpp"
 #include "query_optimizer/QueryHandle.hpp"
@@ -58,7 +59,8 @@ QueryManagerSingleNode::QueryManagerSingleNode(
                                       foreman_client_id_,
                                       bus_)),
       workorders_container_(
-          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
+          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
+      database_(static_cast<const CatalogDatabase&>(*catalog_database)) {
   // Collect all the workorders from all the relational operators in the DAG.
   for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
     if (checkAllBlockingDependenciesMet(index)) {
@@ -193,4 +195,24 @@ void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
   }
 }
 
+std::size_t QueryManagerSingleNode::getQueryMemoryConsumptionBytes() const {
+  const std::size_t temp_relations_memory =
+      getTotalTempRelationMemoryInBytes();
+  const std::size_t temp_data_structures_memory =
+      query_context_->getTempStructuresMemoryBytes();
+  return temp_relations_memory + temp_data_structures_memory;
+}
+
+std::size_t QueryManagerSingleNode::getTotalTempRelationMemoryInBytes() const {
+  std::vector<relation_id> temp_relation_ids;
+  query_context_->getTempRelationIDs(&temp_relation_ids);
+  std::size_t memory = 0;
+  for (std::size_t rel_id : temp_relation_ids) {
+    if (database_.hasRelationWithId(rel_id)) {
+      memory += database_.getRelationById(rel_id)->getRelationSizeBytes();
+    }
+  }
+  return memory;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index dd044a5..6c5e38e 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -36,6 +36,7 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class CatalogDatabase;
 class CatalogDatabaseLite;
 class QueryHandle;
 class StorageManager;
@@ -94,6 +95,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
     return query_context_.get();
   }
 
+  std::size_t getQueryMemoryConsumptionBytes() const override;
+
  private:
   bool checkNormalExecutionOver(const dag_node_index index) const override {
     return (checkAllDependenciesMet(index) &&
@@ -123,6 +126,12 @@ class QueryManagerSingleNode final : public QueryManagerBase {
   void getRebuildWorkOrders(const dag_node_index index,
                             WorkOrdersContainer *container);
 
+  /**
+   * @brief Get the total memory (in bytes) occupied by temporary relations
+   *        created during the query execution.
+   **/
+  std::size_t getTotalTempRelationMemoryInBytes() const;
+
   const tmb::client_id foreman_client_id_;
 
   StorageManager *storage_manager_;
@@ -132,6 +141,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrdersContainer> workorders_container_;
 
+  const CatalogDatabase &database_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index aea5bf5..9388bdb 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -946,4 +946,25 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
+  std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
+  memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);
+  memory += collision_free_hashtable_->getMemoryConsumptionBytes();
+  memory += group_by_hashtable_pool_->getMemoryConsumptionPoolBytes();
+  memory += partitioned_group_by_hashtable_pool_->getMemoryConsumptionPoolBytes();
+  return memory;
+}
+
+std::size_t AggregationOperationState::getMemoryConsumptionBytesHelper(
+    const std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+        &hashtables) const {
+  std::size_t memory = 0;
+  for (std::size_t ht_id = 0; ht_id < hashtables.size(); ++ht_id) {
+    if (hashtables[ht_id] != nullptr) {
+      memory += hashtables[ht_id]->getMemoryConsumptionBytes();
+    }
+  }
+  return memory;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 6c9690a..e6af494 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -207,6 +207,11 @@ class AggregationOperationState {
    */
   CollisionFreeVectorTable* getCollisionFreeVectorTable() const;
 
+  /**
+   * @brief Get the memory footprint of the AggregationOperationState.
+   **/
+  std::size_t getMemoryConsumptionBytes() const;
+
  private:
   // Check whether partitioned aggregation can be applied.
   bool checkAggregatePartitioned(
@@ -253,6 +258,10 @@ class AggregationOperationState {
 
   void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
 
+  std::size_t getMemoryConsumptionBytesHelper(
+      const std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+          &hashtables) const;
+
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 772d47d..490a5cc 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -169,6 +169,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
                      const std::size_t handle_id,
                      NativeColumnVector *output_cv) const;
 
+  std::size_t getMemoryConsumptionBytes() const override {
+    return memory_size_;
+  }
+
  private:
   inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
     return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 786a9bb..3bf8709 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -541,6 +541,11 @@ class HashTable : public HashTableBase<resizable,
       FunctorT *functor);
 
   /**
+   * @brief Get the size of the hash table at the time this function is called.
+   **/
+  virtual std::size_t getHashTableMemorySizeBytes() const = 0;
+
+  /**
    * @brief Determine the number of entries (key-value pairs) contained in this
    *        HashTable.
    * @note For some HashTable implementations, this is O(1), but for others it

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index b4b6918..8be388a 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -115,6 +115,8 @@ class AggregationStateHashTableBase {
 
   virtual void destroyPayload() = 0;
 
+  virtual std::size_t getMemoryConsumptionBytes() const = 0;
+
  protected:
   AggregationStateHashTableBase() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index df527b8..6dbd7f9 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -123,6 +123,19 @@ class HashTablePool {
     return &hash_tables_;
   }
 
+  /**
+   * @brief Get the total memory consumed by the hash tables in this pool.
+   **/
+  std::size_t getMemoryConsumptionPoolBytes() const {
+    std::size_t memory = 0;
+    for (std::size_t ht_id = 0; ht_id <  hash_tables_.size(); ++ht_id) {
+      if (hash_tables_[ht_id] != nullptr) {
+        memory += hash_tables_[ht_id]->getMemoryConsumptionBytes();
+      }
+    }
+    return memory;
+  }
+
  private:
   AggregationStateHashTableBase* createNewHashTable() {
     return AggregationStateHashTableFactory::CreateResizable(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/LinearOpenAddressingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/LinearOpenAddressingHashTable.hpp b/storage/LinearOpenAddressingHashTable.hpp
index 4953b4d..044ec6f 100644
--- a/storage/LinearOpenAddressingHashTable.hpp
+++ b/storage/LinearOpenAddressingHashTable.hpp
@@ -120,6 +120,10 @@ class LinearOpenAddressingHashTable : public HashTable<ValueT,
   void getAllCompositeKey(const std::vector<TypedValue> &key,
                           std::vector<const ValueT*> *values) const override;
 
+  std::size_t getHashTableMemorySizeBytes() const override {
+     return sizeof(Header) + numEntries() * bucket_size_;
+  }
+
  protected:
   HashTablePutResult putInternal(const TypedValue &key,
                                  const std::size_t variable_key_size,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index b06200d..960d5a7 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -314,6 +314,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   inline std::size_t forEachCompositeKey(FunctorT *functor,
                                          const std::size_t index) const;
 
+  std::size_t getMemoryConsumptionBytes() const override {
+    return sizeof(Header) + numEntries() * bucket_size_;
+  }
+
  private:
   void resize(const std::size_t extra_buckets,
               const std::size_t extra_variable_storage,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
index 0e62511..bfe8b8b 100644
--- a/storage/PartitionedHashTablePool.hpp
+++ b/storage/PartitionedHashTablePool.hpp
@@ -113,6 +113,19 @@ class PartitionedHashTablePool {
     return num_partitions_;
   }
 
+  /**
+   * @brief Get the total memory consumed by the hash tables in this pool.
+   **/
+  std::size_t getMemoryConsumptionPoolBytes() const {
+    std::size_t memory = 0;
+    for (std::size_t ht_id = 0; ht_id <  hash_tables_.size(); ++ht_id) {
+      if (hash_tables_[ht_id] != nullptr) {
+        memory += hash_tables_[ht_id]->getMemoryConsumptionBytes();
+      }
+    }
+    return memory;
+  }
+
  private:
   void initializeAllHashTables() {
     for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/SeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/SeparateChainingHashTable.hpp b/storage/SeparateChainingHashTable.hpp
index 3ca060d..2403623 100644
--- a/storage/SeparateChainingHashTable.hpp
+++ b/storage/SeparateChainingHashTable.hpp
@@ -114,6 +114,10 @@ class SeparateChainingHashTable : public HashTable<ValueT,
   void getAllCompositeKey(const std::vector<TypedValue> &key,
                           std::vector<const ValueT*> *values) const override;
 
+  std::size_t getHashTableMemorySizeBytes() const override {
+    return sizeof(Header) + numEntries() * bucket_size_;
+  }
+
  protected:
   HashTablePutResult putInternal(const TypedValue &key,
                                  const std::size_t variable_key_size,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b04fb84/storage/SimpleScalarSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/SimpleScalarSeparateChainingHashTable.hpp b/storage/SimpleScalarSeparateChainingHashTable.hpp
index 8448896..81f2044 100644
--- a/storage/SimpleScalarSeparateChainingHashTable.hpp
+++ b/storage/SimpleScalarSeparateChainingHashTable.hpp
@@ -135,6 +135,10 @@ class SimpleScalarSeparateChainingHashTable : public HashTable<ValueT,
     return getAll(key.front(), values);
   }
 
+  std::size_t getHashTableMemorySizeBytes() const override {
+    return sizeof(Header) + numEntries() * sizeof(Bucket);
+  }
+
  protected:
   HashTablePutResult putInternal(const TypedValue &key,
                                  const std::size_t variable_key_size,