You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/08 19:43:06 UTC

[2/3] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..33321d3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
 
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+            BuildAggregationExistenceMapOperator.cpp
+            BuildAggregationExistenceMapOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
 add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,27 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -552,6 +576,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..2d85312 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,23 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
   }
 }
 
+BarrieredReadWriteConcurrentBitVector* AggregationOperationState
+    ::getExistenceMap() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getExistenceMap();
+  } else {
+    LOG(FATAL) << "AggregationOperationState::getExistenceMap() "
+               << "is not supported by this aggregation";
+  }
+}
+
 void AggregationOperationState::initialize(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
     static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
+    LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not supported by this aggregation";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..23f23b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -39,6 +39,7 @@ namespace quickstep {
 namespace serialization { class AggregationOperationState; }
 
 class AggregateFunction;
+class BarrieredReadWriteConcurrentBitVector;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
@@ -167,6 +168,8 @@ class AggregationOperationState {
    **/
   std::size_t getNumFinalizationPartitions() const;
 
+  BarrieredReadWriteConcurrentBitVector* getExistenceMap() const;
+
   /**
    * @brief Initialize the specified partition of this aggregation.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..cd76854 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -104,6 +104,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
     return existence_map_->onesCountInRange(start_position, end_position);
   }
 
+  inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+    return existence_map_.get();
+  }
+
   /**
    * @brief Initialize the specified partition of this aggregation table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..7c4c238 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,9 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 
-#include <atomic>
+#include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -31,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -64,14 +64,10 @@ class BitVectorExactFilter : public LIPFilter {
       : LIPFilter(LIPFilterType::kBitVectorExactFilter),
         min_value_(static_cast<CppType>(min_value)),
         max_value_(static_cast<CppType>(max_value)),
-        bit_array_(GetByteSize(max_value - min_value + 1)) {
+        bit_vector_(max_value - min_value + 1) {
     DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
     DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
     DCHECK_GE(max_value_, min_value_);
-
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +105,6 @@ class BitVectorExactFilter : public LIPFilter {
 
  private:
   /**
-   * @brief Round up bit_size to multiples of 8.
-   */
-  inline static std::size_t GetByteSize(const std::size_t bit_size) {
-    return (bit_size + 7u) / 8u;
-  }
-
-  /**
    * @brief Iterate through the accessor and hash values into the internal bit
    *        array.
    */
@@ -164,8 +153,7 @@ class BitVectorExactFilter : public LIPFilter {
     DCHECK_GE(value, min_value_);
     DCHECK_LE(value, max_value_);
 
-    const CppType loc = value - min_value_;
-    bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+    bit_vector_.setBit(value - min_value_);
   }
 
   /**
@@ -177,9 +165,7 @@ class BitVectorExactFilter : public LIPFilter {
       return is_anti_filter;
     }
 
-    const CppType loc = value - min_value_;
-    const bool is_bit_set =
-        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+    const bool is_bit_set = bit_vector_.getBit(value - min_value_);
 
     if (is_anti_filter) {
       return !is_bit_set;
@@ -190,7 +176,7 @@ class BitVectorExactFilter : public LIPFilter {
 
   const CppType min_value_;
   const CppType max_value_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..0a820cf 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -39,8 +39,9 @@ target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -83,5 +84,6 @@ target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..6eaa93e 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,10 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 
-#include <atomic>
 #include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -32,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -65,11 +64,8 @@ class SingleIdentityHashFilter : public LIPFilter {
   explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
       : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
         filter_cardinality_(filter_cardinality),
-        bit_array_(GetByteSize(filter_cardinality)) {
+        bit_vector_(filter_cardinality) {
     DCHECK_GE(filter_cardinality, 1u);
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +154,9 @@ class SingleIdentityHashFilter : public LIPFilter {
    * @brief Inserts a given value into the hash filter.
    */
   inline void insert(const void *key_begin) {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_vector_.setBit(hash);
   }
 
   /**
@@ -168,12 +165,13 @@ class SingleIdentityHashFilter : public LIPFilter {
    *        If false is returned, a value is certainly not present in the hash filter.
    */
   inline bool contains(const void *key_begin) const {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return bit_vector_.getBit(hash);
   }
 
   std::size_t filter_cardinality_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
 };