You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/09 15:05:46 UTC
[1/2] incubator-quickstep git commit: Fuse Aggregate with
LeftOuterJoin to accelerate evaluation.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 266b9b9a9 -> a28b1e4d7
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..e2928a8
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <cstddef>
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator that builds a bit vector over one attribute of the input
+ * relation; the bit vector serves as the existence map of an
+ * AggregationOperationState's CollisionFreeVectorTable (see getExistenceMap()).
+ **/
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of the query to which this operator belongs.
+ * @param input_relation The relation to build the existence map on.
+ * @param build_attribute The ID of the attribute to build the existence map on.
+ * @param input_relation_is_stored Whether input_relation is a stored relation that
+ * is fully available before work orders are generated (its blocks are then
+ * snapshotted here); otherwise blocks arrive via feedInputBlock().
+ * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
+ **/
+ BuildAggregationExistenceMapOperator(const std::size_t query_id,
+ const CatalogRelation &input_relation,
+ const attribute_id build_attribute,
+ const bool input_relation_is_stored,
+ const QueryContext::aggregation_state_id aggr_state_index)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
+ build_attribute_(build_attribute),
+ input_relation_is_stored_(input_relation_is_stored),
+ input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+ : std::vector<block_id>()),
+ aggr_state_index_(aggr_state_index),
+ num_workorders_generated_(0),
+ started_(false) {}
+
+ ~BuildAggregationExistenceMapOperator() override {}
+
+ std::string getName() const override {
+ return "BuildAggregationExistenceMapOperator";
+ }
+
+ /**
+ * @return The input relation.
+ */
+ const CatalogRelation& input_relation() const {
+ return input_relation_;
+ }
+
+ bool getAllWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) override;
+
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
+ input_relation_block_ids_.push_back(input_block_id);  // Only the block id is recorded; relation/partition ids are unused.
+ }
+
+ private:
+ serialization::WorkOrder* createWorkOrderProto(const block_id block);  // NOTE(review): presumably serializes one block's work order; defined in the .cpp.
+
+ const CatalogRelation &input_relation_;
+ const attribute_id build_attribute_;
+ const bool input_relation_is_stored_;
+ std::vector<block_id> input_relation_block_ids_;  // Snapshot of the relation's blocks if stored; otherwise appended by feedInputBlock().
+ const QueryContext::aggregation_state_id aggr_state_index_;
+
+ std::vector<block_id>::size_type num_workorders_generated_;  // NOTE(review): presumably an index into input_relation_block_ids_.
+ bool started_;
+
+ DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator for one input block.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of the query to which this work order belongs.
+ * @param input_relation The relation to build the existence map on.
+ * @param build_block_id The ID of the input block this work order processes.
+ * @param build_attribute The ID of the attribute to build on.
+ * @param state The AggregationOperationState to use; must not be NULL (DCHECKed).
+ * @param storage_manager The StorageManager to use; must not be NULL (DCHECKed).
+ **/
+ BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
+ const CatalogRelationSchema &input_relation,
+ const block_id build_block_id,
+ const attribute_id build_attribute,
+ AggregationOperationState *state,
+ StorageManager *storage_manager)
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
+ build_block_id_(build_block_id),
+ build_attribute_(build_attribute),
+ state_(DCHECK_NOTNULL(state)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~BuildAggregationExistenceMapWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ const CatalogRelationSchema &input_relation_;  // Held by reference: the schema must outlive this work order.
+ const block_id build_block_id_;
+ const attribute_id build_attribute_;
+ AggregationOperationState *state_;  // Not deleted by this work order (presumably owned by QueryContext).
+
+ StorageManager *storage_manager_;  // Not deleted by this work order.
+
+ DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..457d58a 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
# Declare micro-libs:
add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ BuildAggregationExistenceMapOperator.cpp
+ BuildAggregationExistenceMapOperator.hpp)
add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,31 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ glog
+ quickstep_catalog_CatalogAttribute
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogRelationSchema
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_storage_CollisionFreeVectorTable
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleStorageSubBlock
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_BuildHashOperator
glog
quickstep_catalog_CatalogRelation
@@ -518,6 +546,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_DeleteOperator
@@ -552,6 +581,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_CreateIndexOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 76753d2..d0d0753 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -44,6 +44,7 @@ enum WorkOrderType {
UPDATE = 20;
WINDOW_AGGREGATION = 21;
DESTROY_AGGREGATION_STATE = 22;
+ BUILD_AGGREGATION_EXISTENCE_MAP = 23;
}
message WorkOrder {
@@ -278,6 +279,15 @@ message WindowAggregationWorkOrder {
message DestroyAggregationStateWorkOrder {
extend WorkOrder {
- optional uint32 aggr_state_index = 339;
+ optional uint32 aggr_state_index = 352;
+ }
+}
+
+message BuildAggregationExistenceMapWorkOrder {
+ extend WorkOrder {
+ optional int32 relation_id = 368;
+ optional fixed64 build_block_id = 369;
+ optional int32 build_attribute = 370;
+ optional uint32 aggr_state_index = 371;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index bd2a0f8..d2c8251 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -29,6 +29,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
#include "relational_operators/BuildLIPFilterOperator.hpp"
#include "relational_operators/DeleteOperator.hpp"
@@ -91,6 +92,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
CreateLIPFilterAdaptiveProberHelper(
proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
}
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index;
+      // Every field, including aggr_state_index, is read from this work order's own extensions.
+      return new BuildAggregationExistenceMapWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
+          query_context->getAggregationState(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
+          storage_manager);
+    }
case serialization::BUILD_LIP_FILTER: {
LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;
@@ -525,6 +539,29 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
query_context.isValidAggregationStateId(
proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
}
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      // Both the relation and the build attribute must be present before they are looked up.
+      if (!proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id) ||
+          !proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute)) {
+        return false;
+      }
+      const relation_id rel_id =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      const attribute_id build_attribute =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute);
+      if (!relation.hasAttributeWithId(build_attribute)) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id) &&
+             proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index));
+    }
case serialization::BUILD_HASH: {
if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..0f39b41 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,18 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
}
}
+CollisionFreeVectorTable* AggregationOperationState::getCollisionFreeVectorTable() const {
+  // Return NULL, as the header documents, unless this aggregation is collision-free.
+  return is_aggregate_collision_free_ ?
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get()) : nullptr;
+}
+
void AggregationOperationState::initialize(const std::size_t partition_id) {
if (is_aggregate_collision_free_) {
static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->initialize(partition_id);
} else {
- LOG(FATAL) << "AggregationOperationState::initializeState() "
+ LOG(FATAL) << "AggregationOperationState::initialize() "
<< "is not supported by this aggregation";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..c8930ee 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ namespace serialization { class AggregationOperationState; }
class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
+class CollisionFreeVectorTable;
class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
@@ -198,6 +199,14 @@ class AggregationOperationState {
void finalizeAggregate(const std::size_t partition_id,
InsertDestination *output_destination);
+ /**
+ * @brief Get the collision-free vector table used by this aggregation.
+ *
+ * @return The collision-free vector table used by this aggregation.
+ * Returns NULL if collision-free vector table is not used.
+ */
+ CollisionFreeVectorTable* getCollisionFreeVectorTable() const;
+
private:
// Check whether partitioned aggregation can be applied.
bool checkAggregatePartitioned(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..772d47d 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -105,6 +105,15 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
}
/**
+ * @brief Get the existence map for this vector table.
+ *
+ * @return The existence map for this vector table.
+ */
+ inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+ return existence_map_.get();
+ }
+
+ /**
* @brief Initialize the specified partition of this aggregation table.
*
* @param partition_id ID of the partition to be initialized.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..48fd5e1 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,17 +20,16 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
-#include <atomic>
+#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -64,14 +63,10 @@ class BitVectorExactFilter : public LIPFilter {
: LIPFilter(LIPFilterType::kBitVectorExactFilter),
min_value_(static_cast<CppType>(min_value)),
max_value_(static_cast<CppType>(max_value)),
- bit_array_(GetByteSize(max_value - min_value + 1)) {
+ bit_vector_(max_value - min_value + 1) {
DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
DCHECK_GE(max_value_, min_value_);
-
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +104,6 @@ class BitVectorExactFilter : public LIPFilter {
private:
/**
- * @brief Round up bit_size to multiples of 8.
- */
- inline static std::size_t GetByteSize(const std::size_t bit_size) {
- return (bit_size + 7u) / 8u;
- }
-
- /**
* @brief Iterate through the accessor and hash values into the internal bit
* array.
*/
@@ -164,8 +152,7 @@ class BitVectorExactFilter : public LIPFilter {
DCHECK_GE(value, min_value_);
DCHECK_LE(value, max_value_);
- const CppType loc = value - min_value_;
- bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+ bit_vector_.setBit(value - min_value_);
}
/**
@@ -177,9 +164,7 @@ class BitVectorExactFilter : public LIPFilter {
return is_anti_filter;
}
- const CppType loc = value - min_value_;
- const bool is_bit_set =
- (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+ const bool is_bit_set = bit_vector_.getBit(value - min_value_);
if (is_anti_filter) {
return !is_bit_set;
@@ -190,7 +175,7 @@ class BitVectorExactFilter : public LIPFilter {
const CppType min_value_;
const CppType max_value_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..519d3e9 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -35,12 +35,12 @@ add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src
target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
@@ -79,9 +79,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..d7e3475 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,18 +20,15 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
-#include <atomic>
#include <cstddef>
-#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -65,11 +62,8 @@ class SingleIdentityHashFilter : public LIPFilter {
explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
: LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
filter_cardinality_(filter_cardinality),
- bit_array_(GetByteSize(filter_cardinality)) {
+ bit_vector_(filter_cardinality) {
DCHECK_GE(filter_cardinality, 1u);
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +152,9 @@ class SingleIdentityHashFilter : public LIPFilter {
* @brief Inserts a given value into the hash filter.
*/
inline void insert(const void *key_begin) {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ bit_vector_.setBit(hash);
}
/**
@@ -168,12 +163,13 @@ class SingleIdentityHashFilter : public LIPFilter {
* If false is returned, a value is certainly not present in the hash filter.
*/
inline bool contains(const void *key_begin) const {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ return bit_vector_.getBit(hash);
}
std::size_t filter_cardinality_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
};
[2/2] incubator-quickstep git commit: Fuse Aggregate with
LeftOuterJoin to accelerate evaluation.
Posted by ji...@apache.org.
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a28b1e4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a28b1e4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a28b1e4d
Branch: refs/heads/master
Commit: a28b1e4d77ee12466b0801a5a7c5185f7a83e7f8
Parents: 266b9b9
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 8 23:55:32 2017 -0600
----------------------------------------------------------------------
query_optimizer/CMakeLists.txt | 6 +-
query_optimizer/ExecutionGenerator.cpp | 261 +++++++++++--------
query_optimizer/ExecutionGenerator.hpp | 20 +-
query_optimizer/PhysicalGenerator.cpp | 3 +
query_optimizer/cost_model/CMakeLists.txt | 8 +
query_optimizer/cost_model/SimpleCostModel.cpp | 9 +
query_optimizer/cost_model/SimpleCostModel.hpp | 5 +
.../cost_model/StarSchemaSimpleCostModel.cpp | 148 ++++++++++-
.../cost_model/StarSchemaSimpleCostModel.hpp | 20 ++
query_optimizer/physical/CMakeLists.txt | 14 +
.../CrossReferenceCoalesceAggregate.cpp | 105 ++++++++
.../CrossReferenceCoalesceAggregate.hpp | 232 +++++++++++++++++
query_optimizer/physical/PatternMatcher.hpp | 3 +
query_optimizer/physical/PhysicalType.hpp | 1 +
query_optimizer/rules/BottomUpRule.hpp | 39 +--
query_optimizer/rules/CMakeLists.txt | 23 ++
query_optimizer/rules/FuseAggregateJoin.cpp | 170 ++++++++++++
query_optimizer/rules/FuseAggregateJoin.hpp | 71 +++++
.../BuildAggregationExistenceMapOperator.cpp | 196 ++++++++++++++
.../BuildAggregationExistenceMapOperator.hpp | 177 +++++++++++++
relational_operators/CMakeLists.txt | 30 +++
relational_operators/WorkOrder.proto | 12 +-
relational_operators/WorkOrderFactory.cpp | 37 +++
storage/AggregationOperationState.cpp | 8 +-
storage/AggregationOperationState.hpp | 9 +
storage/CollisionFreeVectorTable.hpp | 9 +
utility/lip_filter/BitVectorExactFilter.hpp | 27 +-
utility/lip_filter/CMakeLists.txt | 12 +-
utility/lip_filter/SingleIdentityHashFilter.hpp | 22 +-
29 files changed, 1489 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..3ff783c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,7 +64,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
- quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
@@ -95,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_physical_CopyFrom
quickstep_queryoptimizer_physical_CreateIndex
quickstep_queryoptimizer_physical_CreateTable
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_DeleteTuples
quickstep_queryoptimizer_physical_DropTable
quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_physical_UpdateTable
quickstep_queryoptimizer_physical_WindowAggregate
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_CreateIndexOperator
@@ -147,12 +148,10 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_SubBlockTypeRegistry
quickstep_types_Type
- quickstep_types_TypeID
quickstep_types_Type_proto
quickstep_types_TypedValue
quickstep_types_TypedValue_proto
quickstep_types_containers_Tuple_proto
- quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
quickstep_utility_SqlError)
if (ENABLE_DISTRIBUTED)
@@ -213,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_rules_AttachLIPFilters
+ quickstep_queryoptimizer_rules_FuseAggregateJoin
quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..70b69e0 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,7 +49,6 @@
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
@@ -72,9 +71,11 @@
#include "query_optimizer/expressions/Scalar.hpp"
#include "query_optimizer/expressions/ScalarLiteral.hpp"
#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/DeleteTuples.hpp"
#include "query_optimizer/physical/DropTable.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +97,7 @@
#include "query_optimizer/physical/UpdateTable.hpp"
#include "query_optimizer/physical/WindowAggregate.hpp"
#include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
#include "relational_operators/BuildLIPFilterOperator.hpp"
#include "relational_operators/CreateIndexOperator.hpp"
@@ -128,11 +130,9 @@
#include "storage/SubBlockTypeRegistry.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
-#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
-#include "utility/EqualsAnyConstant.hpp"
#include "utility/SqlError.hpp"
#include "gflags/gflags.h"
@@ -163,10 +163,6 @@ static const volatile bool aggregate_hashtable_type_dummy
DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
-DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
- "The maximum allowed key range (number of entries) for using a "
- "CollisionFreeVectorTable.");
-
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
namespace S = ::quickstep::serialization;
@@ -266,6 +262,9 @@ void ExecutionGenerator::generatePlanInternal(
case P::PhysicalType::kAggregate:
return convertAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return convertCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kCopyFrom:
return convertCopyFrom(
std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -379,105 +378,6 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
}
}
-bool ExecutionGenerator::canUseCollisionFreeAggregation(
- const P::AggregatePtr &aggregate,
- const std::size_t estimated_num_groups,
- std::size_t *max_num_groups) const {
-#ifdef QUICKSTEP_DISTRIBUTED
- // Currently we cannot do this fast path with the distributed setting. See
- // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
- // FinalizeAggregationOperator::getAllWorkOrderProtos().
- return false;
-#endif
-
- // Supports only single group-by key.
- if (aggregate->grouping_expressions().size() != 1) {
- return false;
- }
-
- // We need to know the exact min/max stats of the group-by key.
- // So it must be a CatalogAttribute (but not an expression).
- E::AttributeReferencePtr group_by_key_attr;
- const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
- if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
- return false;
- }
-
- bool min_value_stat_is_exact;
- bool max_value_stat_is_exact;
- const TypedValue min_value =
- cost_model_for_aggregation_->findMinValueStat(
- aggregate, group_by_key_attr, &min_value_stat_is_exact);
- const TypedValue max_value =
- cost_model_for_aggregation_->findMaxValueStat(
- aggregate, group_by_key_attr, &max_value_stat_is_exact);
- if (min_value.isNull() || max_value.isNull() ||
- (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
- return false;
- }
-
- std::int64_t min_cpp_value;
- std::int64_t max_cpp_value;
- switch (group_by_key_attr->getValueType().getTypeID()) {
- case TypeID::kInt: {
- min_cpp_value = min_value.getLiteral<int>();
- max_cpp_value = max_value.getLiteral<int>();
- break;
- }
- case TypeID::kLong: {
- min_cpp_value = min_value.getLiteral<std::int64_t>();
- max_cpp_value = max_value.getLiteral<std::int64_t>();
- break;
- }
- default:
- return false;
- }
-
- // TODO(jianqiao):
- // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
- // 2. Reason about the table size bound (e.g. by checking memory size) instead
- // of hardcoding it as a gflag.
- if (min_cpp_value < 0 ||
- max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
- max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
- return false;
- }
-
- for (const auto &agg_expr : aggregate->aggregate_expressions()) {
- const E::AggregateFunctionPtr agg_func =
- std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
-
- if (agg_func->is_distinct()) {
- return false;
- }
-
- // TODO(jianqiao): Support AggregationID::AVG.
- if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
- AggregationID::kCount,
- AggregationID::kSum)) {
- return false;
- }
-
- const auto &arguments = agg_func->getArguments();
- if (arguments.size() > 1u) {
- return false;
- }
-
- if (arguments.size() == 1u) {
- if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
- TypeID::kInt,
- TypeID::kLong,
- TypeID::kFloat,
- TypeID::kDouble)) {
- return false;
- }
- }
- }
-
- *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
- return true;
-}
-
void ExecutionGenerator::convertNamedExpressions(
const std::vector<E::NamedExpressionPtr> &named_expressions,
S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1608,9 +1508,10 @@ void ExecutionGenerator::convertAggregate(
cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
std::size_t max_num_groups;
- if (canUseCollisionFreeAggregation(physical_plan,
- estimated_num_groups,
- &max_num_groups)) {
+ if (cost_model_for_aggregation_
+ ->canUseCollisionFreeAggregation(physical_plan,
+ estimated_num_groups,
+ &max_num_groups)) {
aggr_state_proto->set_hash_table_impl_type(
serialization::HashTableImplType::COLLISION_FREE_VECTOR);
aggr_state_proto->set_estimated_num_entries(max_num_groups);
@@ -1730,6 +1631,148 @@ void ExecutionGenerator::convertAggregate(
}
}
+/**
+ * Converts a physical CrossReferenceCoalesceAggregate into the operator chain
+ *   InitializeAggregationOperator
+ *     -> BuildAggregationExistenceMapOperator (over the left relation's join
+ *        attribute)
+ *     -> AggregationOperator (over the right relation)
+ *     -> FinalizeAggregationOperator (writes the output relation)
+ *     -> DestroyAggregationStateOperator,
+ * all sharing one aggregation state. The aggregation groups by the single
+ * right join attribute and always uses a COLLISION_FREE_VECTOR hash table
+ * sized by group_by_key_value_range().
+ *
+ * NOTE: The order of the statements below matters: state / destination
+ * indices are taken from the current proto sizes right before each add, and
+ * DAG node indices come from addRelationalOperator().
+ */
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  // Only a single join attribute on each side is expected here.
+  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+  const CatalogRelationInfo *left_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->left_child());
+  const CatalogRelationInfo *right_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+  // Create aggr state proto. Its id is the slot the newly added proto takes.
+  const QueryContext::aggregation_state_id aggr_state_index =
+      query_context_proto_->aggregation_states_size();
+  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+  // The aggregation itself runs over the right relation.
+  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+  // Group by the right join attribute.
+  std::unique_ptr<const Scalar> execution_group_by_expression(
+      physical_plan->right_join_attributes().front()->concretize(
+          attribute_substitution_map_));
+  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+      execution_group_by_expression->getProto());
+
+  aggr_state_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+  aggr_state_proto->set_estimated_num_entries(
+      physical_plan->group_by_key_value_range());
+
+  if (physical_plan->right_filter_predicate() != nullptr) {
+    std::unique_ptr<const Predicate> predicate(
+        convertPredicate(physical_plan->right_filter_predicate()));
+    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+  }
+
+  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->CopyFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      std::unique_ptr<const Scalar> concretized_argument(
+          argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+    }
+
+    // This path expects only non-DISTINCT aggregates.
+    DCHECK(!unnamed_aggregate_expression->is_distinct());
+    aggr_proto->set_is_distinct(false);
+  }
+
+  const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InitializeAggregationOperator(
+              query_handle_->query_id(),
+              aggr_state_index));
+
+  // The existence map is built from the left relation's join attribute.
+  const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildAggregationExistenceMapOperator(
+              query_handle_->query_id(),
+              *left_relation_info->relation,
+              physical_plan->left_join_attributes().front()->id(),
+              left_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!left_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                         left_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  const QueryPlan::DAGNodeIndex aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new AggregationOperator(
+              query_handle_->query_id(),
+              *right_relation_info->relation,
+              right_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!right_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         right_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  // Build aggregation existence map once initialization is done.
+  execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                       initialize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Start aggregation after building existence map.
+  execution_plan_->addDirectDependency(aggregation_operator_index,
+                                       build_aggregation_existence_map_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new FinalizeAggregationOperator(query_handle_->query_id(),
+                                          aggr_state_index,
+                                          *output_relation,
+                                          insert_destination_index));
+
+  insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+  execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+                                       aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Record the output relation for this physical node and register it as a
+  // temporary relation produced by the finalize operator.
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+                                            output_relation);
+
+  // Destroy the aggregation state only after finalization has completed.
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+}
+
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
// Create sort configuration for run generation.
vector<bool> sort_ordering(physical_sort->sort_ascending());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..f4e614a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/DeleteTuples.hpp"
#include "query_optimizer/physical/DropTable.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
@@ -206,22 +207,6 @@ class ExecutionGenerator {
std::string getNewRelationName();
/**
- * @brief Checks whether an aggregate node can be efficiently evaluated with
- * the collision-free aggregation fast path.
- *
- * @param aggregate The physical aggregate node to be checked.
- * @param estimated_num_groups The estimated number of groups for the aggregate.
- * @param exact_num_groups If collision-free aggregation is applicable, the
- * pointed content of this pointer will be set as the maximum possible
- * number of groups that the collision-free hash table need to hold.
- * @return A bool value indicating whether collision-free aggregation can be
- * used to evaluate \p aggregate.
- */
- bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
- const std::size_t estimated_num_groups,
- std::size_t *max_num_groups) const;
-
- /**
* @brief Sets up the info of the CatalogRelation represented by TableReference.
* TableReference is not converted to any operator.
*
@@ -356,6 +341,9 @@ class ExecutionGenerator {
*/
void convertAggregate(const physical::AggregatePtr &physical_plan);
+ /**
+ * @brief Converts a physical CrossReferenceCoalesceAggregate to a chain of
+ * InitializeAggregation, BuildAggregationExistenceMap, Aggregation,
+ * FinalizeAggregation and DestroyAggregationState operators that share
+ * one aggregation state.
+ *
+ * @param physical_plan The CrossReferenceCoalesceAggregate to be converted.
+ */
+ void convertCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
/**
* @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
#include "query_optimizer/logical/Logical.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
#include "query_optimizer/rules/InjectJoinFilters.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
rules.emplace_back(new ReorderColumns());
}
+ rules.emplace_back(new FuseAggregateJoin());
+
// NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
// extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
// suggested that all the new rules be placed before this point.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..4042915 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
quickstep_catalog_CatalogRelationStatistics
quickstep_queryoptimizer_costmodel_CostModel
quickstep_queryoptimizer_physical_Aggregate
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -51,7 +52,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationStatistics
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_aggregation_AggregationID
quickstep_queryoptimizer_costmodel_CostModel
+ quickstep_queryoptimizer_expressions_AggregateFunction
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ComparisonExpression
quickstep_queryoptimizer_expressions_ExprId
@@ -62,6 +66,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_queryoptimizer_expressions_PatternMatcher
quickstep_queryoptimizer_expressions_Predicate
quickstep_queryoptimizer_physical_Aggregate
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -76,7 +81,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_physical_WindowAggregate
quickstep_types_NullType
+ quickstep_types_Type
+ quickstep_types_TypeID
quickstep_types_TypedValue
+ quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros)
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
#include "catalog/CatalogRelationStatistics.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
case P::PhysicalType::kAggregate:
return estimateCardinalityForAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return estimateCardinalityForCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kSharedSubplanReference: {
const P::SharedSubplanReferencePtr shared_subplan_reference =
std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
estimateCardinality(physical_plan->input()) / 10);
}
+// The node's cardinality is estimated as that of its left child plan.
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  const P::PhysicalPtr &left_input = physical_plan->left_child();
+  return estimateCardinality(left_input);
+}
+
std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
const physical::WindowAggregatePtr &physical_plan) {
return estimateCardinality(physical_plan->input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
#include "query_optimizer/cost_model/CostModel.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ // Returns the estimated cardinality of the left child plan, which is used as
+ // the cardinality estimate of the whole CrossReferenceCoalesceAggregate node.
+ std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
// Return the estimated cardinality of the input plan.
std::size_t estimateCardinalityForWindowAggregate(
const physical::WindowAggregatePtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..fc775c7 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -20,13 +20,18 @@
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include <algorithm>
+#include <cstddef>
+#include <cstdint>
#include <memory>
#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogRelationStatistics.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ComparisonExpression.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
@@ -37,6 +42,7 @@
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -49,8 +55,13 @@
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/NullType.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -58,6 +69,10 @@ namespace quickstep {
namespace optimizer {
namespace cost {
+// Exclusive upper bound on the maximum group-by key value for which
+// StarSchemaSimpleCostModel::canUseCollisionFreeAggregation() will accept
+// the collision-free vector table fast path.
+DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
+ "The maximum allowed key range (number of entries) for using a "
+ "CollisionFreeVectorTable.");
+
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
@@ -88,6 +103,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
case P::PhysicalType::kAggregate:
return estimateCardinalityForAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return estimateCardinalityForCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kSharedSubplanReference: {
const P::SharedSubplanReferencePtr shared_subplan_reference =
std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +193,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
}
+// A CrossReferenceCoalesceAggregate is estimated to produce as many rows as
+// its left child.
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  const P::PhysicalPtr &left_input = physical_plan->left_child();
+  return estimateCardinality(left_input);
+}
+
std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
const P::WindowAggregatePtr &physical_plan) {
return estimateCardinality(physical_plan->input());
@@ -233,6 +256,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
}
break;
}
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::PhysicalPtr left_child = physical_plan->children()[0];
+ if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+ return estimateNumDistinctValues(attribute_id, left_child);
+ }
+ break;
+ }
case P::PhysicalType::kFilterJoin: {
const P::FilterJoinPtr &filter_join =
std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +305,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
double StarSchemaSimpleCostModel::estimateSelectivity(
const physical::PhysicalPtr &physical_plan) {
switch (physical_plan->getPhysicalType()) {
+ case P::PhysicalType::kAggregate: {
+ const P::AggregatePtr &aggregate =
+ std::static_pointer_cast<const P::Aggregate>(physical_plan);
+ return estimateSelectivity(aggregate->input()) *
+ estimateSelectivityForFilterPredicate(aggregate);
+ }
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+ return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+ }
case P::PhysicalType::kSelection: {
const P::SelectionPtr &selection =
std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +372,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
const physical::PhysicalPtr &physical_plan) {
+ P::PhysicalPtr target_plan = physical_plan;
E::PredicatePtr filter_predicate = nullptr;
switch (physical_plan->getPhysicalType()) {
case P::PhysicalType::kSelection:
@@ -340,6 +382,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
case P::PhysicalType::kAggregate:
filter_predicate =
std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+ target_plan = physical_plan->children()[0];
break;
case P::PhysicalType::kHashJoin:
filter_predicate =
@@ -356,7 +399,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
if (filter_predicate == nullptr) {
return 1.0;
} else {
- return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+ return estimateSelectivityForPredicate(filter_predicate, target_plan);
}
}
@@ -443,6 +486,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
std::static_pointer_cast<const P::Aggregate>(physical_plan);
return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
}
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+ return E::SubsetOfExpressions(
+ aggregate_on_left_outer_join->left_join_attributes(), attributes);
+ }
case P::PhysicalType::kHashJoin: {
const P::HashJoinPtr &hash_join =
std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -542,6 +591,103 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
return kInvalidAttributeID;
}
+// Checks whether 'aggregate' qualifies for the collision-free vector table
+// fast path. It requires:
+//   - a single INT/LONG group-by attribute with exact min/max stats;
+//   - min >= 0 and max below FLAGS_collision_free_vector_table_max_size;
+//   - a key range at most 256x the estimated number of groups;
+//   - only non-DISTINCT COUNT/SUM aggregates with at most one INT/LONG/FLOAT/
+//     DOUBLE argument.
+// On success '*max_num_groups' is set to (max key + 1); otherwise it is not
+// written.
+bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  // NOTE: named 'group_by_expr' so it is not shadowed by the aggregate loop
+  // variable below.
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr group_by_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(group_by_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact = false;
+  bool max_value_stat_is_exact = false;
+  const TypedValue min_value = findMinValueStat(
+      aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value = findMaxValueStat(
+      aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding it as a gflag.
+  //
+  // The density check is written as a multiplication rather than
+  // "max / estimated_num_groups > 256" so that an estimate of zero groups does
+  // not divide by zero; the outcome is unchanged for all other inputs.
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
+      static_cast<double>(max_cpp_value) >
+          256.0 * static_cast<double>(estimated_num_groups)) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
+                                       AggregationID::kCount,
+                                       AggregationID::kSum)) {
+      return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1u) {
+      return false;
+    }
+
+    if (arguments.size() == 1u) {
+      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
+                                         TypeID::kInt,
+                                         TypeID::kLong,
+                                         TypeID::kFloat,
+                                         TypeID::kDouble)) {
+        return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
} // namespace cost
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..afb2ef9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -166,10 +167,29 @@ class StarSchemaSimpleCostModel : public CostModel {
physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
}
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param max_num_groups If collision-free aggregation is applicable, the
+   *        pointed content of this pointer will be set as the maximum possible
+   *        number of groups that the collision-free hash table needs to hold.
+   *        It is not written when the function returns false.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups);
+
private:
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ // Returns the estimated cardinality of the node's left child plan.
+ std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
std::size_t estimateCardinalityForFilterJoin(
const physical::FilterJoinPtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+ CrossReferenceCoalesceAggregate.cpp
+ CrossReferenceCoalesceAggregate.hpp)
add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Cast
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+ quickstep_queryoptimizer_OptimizerTree
+ quickstep_queryoptimizer_expressions_Alias
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_physical_PhysicalType
+ quickstep_utility_Cast
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
glog
quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
quickstep_queryoptimizer_physical_CopyFrom
quickstep_queryoptimizer_physical_CreateIndex
quickstep_queryoptimizer_physical_CreateTable
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_DeleteTuples
quickstep_queryoptimizer_physical_DropTable
quickstep_queryoptimizer_physical_FilterJoin
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..6bed215
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  // Left join attributes come first, then one reference per aggregate alias.
+  std::vector<E::AttributeReferencePtr> attributes = left_join_attributes_;
+  for (const E::AliasPtr &alias : aggregate_expressions_)
+    attributes.push_back(E::ToRef(alias));
+  return attributes;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  // Collects every attribute this node reads: the join attributes of both
+  // sides, those in the optional right-side filter predicate, and those used
+  // by the aggregate expressions. Duplicates are removed by shared_ptr
+  // identity; the order of the returned vector is unspecified.
+  std::unordered_set<E::AttributeReferencePtr> referenced;
+  const auto add_all =
+      [&referenced](const std::vector<E::AttributeReferencePtr> &attributes) {
+        referenced.insert(attributes.begin(), attributes.end());
+      };
+
+  add_all(left_join_attributes_);
+  add_all(right_join_attributes_);
+  if (right_filter_predicate_) {
+    add_all(right_filter_predicate_->getReferencedAttributes());
+  }
+
+  for (const E::AliasPtr &aggregate_expr : aggregate_expressions_) {
+    add_all(aggregate_expr->getReferencedAttributes());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(referenced.begin(),
+                                               referenced.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->emplace_back("group_by_key_value_range");
+  inline_field_values->emplace_back(std::to_string(group_by_key_value_range_));
+
+  // Single-node children: the two inputs, then the optional right-side filter.
+  non_container_child_field_names->emplace_back("left_child");
+  non_container_child_fields->emplace_back(left_child_);
+  non_container_child_field_names->emplace_back("right_child");
+  non_container_child_fields->emplace_back(right_child_);
+  if (right_filter_predicate_) {
+    non_container_child_field_names->emplace_back("right_filter_predicate");
+    non_container_child_fields->emplace_back(right_filter_predicate_);
+  }
+
+  container_child_field_names->emplace_back("left_join_attributes");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->emplace_back("right_join_attributes");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+  container_child_field_names->emplace_back("aggregate_expressions");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+} // namespace physical
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..44f8a33
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ * @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief A physical node that fuses a left outer HashJoin with an Aggregate to
+ *        enable fast-path execution.
+ *
+ * Below we briefly describe the semantics of this physical node.
+ *
+ * Let L be a table with PRIMARY KEY u. Let R be a table with FOREIGN KEY x
+ * referring to L(u). Then CrossReferenceCoalesceAggregate represents a common
+ * class of analytical queries that
+ * - For each u in L, COUNT/SUM the records in R that correspond to u (i.e.
+ *   those records satisfying R.x = L.u).
+ *   In the case that there is no record for u in R, use 0 as the result value.
+ *
+ * And we have the mapping:
+ *   L -> left_child_
+ *   R -> right_child_
+ *   u -> left_join_attributes_
+ *   x -> right_join_attributes_
+ *   COUNT/SUM -> aggregate_expressions_
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  /**
+   * @return The left physical child.
+   */
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  /**
+   * @return The right physical child.
+   */
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  /**
+   * @return The left join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  /**
+   * @return The right join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  /**
+   * @return The optional predicate applied to the right child before aggregation.
+   */
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;
+  }
+
+  /**
+   * @return Aggregate expressions.
+   */
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  /**
+   * @return The maximum possible value of the group-by keys when mapped to
+   *         integer.
+   */
+  std::size_t group_by_key_value_range() const {
+    return group_by_key_value_range_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  group_by_key_value_range_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;  // This node never produces a pruned copy of itself.
+  }
+
+  /**
+   * @brief Creates a physical CrossReferenceCoalesceAggregate.
+   *
+   * @param left_child The left child.
+   * @param right_child The right child.
+   * @param left_join_attributes The join attributes of the left child.
+   * @param right_join_attributes The join attributes of the right child.
+   * @param right_filter_predicate Optional filtering predicate evaluated on
+   *        the right child before aggregation.
+   * @param aggregate_expressions The aggregate expressions.
+   * @param group_by_key_value_range The maximum possible value of the group-by
+   *        keys when mapped to integer.
+   * @return An immutable physical CrossReferenceCoalesceAggregate.
+   */
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            group_by_key_value_range));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CrossReferenceCoalesceAggregate(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        group_by_key_value_range_(group_by_key_value_range) {
+    addChild(left_child_);
+    addChild(right_child_);
+  }
+
+  // TODO(jianqiao): For the left child, support filter predicate fusing and
+  // attachment of LIPFilters.
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t group_by_key_value_range_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+} // namespace physical
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
class BinaryJoin;
class CopyFrom;
class CreateTable;
+class CrossReferenceCoalesceAggregate;
class DeleteTuples;
class DropTable;
class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+ PhysicalType::kCrossReferenceCoalesceAggregate>;
using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
kCopyFrom,
kCreateIndex,
kCreateTable,
+ kCrossReferenceCoalesceAggregate,
kDeleteTuples,
kDropTable,
kFilterJoin,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
DCHECK(tree != nullptr);
init(tree);
- std::vector<std::shared_ptr<const TreeType>> new_children;
- bool has_changed_children = false;
- for (const std::shared_ptr<const TreeType> &child : tree->children()) {
- std::shared_ptr<const TreeType> new_child = apply(child);
- if (child != new_child && !has_changed_children) {
- has_changed_children = true;
- }
- new_children.push_back(new_child);
- }
-
- if (has_changed_children) {
- return applyToNode(tree->copyWithNewChildren(new_children));
- } else {
- return applyToNode(tree);
- }
+ return applyInternal(tree);
}
protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
*
* @param input The input tree.
*/
- virtual void init(const TreeNodePtr &input) {
- }
+ virtual void init(const TreeNodePtr &input) {}
private:
+  // Post-order rewrite: rewrites every child subtree first, then applies
+  // applyToNode() to this node (rebuilt only if some child was replaced).
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {
+    DCHECK(tree != nullptr);
+
+    std::vector<std::shared_ptr<const TreeType>> rewritten_children;
+    bool any_child_changed = false;
+    for (const std::shared_ptr<const TreeType> &original_child : tree->children()) {
+      rewritten_children.push_back(applyInternal(original_child));
+      if (rewritten_children.back() != original_child) {
+        any_child_changed = true;
+      }
+    }
+
+    const TreeNodePtr node =
+        any_child_changed ? tree->copyWithNewChildren(rewritten_children) : tree;
+    return applyToNode(node);
+  }
+
DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..427500d 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
quickstep_queryoptimizer_rules_Rule
quickstep_queryoptimizer_rules_RuleHelper
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+ quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+ quickstep_queryoptimizer_expressions_AggregateFunction
+ quickstep_queryoptimizer_expressions_Alias
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_ExprId
+ quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_expressions_NamedExpression
+ quickstep_queryoptimizer_expressions_PatternMatcher
+ quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_expressions_Scalar
+ quickstep_queryoptimizer_physical_Aggregate
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+ quickstep_queryoptimizer_physical_HashJoin
+ quickstep_queryoptimizer_physical_PatternMatcher
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_physical_PhysicalType
+ quickstep_queryoptimizer_physical_Selection
+ quickstep_queryoptimizer_physical_TopLevelPlan
+ quickstep_queryoptimizer_rules_BottomUpRule
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
glog
quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
quickstep_queryoptimizer_rules_AttachLIPFilters
quickstep_queryoptimizer_rules_BottomUpRule
quickstep_queryoptimizer_rules_CollapseProject
+ quickstep_queryoptimizer_rules_FuseAggregateJoin
quickstep_queryoptimizer_rules_GenerateJoins
quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_rules_PruneColumns
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..6efc7e8
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+// Fuses an Aggregate on top of a HashLeftOuterJoin into a CrossReferenceCoalesceAggregate.
+P::PhysicalPtr FuseAggregateJoin::applyToNode(const P::PhysicalPtr &node) {
+  // Currently we consider only Aggregate on HashLeftOuterJoin.
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;
+  }
+
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;
+  }
+
+  // Single left join attribute with unique values.
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;
+  }
+
+  // Single group-by attribute that is the same as the left join attribute.
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  // Each aggregate takes exactly one attribute from the right child as argument.
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  std::vector<E::AttributeReferencePtr> aggregate_argument_attributes;
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    E::AttributeReferencePtr arg_attr;
+    if (arguments.size() != 1u ||
+        !E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;
+    }
+    aggregate_argument_attributes.push_back(arg_attr);
+  }
+
+  // Collision-free vector aggregation is applicable, and both the left and right
+  // join attributes are range-bounded integer values.
+  const std::size_t estimated_num_groups =
+      cost_model_->estimateNumGroupsForAggregate(aggregate);
+
+  std::size_t max_num_groups_left;
+  if (!cost_model_->canUseCollisionFreeAggregation(aggregate,
+                                                   estimated_num_groups,
+                                                   &max_num_groups_left)) {
+    return node;
+  }
+
+  std::size_t max_num_groups_right;
+  if (!cost_model_->canUseCollisionFreeAggregation(
+          P::Aggregate::Create(hash_join->right(),
+                               E::ToNamedExpressions(hash_join->right_join_attributes()),
+                               aggregate_expressions,
+                               nullptr),
+          estimated_num_groups,
+          &max_num_groups_right)) {
+    return node;
+  }
+
+  // Fuse right child's filter predicate, but only if the Selection's input still
+  // provides every right-side attribute the fused node reads (join + aggregate args).
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    const std::vector<E::AttributeReferencePtr> selection_input_attributes =
+        selection->input()->getOutputAttributes();
+    if (E::SubsetOfExpressions(right_join_attributes, selection_input_attributes) &&
+        E::SubsetOfExpressions(aggregate_argument_attributes, selection_input_attributes)) {
+      right_child = selection->input();
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  const std::size_t max_num_groups = std::max(max_num_groups_left, max_num_groups_right);
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  // The rule expects to be given the whole plan, rooted at a TopLevelPlan.
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  const P::TopLevelPlanPtr plan = std::static_pointer_cast<const P::TopLevelPlan>(input);
+
+  // Replace any previous cost model with one built over the shared subplans.
+  cost_model_.reset(new cost::StarSchemaSimpleCostModel(plan->shared_subplans()));
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..f2d4c47
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ * @{
+ */
+
+/**
+ * @brief Physical-plan rule that fuses an Aggregate node with the HashJoin
+ *        node directly beneath it into a single fused physical node.
+ */
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() = default;
+
+  ~FuseAggregateJoin() override = default;
+
+  std::string getName() const override { return "FuseAggregateJoin"; }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override;
+
+  // Initializes cost_model_ from the input plan before the rewrite starts.
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Cost model used to check key uniqueness and collision-free aggregation.
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..648e291
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+// Walks every tuple exposed by `accessor` and sets, in `existence_map`, the bit
+// whose index is the value of attribute `attr_id` (interpreted as CppType).
+// When `is_attr_nullable` is true, NULL values (nullptr) are skipped.
+// NOTE(review): the value is used directly as a bit index, so this assumes
+// keys fall inside the existence map's range; confirm with the planner.
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+    typed_accessor->beginIteration();
+    while (typed_accessor->next()) {
+      const void *untyped_value =
+          typed_accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (is_attr_nullable && untyped_value == nullptr) {
+        // NULL key: nothing to record.
+        continue;
+      }
+      existence_map->setBit(*static_cast<const CppType *>(untyped_value));
+    }
+  });
+}
+
+// Dispatch helper: turns the runtime nullability flag into the compile-time
+// `is_attr_nullable` template argument of ExecuteBuild.
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map) {
+  if (!is_attr_nullable) {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+    return;
+  }
+  ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+}
+
+} // namespace
+
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Enqueues one work order that scans `input_block_id` of the input relation.
+  const auto add_work_order = [&](const block_id input_block_id) -> void {
+    container->addNormalWorkOrder(
+        new BuildAggregationExistenceMapWorkOrder(
+            query_id_,
+            input_relation_,
+            input_block_id,
+            build_attribute_,
+            query_context->getAggregationState(aggr_state_index_),
+            storage_manager),
+        op_index_);
+  };
+
+  if (!input_relation_is_stored_) {
+    // Input blocks arrive incrementally: hand out only the ones not yet
+    // covered, and report completion once feeding of the input is done.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      add_work_order(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: every block is known up front, so emit them all once.
+  if (!started_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      add_work_order(input_block_id);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+bool BuildAggregationExistenceMapOperator
+    ::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!input_relation_is_stored_) {
+    // Incremental input: serialize only blocks not handed out yet; finished
+    // once the input relation is no longer being fed.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      const block_id pending_block =
+          input_relation_block_ids_[num_workorders_generated_];
+      container->addWorkOrderProto(createWorkOrderProto(pending_block), op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: serialize one work order per block, exactly once.
+  if (!started_) {
+    for (const block_id stored_block : input_relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(stored_block), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+/**
+ * @brief Serializes a BuildAggregationExistenceMapWorkOrder for one block.
+ *
+ * @param block The input-relation block the work order will scan.
+ * @return A heap-allocated proto; presumably owned by the
+ *         WorkOrderProtosContainer it is handed to (see callers).
+ **/
+serialization::WorkOrder* BuildAggregationExistenceMapOperator
+    ::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  // The payload below consists of BuildAggregationExistenceMapWorkOrder
+  // extensions, so the type tag must name this operator's work order, not
+  // BUILD_LIP_FILTER (which would mislabel the serialized work order).
+  proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id,
+                      input_relation_.getID());
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id,
+                      block);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute,
+                      build_attribute_);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index,
+                      aggr_state_index_);
+  return proto;
+}
+
+// Scans the assigned block and marks, in the aggregation state's
+// CollisionFreeVectorTable existence map, every value of the build attribute.
+// Only INT and LONG build attributes are handled; any other type is fatal.
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  BlockReference input_block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> value_accessor(
+      input_block->getTupleStorageSubBlock().createValueAccessor());
+
+  CollisionFreeVectorTable *vector_table =
+      state_->getCollisionFreeVectorTable();
+  DCHECK(vector_table != nullptr);
+  BarrieredReadWriteConcurrentBitVector *existence_map =
+      vector_table->getExistenceMap();
+
+  const Type &build_attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  const bool build_attr_nullable = build_attr_type.isNullable();
+
+  switch (build_attr_type.getTypeID()) {
+    case TypeID::kInt:
+      ExecuteHelper<int>(
+          build_attribute_, build_attr_nullable, value_accessor.get(), existence_map);
+      break;
+    case TypeID::kLong:
+      ExecuteHelper<std::int64_t>(
+          build_attribute_, build_attr_nullable, value_accessor.get(), existence_map);
+      break;
+    default:
+      LOG(FATAL) << "Build attribute type not supported by "
+                 << "BuildAggregationExistenceMapOperator: "
+                 << build_attr_type.getName();
+  }
+}
+
+} // namespace quickstep