You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/08 19:43:06 UTC
[2/3] incubator-quickstep git commit: Fuse Aggregate with
LeftOuterJoin to accelerate evaluation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..33321d3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
# Declare micro-libs:
add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ BuildAggregationExistenceMapOperator.cpp
+ BuildAggregationExistenceMapOperator.hpp)
add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,27 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_BuildHashOperator
glog
quickstep_catalog_CatalogRelation
@@ -552,6 +576,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_CreateIndexOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..2d85312 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,23 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
}
}
+BarrieredReadWriteConcurrentBitVector* AggregationOperationState
+ ::getExistenceMap() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeVectorTable *>(
+ collision_free_hashtable_.get())->getExistenceMap();
+ } else {
+ LOG(FATAL) << "AggregationOperationState::getExistenceMap() "
+ << "is not supported by this aggregation";
+ }
+}
+
void AggregationOperationState::initialize(const std::size_t partition_id) {
if (is_aggregate_collision_free_) {
static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->initialize(partition_id);
} else {
- LOG(FATAL) << "AggregationOperationState::initializeState() "
+ LOG(FATAL) << "AggregationOperationState::initialize() "
<< "is not supported by this aggregation";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..23f23b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -39,6 +39,7 @@ namespace quickstep {
namespace serialization { class AggregationOperationState; }
class AggregateFunction;
+class BarrieredReadWriteConcurrentBitVector;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
@@ -167,6 +168,8 @@ class AggregationOperationState {
**/
std::size_t getNumFinalizationPartitions() const;
+ BarrieredReadWriteConcurrentBitVector* getExistenceMap() const;
+
/**
* @brief Initialize the specified partition of this aggregation.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..cd76854 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -104,6 +104,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
return existence_map_->onesCountInRange(start_position, end_position);
}
+ inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+ return existence_map_.get();
+ }
+
/**
* @brief Initialize the specified partition of this aggregation table.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..7c4c238 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,9 +20,8 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
-#include <atomic>
+#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -31,6 +30,7 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -64,14 +64,10 @@ class BitVectorExactFilter : public LIPFilter {
: LIPFilter(LIPFilterType::kBitVectorExactFilter),
min_value_(static_cast<CppType>(min_value)),
max_value_(static_cast<CppType>(max_value)),
- bit_array_(GetByteSize(max_value - min_value + 1)) {
+ bit_vector_(max_value - min_value + 1) {
DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
DCHECK_GE(max_value_, min_value_);
-
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +105,6 @@ class BitVectorExactFilter : public LIPFilter {
private:
/**
- * @brief Round up bit_size to multiples of 8.
- */
- inline static std::size_t GetByteSize(const std::size_t bit_size) {
- return (bit_size + 7u) / 8u;
- }
-
- /**
* @brief Iterate through the accessor and hash values into the internal bit
* array.
*/
@@ -164,8 +153,7 @@ class BitVectorExactFilter : public LIPFilter {
DCHECK_GE(value, min_value_);
DCHECK_LE(value, max_value_);
- const CppType loc = value - min_value_;
- bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+ bit_vector_.setBit(value - min_value_);
}
/**
@@ -177,9 +165,7 @@ class BitVectorExactFilter : public LIPFilter {
return is_anti_filter;
}
- const CppType loc = value - min_value_;
- const bool is_bit_set =
- (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+ const bool is_bit_set = bit_vector_.getBit(value - min_value_);
if (is_anti_filter) {
return !is_bit_set;
@@ -190,7 +176,7 @@ class BitVectorExactFilter : public LIPFilter {
const CppType min_value_;
const CppType max_value_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..0a820cf 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -39,8 +39,9 @@ target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
@@ -83,5 +84,6 @@ target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..6eaa93e 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,10 +20,8 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
-#include <atomic>
#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -32,6 +30,7 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -65,11 +64,8 @@ class SingleIdentityHashFilter : public LIPFilter {
explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
: LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
filter_cardinality_(filter_cardinality),
- bit_array_(GetByteSize(filter_cardinality)) {
+ bit_vector_(filter_cardinality) {
DCHECK_GE(filter_cardinality, 1u);
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +154,9 @@ class SingleIdentityHashFilter : public LIPFilter {
* @brief Inserts a given value into the hash filter.
*/
inline void insert(const void *key_begin) {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ bit_vector_.setBit(hash);
}
/**
@@ -168,12 +165,13 @@ class SingleIdentityHashFilter : public LIPFilter {
* If false is returned, a value is certainly not present in the hash filter.
*/
inline bool contains(const void *key_begin) const {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ return bit_vector_.getBit(hash);
}
std::size_t filter_cardinality_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
};