You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/30 23:19:17 UTC
[42/50] [abbrv] incubator-quickstep git commit: Groupby hashtable
pool (#236)
Groupby hashtable pool (#236)
- Created a HashTablePool class for group by clause.
- Each thread can check out its own hash table while doing group by
aggregation.
- AggregationOperationState uses one hash table pool per group by
clause.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ddb67bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ddb67bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ddb67bf
Branch: refs/heads/work-order-serialization
Commit: 2ddb67bf438878b572e997caec2397e4c7ac9b8f
Parents: 5bda90e
Author: Harshad Deshmukh <d....@gmail.com>
Authored: Tue May 24 19:04:39 2016 -0500
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Mon May 30 15:47:53 2016 -0700
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.hpp | 105 ++++++++++++
expressions/aggregation/AggregationHandle.hpp | 15 +-
.../aggregation/AggregationHandleAvg.cpp | 9 +
.../aggregation/AggregationHandleAvg.hpp | 4 +
.../aggregation/AggregationHandleCount.cpp | 11 ++
.../aggregation/AggregationHandleCount.hpp | 4 +
.../aggregation/AggregationHandleDistinct.hpp | 7 +
.../aggregation/AggregationHandleMax.cpp | 9 +
.../aggregation/AggregationHandleMax.hpp | 4 +
.../aggregation/AggregationHandleMin.cpp | 9 +
.../aggregation/AggregationHandleMin.hpp | 4 +
.../aggregation/AggregationHandleSum.cpp | 9 +
.../aggregation/AggregationHandleSum.hpp | 4 +
expressions/aggregation/CMakeLists.txt | 2 +
.../tests/AggregationHandleAvg_unittest.cpp | 109 ++++++++++++
.../tests/AggregationHandleCount_unittest.cpp | 126 +++++++++++++-
.../tests/AggregationHandleMax_unittest.cpp | 122 ++++++++++++++
.../tests/AggregationHandleMin_unittest.cpp | 121 ++++++++++++++
.../tests/AggregationHandleSum_unittest.cpp | 124 ++++++++++++++
query_execution/QueryContext.hpp | 2 +-
storage/AggregationOperationState.cpp | 84 ++++++++--
storage/AggregationOperationState.hpp | 4 +
storage/CMakeLists.txt | 10 ++
storage/HashTablePool.hpp | 166 +++++++++++++++++++
storage/StorageManager.cpp | 3 +-
25 files changed, 1045 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 52249f7..0267e17 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -44,6 +44,90 @@ class ValueAccessor;
* @{
*/
+/**
+ * @brief An upserter class for modifying the destination hash table while
+ *        merging two GROUP BY hash tables.
+ *
+ * When invoked on a destination state, it merges a fixed source state into it
+ * by delegating to HandleT::mergeStates().
+ **/
+template <typename HandleT, typename StateT>
+class HashTableStateUpserter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handle The aggregation handle being used.
+   * @param source_state The aggregation state in the source aggregation hash
+   *        table. The corresponding state (for the same key) in the
+   *        destination hash table will be upserted. Only a reference is kept,
+   *        so source_state must outlive this upserter.
+   **/
+  HashTableStateUpserter(const HandleT &handle, const StateT &source_state)
+      : handle_(handle), source_state_(source_state) {}
+
+  /**
+   * @brief The operator for the functor required for the upsert.
+   *
+   * @param destination_state The aggregation state in the aggregation hash
+   *        table that is being upserted.
+   **/
+  void operator()(StateT *destination_state) {
+    handle_.mergeStates(source_state_, destination_state);
+  }
+
+ private:
+  const HandleT &handle_;
+  const StateT &source_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
+};
+
+/**
+ * @brief A functor that merges each (key, state) pair of a source GROUP BY
+ *        hash table into a destination hash table.
+ **/
+template <typename HandleT, typename StateT, typename HashTableT>
+class HashTableMerger {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handle The aggregation handle being used.
+   * @param destination_hash_table The destination hash table to which other
+   *        hash tables will be merged. It must actually be a HashTableT; the
+   *        downcast below is unchecked.
+   **/
+  HashTableMerger(const HandleT &handle,
+                  AggregationStateHashTableBase *destination_hash_table)
+      : handle_(handle),
+        destination_hash_table_(
+            static_cast<HashTableT *>(destination_hash_table)) {}
+
+  /**
+   * @brief The operator for the functor.
+   *
+   * If group_by_key already exists in the destination hash table, the source
+   * state is merged into the existing state. Otherwise, a copy of the source
+   * state is inserted under group_by_key.
+   *
+   * @param group_by_key The group by key being merged.
+   * @param source_state The aggregation state for the given key in the source
+   *        aggregation hash table.
+   **/
+  void operator()(const std::vector<TypedValue> &group_by_key,
+                  const StateT &source_state) {
+    const StateT *original_state =
+        destination_hash_table_->getSingleCompositeKey(group_by_key);
+    if (original_state != nullptr) {
+      HashTableStateUpserter<HandleT, StateT> upserter(handle_, source_state);
+      // upsertCompositeKey() can return false if the hash table runs out of
+      // space during the upsert. Ideally we would retry after resizing; until
+      // then, fail loudly rather than silently losing the source state.
+      CHECK(destination_hash_table_->upsertCompositeKey(
+          group_by_key, *original_state, &upserter));
+    } else {
+      // putCompositeKey() can fail as well (e.g. when out of space). Ignoring
+      // its result would silently drop this group's partial aggregate, so it
+      // is checked just like the upsert above.
+      CHECK(destination_hash_table_->putCompositeKey(group_by_key,
+                                                     source_state)
+            == HashTablePutResult::kOK);
+    }
+  }
+
+ private:
+  const HandleT &handle_;
+  HashTableT *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+};
/**
* @brief The helper intermediate subclass of AggregationHandle that provides
@@ -140,6 +224,11 @@ class AggregationConcreteHandle : public AggregationHandle {
return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
}
+ /**
+ * @brief Merge source_hash_table into destination_hash_table, combining the
+ * states of keys present in both tables via HandleT.
+ *
+ * @param source_hash_table Must be a HashTableT holding StateT values
+ * (it is downcast without a check).
+ * @param destination_hash_table Must be a HashTableT holding StateT values
+ * (it is downcast without a check).
+ **/
+ template <typename HandleT, typename StateT, typename HashTableT>
+ void mergeGroupByHashTablesHelper(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const;
+
private:
DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
};
@@ -373,6 +462,22 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
}
}
+template <typename HandleT,
+          typename StateT,
+          typename HashTableT>
+void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Visit every (key, state) pair of the source table; the merger folds each
+  // one into the destination table. Both tables are assumed to be HashTableT
+  // (the casts here and inside HashTableMerger are unchecked).
+  HashTableMerger<HandleT, StateT, HashTableT> merger(
+      static_cast<const HandleT &>(*this), destination_hash_table);
+  static_cast<const HashTableT &>(source_hash_table)
+      .forEachCompositeKey(&merger);
+}
+
} // namespace quickstep
#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 625f334..cdebb03 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -276,7 +276,7 @@ class AggregationHandle {
* each GROUP BY group. Later, a second-round aggregation on the distinctify
* hash table will be performed to actually compute the aggregated result for
* each GROUP BY group.
- *
+ *
* In the case of single aggregation where there is no GROUP BY expressions,
* we simply treat it as a special GROUP BY case that the GROUP BY expression
* vector is empty.
@@ -349,6 +349,19 @@ class AggregationHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const = 0;
+ /**
+ * @brief Merge two GROUP BY hash tables into one.
+ *
+ * @note Both hash tables should have the same structure; the concrete
+ * implementations downcast both to the same hash table type without
+ * checking.
+ *
+ * @param source_hash_table The hash table whose entries will be merged. It is
+ * passed by const reference and is not modified.
+ * @param destination_hash_table The hash table into which the source entries
+ * are merged. Implementations combine the states of keys present in
+ * both tables and insert keys present only in the source.
+ **/
+ virtual void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const = 0;
+
protected:
AggregationHandle() {
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index cb0d63d..42a2fb9 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -203,4 +203,13 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
aggregation_hash_table);
}
+void AggregationHandleAvg::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Fold the per-key AggregationStateAvg values of the source table into the
+  // destination table using the shared helper.
+  using StateT = AggregationStateAvg;
+  using HashTableT = AggregationStateHashTable<StateT>;
+  mergeGroupByHashTablesHelper<AggregationHandleAvg, StateT, HashTableT>(
+      source_hash_table, destination_hash_table);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 6a94ee6..4ad4b21 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -158,6 +158,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const override;
+ // Merges the per-key states of source_hash_table into destination_hash_table
+ // (see AggregationHandle::mergeGroupByHashTables()).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override;
+
private:
friend class AggregateFunctionAvg;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 5ece8ba..964b7c2 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -206,6 +206,17 @@ void AggregationHandleCount<count_star, nullable_type>
aggregation_hash_table);
}
+template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Fold the per-key AggregationStateCount values of the source table into
+  // the destination table using the shared helper.
+  using StateT = AggregationStateCount;
+  using HashTableT = AggregationStateHashTable<StateT>;
+  mergeGroupByHashTablesHelper<AggregationHandleCount, StateT, HashTableT>(
+      source_hash_table, destination_hash_table);
+}
+
// Explicitly instantiate and compile in the different versions of
// AggregationHandleCount we need. Note that we do not compile a version with
// 'count_star == true' and 'nullable_type == true', as that combination is
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6bb4e65..50138b9 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -166,6 +166,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const override;
+ // Merges the per-key states of source_hash_table into destination_hash_table
+ // (see AggregationHandle::mergeGroupByHashTables()).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override;
+
private:
friend class AggregateFunctionCount;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 918fdf8..6342c2b 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -109,6 +109,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
const AggregationStateHashTableBase &hash_table,
std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ // Merging GROUP BY hash tables is not supported by this handle; any call
+ // aborts the process via LOG(FATAL).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override {
+ LOG(FATAL)
+ << "AggregationHandleDistinct does not support mergeGroupByHashTables";
+ }
+
private:
DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 4703657..a7a4a52 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -139,4 +139,13 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
aggregation_hash_table);
}
+void AggregationHandleMax::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Fold the per-key AggregationStateMax values of the source table into the
+  // destination table using the shared helper.
+  using StateT = AggregationStateMax;
+  using HashTableT = AggregationStateHashTable<StateT>;
+  mergeGroupByHashTablesHelper<AggregationHandleMax, StateT, HashTableT>(
+      source_hash_table, destination_hash_table);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 8932ef8..5af5a12 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -151,6 +151,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const override;
+ // Merges the per-key states of source_hash_table into destination_hash_table
+ // (see AggregationHandle::mergeGroupByHashTables()).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override;
+
private:
friend class AggregateFunctionMax;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index de2709a..ca9b163 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -141,4 +141,13 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
aggregation_hash_table);
}
+void AggregationHandleMin::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Fold the per-key AggregationStateMin values of the source table into the
+  // destination table using the shared helper.
+  using StateT = AggregationStateMin;
+  using HashTableT = AggregationStateHashTable<StateT>;
+  mergeGroupByHashTablesHelper<AggregationHandleMin, StateT, HashTableT>(
+      source_hash_table, destination_hash_table);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 4e4c05d..f68bb9d 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -149,6 +149,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const override;
+ // Merges the per-key states of source_hash_table into destination_hash_table
+ // (see AggregationHandle::mergeGroupByHashTables()).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override;
+
private:
friend class AggregateFunctionMin;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 14421d2..691ff39 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -190,4 +190,13 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
aggregation_hash_table);
}
+void AggregationHandleSum::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  // Fold the per-key AggregationStateSum values of the source table into the
+  // destination table using the shared helper.
+  using StateT = AggregationStateSum;
+  using HashTableT = AggregationStateHashTable<StateT>;
+  mergeGroupByHashTablesHelper<AggregationHandleSum, StateT, HashTableT>(
+      source_hash_table, destination_hash_table);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index b765243..fdc0884 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -148,6 +148,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const override;
+ // Merges the per-key states of source_hash_table into destination_hash_table
+ // (see AggregationHandle::mergeGroupByHashTables()).
+ void mergeGroupByHashTables(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const override;
+
private:
friend class AggregateFunctionSum;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 26cec7f..416c4c6 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -291,6 +291,8 @@ target_link_libraries(AggregationHandle_tests
quickstep_expressions_aggregation_AggregationHandleMin
quickstep_expressions_aggregation_AggregationHandleSum
quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageManager
quickstep_types_CharType
quickstep_types_DateOperatorOverloads
quickstep_types_DatetimeIntervalType
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
index d27b54e..fd82cba 100644
--- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
@@ -26,6 +26,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleAvg.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DateOperatorOverloads.hpp"
#include "types/DatetimeIntervalType.hpp"
@@ -238,6 +239,7 @@ class AggregationHandleAvgTest : public::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
};
const int AggregationHandleAvgTest::kNumSamples;
@@ -417,4 +419,111 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
}
+TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_avg_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          source_hash_table.get());
+
+  AggregationHandleAvg *aggregation_handle_avg_derived =
+      static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
+  // We create three keys: the first is present in both hash tables, the second
+  // is present only in the source hash table, and the third is present only in
+  // the destination hash table.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_avg = 355;
+  TypedValue common_key_source_avg_val(common_key_source_avg);
+
+  const std::int64_t common_key_destination_avg = 295;
+  TypedValue common_key_destination_avg_val(common_key_destination_avg);
+
+  const std::int64_t exclusive_key_source_avg = 1;
+  TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
+
+  const std::int64_t exclusive_key_destination_avg = 1;
+  TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
+
+  std::unique_ptr<AggregationStateAvg> common_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> common_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+
+  // Create avg value states for keys.
+  aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_avg_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckAvgValue<double>(
+      (common_key_destination_avg_val.getLiteral<std::int64_t>() +
+       common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
+      *aggregation_handle_avg_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+                        *aggregation_handle_avg_derived,
+                        *(destination_hash_table_derived->getSingleCompositeKey(
+                            exclusive_destination_key)));
+
+  // The key that existed only in the source table must now be present in the
+  // destination table, carrying the source state unchanged. (Looking it up in
+  // the source table would pass trivially and not exercise the merge.)
+  const AggregationStateAvg *merged_exclusive_source_state =
+      destination_hash_table_derived->getSingleCompositeKey(exclusive_source_key);
+  ASSERT_TRUE(merged_exclusive_source_state != nullptr);
+  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+                        *aggregation_handle_avg_derived,
+                        *merged_exclusive_source_state);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
index 7bebf6a..bf02523 100644
--- a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
@@ -27,6 +27,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleCount.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DoubleType.hpp"
#include "types/FloatType.hpp"
@@ -355,6 +356,7 @@ class AggregationHandleCountTest : public::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_count_;
std::unique_ptr<AggregationState> aggregation_handle_count_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
};
typedef AggregationHandleCountTest AggregationHandleCountDeathTest;
@@ -477,5 +479,127 @@ TEST_F(AggregationHandleCountTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kLong));
}
-} // namespace quickstep
+TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(&long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_count_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_count_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_count_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateCount> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateCount> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateCount> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateCount> *>(
+          source_hash_table.get());
+
+  // TODO(harshad) - Use TemplateUtil::CreateBoolInstantiatedInstance to
+  // generate all the combinations of the bool template arguments and test them.
+  // NOTE(review): the handle above is created from a (non-null) argument type,
+  // yet it is downcast to the count_star == true instantiation. Confirm this
+  // matches the concrete type built by initializeHandle(); otherwise the
+  // static_cast below is undefined behavior.
+  AggregationHandleCount<true, false> *aggregation_handle_count_derived =
+      static_cast<AggregationHandleCount<true, false> *>(
+          aggregation_handle_count_.get());
+  // We create three keys: the first is present in both hash tables, the second
+  // is present only in the source hash table, and the third is present only in
+  // the destination hash table.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_count = 1;
+  TypedValue common_key_source_count_val(common_key_source_count);
+
+  const std::int64_t common_key_destination_count = 1;
+  TypedValue common_key_destination_count_val(common_key_destination_count);
+
+  const std::int64_t exclusive_key_source_count = 1;
+  TypedValue exclusive_key_source_count_val(exclusive_key_source_count);
+
+  const std::int64_t exclusive_key_destination_count = 1;
+  TypedValue exclusive_key_destination_count_val(exclusive_key_destination_count);
+
+  std::unique_ptr<AggregationStateCount> common_key_source_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> common_key_destination_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> exclusive_key_source_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> exclusive_key_destination_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+
+  // Create count value states for keys.
+  aggregation_handle_count_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                    common_key_source_count_val);
+  std::int64_t actual_val = aggregation_handle_count_->finalize(*common_key_source_state)
+                                .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_source_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_count_val);
+  actual_val = aggregation_handle_count_->finalize(*common_key_destination_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_destination_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_count_val);
+  actual_val =
+      aggregation_handle_count_->finalize(*exclusive_key_destination_state)
+          .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_destination_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_count_val);
+  actual_val = aggregation_handle_count_->finalize(*exclusive_key_source_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_source_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_count_->mergeGroupByHashTables(*source_hash_table,
+                                                    destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckCountValue(
+      common_key_destination_count_val.getLiteral<std::int64_t>() +
+          common_key_source_count_val.getLiteral<std::int64_t>(),
+      *aggregation_handle_count_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckCountValue(exclusive_key_destination_count_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_count_derived,
+                  *(destination_hash_table_derived->getSingleCompositeKey(
+                      exclusive_destination_key)));
+
+  // The key that existed only in the source table must now be present in the
+  // destination table, carrying the source state unchanged. (Looking it up in
+  // the source table would pass trivially and not exercise the merge.)
+  const AggregationStateCount *merged_exclusive_source_state =
+      destination_hash_table_derived->getSingleCompositeKey(exclusive_source_key);
+  ASSERT_TRUE(merged_exclusive_source_state != nullptr);
+  CheckCountValue(exclusive_key_source_count_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_count_derived,
+                  *merged_exclusive_source_state);
+}
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
index 027f24b..fc25e91 100644
--- a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
@@ -29,6 +29,8 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleMax.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
#include "types/DatetimeLit.hpp"
@@ -413,6 +415,7 @@ class AggregationHandleMaxTest : public ::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_max_;
std::unique_ptr<AggregationState> aggregation_handle_max_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
};
template <>
@@ -637,4 +640,123 @@ TEST_F(AggregationHandleMaxTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
}
+TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) {
+ const Type &int_non_null_type = IntType::Instance(false);
+ initializeHandle(int_non_null_type);
+ storage_manager_.reset(new StorageManager("./test_max_data"));
+ std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+ aggregation_handle_max_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &int_non_null_type),
+ 10,
+ storage_manager_.get()));
+ std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+ aggregation_handle_max_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &int_non_null_type),
+ 10,
+ storage_manager_.get()));
+ // The static_casts below assume the MAX handle builds AggregationStateMax tables.
+ AggregationStateHashTable<AggregationStateMax> *destination_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateMax> *>(
+ destination_hash_table.get());
+
+ AggregationStateHashTable<AggregationStateMax> *source_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateMax> *>(
+ source_hash_table.get());
+
+ AggregationHandleMax *aggregation_handle_max_derived =
+ static_cast<AggregationHandleMax *>(aggregation_handle_max_.get());
+ // We create three keys: the first is present in both hash tables, the second
+ // is present only in the source hash table, and the third is present only in
+ // the destination hash table.
+ std::vector<TypedValue> common_key;
+ common_key.emplace_back(0);
+ std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+ exclusive_source_key.emplace_back(1);
+ exclusive_destination_key.emplace_back(2);
+ // Each state is fed exactly one value, so each individual MAX is that value.
+ const int common_key_source_max = 3000;
+ TypedValue common_key_source_max_val(common_key_source_max);
+
+ const int common_key_destination_max = 4000;
+ TypedValue common_key_destination_max_val(common_key_destination_max);
+
+ const int exclusive_key_source_max = 100;
+ TypedValue exclusive_key_source_max_val(exclusive_key_source_max);
+
+ const int exclusive_key_destination_max = 200;
+ TypedValue exclusive_key_destination_max_val(exclusive_key_destination_max);
+
+ std::unique_ptr<AggregationStateMax> common_key_source_state(
+ static_cast<AggregationStateMax *>(
+ aggregation_handle_max_->createInitialState()));
+ std::unique_ptr<AggregationStateMax> common_key_destination_state(
+ static_cast<AggregationStateMax *>(
+ aggregation_handle_max_->createInitialState()));
+ std::unique_ptr<AggregationStateMax> exclusive_key_source_state(
+ static_cast<AggregationStateMax *>(
+ aggregation_handle_max_->createInitialState()));
+ std::unique_ptr<AggregationStateMax> exclusive_key_destination_state(
+ static_cast<AggregationStateMax *>(
+ aggregation_handle_max_->createInitialState()));
+
+ // Create max value states for keys.
+ aggregation_handle_max_derived->iterateUnaryInl(common_key_source_state.get(),
+ common_key_source_max_val);
+ int actual_val = aggregation_handle_max_->finalize(*common_key_source_state)
+ .getLiteral<int>();
+ EXPECT_EQ(common_key_source_max_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_max_derived->iterateUnaryInl(
+ common_key_destination_state.get(), common_key_destination_max_val);
+ actual_val = aggregation_handle_max_->finalize(*common_key_destination_state)
+ .getLiteral<int>();
+ EXPECT_EQ(common_key_destination_max_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_max_derived->iterateUnaryInl(
+ exclusive_key_destination_state.get(), exclusive_key_destination_max_val);
+ actual_val =
+ aggregation_handle_max_->finalize(*exclusive_key_destination_state)
+ .getLiteral<int>();
+ EXPECT_EQ(exclusive_key_destination_max_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_max_derived->iterateUnaryInl(
+ exclusive_key_source_state.get(), exclusive_key_source_max_val);
+ actual_val = aggregation_handle_max_->finalize(*exclusive_key_source_state)
+ .getLiteral<int>();
+ EXPECT_EQ(exclusive_key_source_max_val.getLiteral<int>(), actual_val);
+
+ // Add the key-state pairs to the hash tables.
+ source_hash_table_derived->putCompositeKey(common_key,
+ *common_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ common_key, *common_key_destination_state);
+ source_hash_table_derived->putCompositeKey(exclusive_source_key,
+ *exclusive_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ exclusive_destination_key, *exclusive_key_destination_state);
+
+ EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+ EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+ // Merge source into destination; the common key's two states are combined.
+ aggregation_handle_max_->mergeGroupByHashTables(*source_hash_table,
+ destination_hash_table.get());
+
+ EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+ // All three keys, including the source-only key, must be found in destination.
+ CheckMaxValue<int>(
+ common_key_destination_max_val.getLiteral<int>(),
+ *aggregation_handle_max_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+ CheckMaxValue<int>(exclusive_key_destination_max_val.getLiteral<int>(),
+ *aggregation_handle_max_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key)));
+ CheckMaxValue<int>(exclusive_key_source_max_val.getLiteral<int>(),
+ *aggregation_handle_max_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key)));
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
index eb64472..a87ace9 100644
--- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
@@ -29,6 +29,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleMin.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
#include "types/DatetimeLit.hpp"
@@ -411,6 +412,7 @@ class AggregationHandleMinTest : public ::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_min_;
std::unique_ptr<AggregationState> aggregation_handle_min_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
};
template <>
@@ -634,4 +636,123 @@ TEST_F(AggregationHandleMinTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
}
+TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
+ const Type &int_non_null_type = IntType::Instance(false);
+ initializeHandle(int_non_null_type);
+ storage_manager_.reset(new StorageManager("./test_min_data"));
+ std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+ aggregation_handle_min_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &int_non_null_type),
+ 10,
+ storage_manager_.get()));
+ std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+ aggregation_handle_min_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &int_non_null_type),
+ 10,
+ storage_manager_.get()));
+ // The static_casts below assume the MIN handle builds AggregationStateMin tables.
+ AggregationStateHashTable<AggregationStateMin> *destination_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateMin> *>(
+ destination_hash_table.get());
+
+ AggregationStateHashTable<AggregationStateMin> *source_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateMin> *>(
+ source_hash_table.get());
+
+ AggregationHandleMin *aggregation_handle_min_derived =
+ static_cast<AggregationHandleMin *>(aggregation_handle_min_.get());
+ // We create three keys: the first is present in both hash tables, the second
+ // is present only in the source hash table, and the third is present only in
+ // the destination hash table.
+ std::vector<TypedValue> common_key;
+ common_key.emplace_back(0);
+ std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+ exclusive_source_key.emplace_back(1);
+ exclusive_destination_key.emplace_back(2);
+ // Each state is fed exactly one value, so each individual MIN is that value.
+ const int common_key_source_min = 3000;
+ TypedValue common_key_source_min_val(common_key_source_min);
+
+ const int common_key_destination_min = 4000;
+ TypedValue common_key_destination_min_val(common_key_destination_min);
+
+ const int exclusive_key_source_min = 100;
+ TypedValue exclusive_key_source_min_val(exclusive_key_source_min);
+
+ const int exclusive_key_destination_min = 200;
+ TypedValue exclusive_key_destination_min_val(exclusive_key_destination_min);
+
+ std::unique_ptr<AggregationStateMin> common_key_source_state(
+ static_cast<AggregationStateMin *>(
+ aggregation_handle_min_->createInitialState()));
+ std::unique_ptr<AggregationStateMin> common_key_destination_state(
+ static_cast<AggregationStateMin *>(
+ aggregation_handle_min_->createInitialState()));
+ std::unique_ptr<AggregationStateMin> exclusive_key_source_state(
+ static_cast<AggregationStateMin *>(
+ aggregation_handle_min_->createInitialState()));
+ std::unique_ptr<AggregationStateMin> exclusive_key_destination_state(
+ static_cast<AggregationStateMin *>(
+ aggregation_handle_min_->createInitialState()));
+
+ // Create min value states for keys.
+ aggregation_handle_min_derived->iterateUnaryInl(common_key_source_state.get(),
+ common_key_source_min_val);
+ int actual_val = aggregation_handle_min_->finalize(*common_key_source_state)
+ .getLiteral<int>();
+ EXPECT_EQ(common_key_source_min_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_min_derived->iterateUnaryInl(
+ common_key_destination_state.get(), common_key_destination_min_val);
+ actual_val = aggregation_handle_min_->finalize(*common_key_destination_state)
+ .getLiteral<int>();
+ EXPECT_EQ(common_key_destination_min_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_min_derived->iterateUnaryInl(
+ exclusive_key_destination_state.get(), exclusive_key_destination_min_val);
+ actual_val =
+ aggregation_handle_min_->finalize(*exclusive_key_destination_state)
+ .getLiteral<int>();
+ EXPECT_EQ(exclusive_key_destination_min_val.getLiteral<int>(), actual_val);
+
+ aggregation_handle_min_derived->iterateUnaryInl(
+ exclusive_key_source_state.get(), exclusive_key_source_min_val);
+ actual_val = aggregation_handle_min_->finalize(*exclusive_key_source_state)
+ .getLiteral<int>();
+ EXPECT_EQ(exclusive_key_source_min_val.getLiteral<int>(), actual_val);
+
+ // Add the key-state pairs to the hash tables.
+ source_hash_table_derived->putCompositeKey(common_key,
+ *common_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ common_key, *common_key_destination_state);
+ source_hash_table_derived->putCompositeKey(exclusive_source_key,
+ *exclusive_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ exclusive_destination_key, *exclusive_key_destination_state);
+
+ EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+ EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+ // Merge source into destination; the common key's two states are combined.
+ aggregation_handle_min_->mergeGroupByHashTables(*source_hash_table,
+ destination_hash_table.get());
+
+ EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+ // All three keys, including the source-only key, must be found in destination.
+ CheckMinValue<int>(
+ common_key_source_min_val.getLiteral<int>(),
+ *aggregation_handle_min_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+ CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(),
+ *aggregation_handle_min_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key)));
+ CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(),
+ *aggregation_handle_min_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key)));
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 7dbbeb3..abf8a89 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -26,6 +26,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleSum.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
#include "types/DoubleType.hpp"
@@ -237,6 +238,7 @@ class AggregationHandleSumTest : public::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_sum_;
std::unique_ptr<AggregationState> aggregation_handle_sum_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
};
const int AggregationHandleSumTest::kNumSamples;
@@ -425,4 +427,126 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
}
+TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
+ const Type &long_non_null_type = LongType::Instance(false);
+ initializeHandle(long_non_null_type);
+ storage_manager_.reset(new StorageManager("./test_sum_data"));
+ std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+ aggregation_handle_sum_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &long_non_null_type),
+ 10,
+ storage_manager_.get()));
+ std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+ aggregation_handle_sum_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &long_non_null_type),
+ 10,
+ storage_manager_.get()));
+ // The static_casts below assume the SUM handle builds AggregationStateSum tables.
+ AggregationStateHashTable<AggregationStateSum> *destination_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+ destination_hash_table.get());
+
+ AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+ source_hash_table.get());
+
+ AggregationHandleSum *aggregation_handle_sum_derived =
+ static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
+ // We create three keys: the first is present in both hash tables, the second
+ // is present only in the source hash table, and the third is present only in
+ // the destination hash table.
+ std::vector<TypedValue> common_key;
+ common_key.emplace_back(static_cast<std::int64_t>(0));
+ std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+ exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+ exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+ // Each state is fed exactly one value, so each individual SUM is that value.
+ const std::int64_t common_key_source_sum = 3000;
+ TypedValue common_key_source_sum_val(common_key_source_sum);
+
+ const std::int64_t common_key_destination_sum = 4000;
+ TypedValue common_key_destination_sum_val(common_key_destination_sum);
+
+ const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum;
+ TypedValue common_key_merged_val(merged_common_key);
+
+ const std::int64_t exclusive_key_source_sum = 100;
+ TypedValue exclusive_key_source_sum_val(exclusive_key_source_sum);
+
+ const std::int64_t exclusive_key_destination_sum = 200;
+ TypedValue exclusive_key_destination_sum_val(exclusive_key_destination_sum);
+
+ std::unique_ptr<AggregationStateSum> common_key_source_state(
+ static_cast<AggregationStateSum *>(
+ aggregation_handle_sum_->createInitialState()));
+ std::unique_ptr<AggregationStateSum> common_key_destination_state(
+ static_cast<AggregationStateSum *>(
+ aggregation_handle_sum_->createInitialState()));
+ std::unique_ptr<AggregationStateSum> exclusive_key_source_state(
+ static_cast<AggregationStateSum *>(
+ aggregation_handle_sum_->createInitialState()));
+ std::unique_ptr<AggregationStateSum> exclusive_key_destination_state(
+ static_cast<AggregationStateSum *>(
+ aggregation_handle_sum_->createInitialState()));
+
+ // Create sum value states for keys.
+ aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(),
+ common_key_source_sum_val);
+ std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state)
+ .getLiteral<std::int64_t>();
+ EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+ aggregation_handle_sum_derived->iterateUnaryInl(
+ common_key_destination_state.get(), common_key_destination_sum_val);
+ actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state)
+ .getLiteral<std::int64_t>();
+ EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+ aggregation_handle_sum_derived->iterateUnaryInl(
+ exclusive_key_destination_state.get(), exclusive_key_destination_sum_val);
+ actual_val =
+ aggregation_handle_sum_->finalize(*exclusive_key_destination_state)
+ .getLiteral<std::int64_t>();
+ EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+ aggregation_handle_sum_derived->iterateUnaryInl(
+ exclusive_key_source_state.get(), exclusive_key_source_sum_val);
+ actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state)
+ .getLiteral<std::int64_t>();
+ EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+ // Add the key-state pairs to the hash tables.
+ source_hash_table_derived->putCompositeKey(common_key,
+ *common_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ common_key, *common_key_destination_state);
+ source_hash_table_derived->putCompositeKey(exclusive_source_key,
+ *exclusive_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ exclusive_destination_key, *exclusive_key_destination_state);
+
+ EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+ EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+ // Merge source into destination; the common key's two states are combined.
+ aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table,
+ destination_hash_table.get());
+
+ EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+ // All three keys, including the source-only key, must be found in destination.
+ CheckSumValue<std::int64_t>(
+ common_key_merged_val.getLiteral<std::int64_t>(),
+ *aggregation_handle_sum_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+ CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+ *aggregation_handle_sum_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key)));
+ CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+ *aggregation_handle_sum_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key)));
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 9440fae..7d5628d 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -216,7 +216,7 @@ class QueryContext {
*
* @param id The BloomFilter id.
*
- * @return The constant pointer to BloomFilter that is
+ * @return The constant pointer to BloomFilter that is
* already created in the constructor.
**/
inline const BloomFilter* getBloomFilter(const bloom_filter_id id) const {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d209ceb..4878cf1 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -92,11 +92,12 @@ AggregationOperationState::AggregationOperationState(
arguments_.push_back({});
is_distinct_.emplace_back(false);
- group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
- hash_table_impl_type,
- group_by_types,
- estimated_num_entries,
- storage_manager_));
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ handles_.back().get(),
+ storage_manager)));
} else {
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction*>::const_iterator agg_func_it
@@ -124,12 +125,13 @@ AggregationOperationState::AggregationOperationState(
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
if (!group_by_list_.empty()) {
- // Aggregation with GROUP BY: create a HashTable for per-group states.
- group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
- hash_table_impl_type,
- group_by_types,
- estimated_num_entries,
- storage_manager_));
+ // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ handles_.back().get(),
+ storage_manager)));
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
@@ -408,17 +410,17 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// Call StorageBlock::aggregateGroupBy() to aggregate this block's values
// directly into the (threadsafe) shared global HashTable for this
// aggregate.
- //
- // TODO(shoban): Implement optional code path for using local hash table per
- // block, which can be merged with global hash table for all blocks
- // aggregated on.
+ DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
+ AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+ DCHECK(agg_hash_table != nullptr);
block->aggregateGroupBy(*handles_[agg_idx],
arguments_[agg_idx],
group_by_list_,
predicate_.get(),
- group_by_hashtables_[agg_idx].get(),
+ agg_hash_table,
&reuse_matches,
&reuse_group_by_vectors);
+ group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
}
}
}
@@ -447,19 +449,65 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
+ // TODO(harshad) - The merge phase may be slower when each hash table contains
+ // large number of entries. We should find ways in which we can perform a
+ // parallel merge.
+
+ // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+ // e.g. Keep merging entries from smaller hash tables to larger.
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ if (hash_tables->size() > 1) {
+ for (int hash_table_index = 0;
+ hash_table_index < static_cast<int>(hash_tables->size() - 1);
+ ++hash_table_index) {
+ // Merge each hash table to the last hash table.
+ handles_[agg_idx]->mergeGroupByHashTables(
+ (*(*hash_tables)[hash_table_index]),
+ hash_tables->back().get());
+ }
+ }
+ }
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
if (is_distinct_[agg_idx]) {
+ DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
+ auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ // We may have a case where hash_tables is empty, e.g. no input blocks.
+ // However for aggregateOnDistinctifyHashTableForGroupBy to work
+ // correctly, we should create an empty group by hash table.
+ AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+ group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ }
+ DCHECK(hash_tables->back() != nullptr);
+ AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
+ DCHECK(agg_hash_table != nullptr);
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
*distinctify_hashtables_[agg_idx],
- group_by_hashtables_[agg_idx].get());
+ agg_hash_table);
}
+ auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ // We may have a case where hash_tables is empty, e.g. no input blocks.
+ // However for aggregateOnDistinctifyHashTableForGroupBy to work
+ // correctly, we should create an empty group by hash table.
+ AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+ group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ }
+ AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
+ DCHECK(agg_hash_table != nullptr);
ColumnVector* agg_result_col =
- handles_[agg_idx]->finalizeHashTable(*group_by_hashtables_[agg_idx],
+ handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
&group_by_keys);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c3a1278..0199749 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -31,6 +31,7 @@
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/HashTableBase.hpp"
+#include "storage/HashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
@@ -209,6 +210,9 @@ class AggregationOperationState {
// hash table to prevent multiple lookups.
std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
+ // A vector of group by hash table pools, one for each aggregation handle.
+ std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index a3093df..87a5e54 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -187,6 +187,7 @@ add_library(quickstep_storage_HashTable_proto ${storage_HashTable_proto_srcs})
add_library(quickstep_storage_HashTableBase ../empty_src.cpp HashTableBase.hpp)
add_library(quickstep_storage_HashTableFactory HashTableFactory.cpp HashTableFactory.hpp)
add_library(quickstep_storage_HashTableKeyManager ../empty_src.cpp HashTableKeyManager.hpp)
+add_library(quickstep_storage_HashTablePool ../empty_src.cpp HashTablePool.hpp)
add_library(quickstep_storage_IndexSubBlock ../empty_src.cpp IndexSubBlock.hpp)
add_library(quickstep_storage_IndexSubBlockDescriptionFactory ../empty_src.cpp IndexSubBlockDescriptionFactory.hpp)
add_library(quickstep_storage_InsertDestination InsertDestination.cpp InsertDestination.hpp)
@@ -252,6 +253,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
+ quickstep_storage_HashTablePool
quickstep_storage_InsertDestination
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
@@ -662,6 +664,13 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
quickstep_types_TypedValue
quickstep_types_operations_comparisons_ComparisonUtil
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_HashTablePool
+ glog
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_HashTableBase
+ quickstep_threading_SpinMutex
+ quickstep_utility_Macros
+ quickstep_utility_StringUtil)
target_link_libraries(quickstep_storage_IndexSubBlock
quickstep_catalog_CatalogTypedefs
quickstep_expressions_predicate_PredicateCost
@@ -1012,6 +1021,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_HashTableKeyManager
+ quickstep_storage_HashTablePool
quickstep_storage_IndexSubBlock
quickstep_storage_IndexSubBlockDescriptionFactory
quickstep_storage_InsertDestination
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
new file mode 100644
index 0000000..c16d0f1
--- /dev/null
+++ b/storage/HashTablePool.hpp
@@ -0,0 +1,166 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
+
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "threading/SpinMutex.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle, similar to
+ *        InsertDestination but for HashTables. A worker thread checks out a
+ *        hash table, performs its insertions and returns it to the pool; while
+ *        one thread holds a table, no other thread gets it from the pool.
+ *        Check-out and return lock a SpinMutex; getAllHashTables() does not.
+ **/
+class HashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries Estimated number of entries per hash table.
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types Pointers to the types forming the group by key.
+   * @param agg_handle The aggregation handle. Must not be null.
+   * @param storage_manager A pointer to the storage manager. Must not be null.
+   *
+   * @note The estimate of number of entries is quite inaccurate at this time.
+   *       If we go by the current estimate, each hash table demands much
+   *       larger space than it actually needs, which causes the system to
+   *       either trigger evictions or worse - run out of memory. To fix this
+   *       issue, estimates of at least 100 are divided by 100. The division
+   *       will not affect correctness, however it may give some hash tables
+   *       less space than they require, causing them to be resized during
+   *       the build phase, which has a performance penalty.
+   **/
+  HashTablePool(const std::size_t estimated_num_entries,
+                const HashTableImplType hash_table_impl_type,
+                const std::vector<const Type *> &group_by_types,
+                AggregationHandle *agg_handle,
+                StorageManager *storage_manager)
+      : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  /**
+   * @brief Check out a hash table for insertion, reusing a pooled one if any.
+   *
+   * @return A hash table pointer, owned by the caller until it is returned.
+   **/
+  AggregationStateHashTableBase* getHashTable() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!hash_tables_.empty()) {
+        std::unique_ptr<AggregationStateHashTableBase> ret_hash_table(
+            std::move(hash_tables_.back()));
+        hash_tables_.pop_back();
+        DCHECK(ret_hash_table != nullptr);
+        return ret_hash_table.release();
+      }
+    }
+    // Create outside the lock so other threads are not blocked meanwhile.
+    return createNewHashTable();
+  }
+
+  /**
+   * @brief Return a previously checked out hash table; the pool takes ownership.
+   *
+   * @param hash_table A pointer to the checked out hash table. Must not be null.
+   **/
+  void returnHashTable(AggregationStateHashTableBase *hash_table) {
+    DCHECK(hash_table != nullptr);
+    SpinMutexLock lock(mutex_);
+    hash_tables_.push_back(
+        std::unique_ptr<AggregationStateHashTableBase>(hash_table));
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words
+   *          the hash table pool is in read-only state; no lock is taken here.
+   *
+   * @return All the hash tables in the pool.
+   **/
+  const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() const {
+    return &hash_tables_;
+  }
+
+ private:
+  // Creates a new hash table through the aggregation handle.
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(
+        hash_table_impl_type_, group_by_types_, estimated_num_entries_,
+        storage_manager_);
+  }
+
+  // Estimates >= kEstimateReductionFactor are divided by it; smaller ones are
+  // kept as-is so that small non-zero estimates do not collapse to zero.
+  // NOTE(review): the mapping is not monotonic (e.g. 99 -> 99, 100 -> 1).
+  static std::size_t reduceEstimatedCardinality(
+      const std::size_t original_estimate) {
+    return original_estimate < kEstimateReductionFactor
+               ? original_estimate
+               : original_estimate / kEstimateReductionFactor;
+  }
+
+  static constexpr std::size_t kEstimateReductionFactor = 100;
+  static_assert(kEstimateReductionFactor > 0, "Reduction factor must be > 0");
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const HashTableImplType hash_table_impl_type_;
+  const std::vector<const Type *> group_by_types_;
+
+  AggregationHandle *const agg_handle_;
+  StorageManager *const storage_manager_;
+
+  SpinMutex mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTablePool);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index dfc95b8..5d91052 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -183,7 +183,8 @@ StorageManager::~StorageManager() {
it != blocks_.end();
++it) {
if (it->second.block->isDirty()) {
- LOG(WARNING) << "Block with ID " << BlockIdUtil::ToString(it->first)
+ LOG(WARNING) << (it->second.block->isBlob() ? "Blob " : "Block ")
+ << "with ID " << BlockIdUtil::ToString(it->first)
<< " is dirty during StorageManager shutdown";
}
delete it->second.block;