You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/02 18:09:51 UTC
[08/10] incubator-quickstep git commit: Add BitVectorExactFilter as a
LIP filter and support Join-to-Semijoin transformation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp
new file mode 100644
index 0000000..0fcd06b
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.cpp
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/InjectJoinFilters.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/rules/PruneColumns.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  // Per-invocation state: a cost model bound to this plan's shared subplans,
+  // and an initially empty LIP filter configuration.
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+
+  // Rewrite pipeline:
+  //   1. strength-reduce eligible HashJoins into FilterJoins;
+  //   2. push FilterJoins down so they are evaluated early;
+  //   3. insert Selection anchors where no ancestor can host the filters;
+  //   4. prune columns that became unnecessary after the pushdown.
+  P::PhysicalPtr plan = transformHashJoinToFilters(input);
+  plan = pushDownFilters(plan);
+  plan = addFilterAnchors(plan, false /* ancestor_can_anchor_filter */);
+  plan = PruneColumns().apply(plan);
+
+  //   5. record a LIP filter build/probe pair for every FilterJoin.
+  concretizeAsLIPFilters(plan, nullptr /* anchor_node */);
+
+  const bool produced_filters =
+      !lip_filter_configuration_->getBuildInfoMap().empty() ||
+      !lip_filter_configuration_->getProbeInfoMap().empty();
+  if (!produced_filters) {
+    return plan;
+  }
+
+  // Ownership of the configuration is handed to the rewritten TopLevelPlan.
+  const P::TopLevelPlanPtr final_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(plan);
+  return final_plan->copyWithLIPFilterConfiguration(
+      P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
+}
+
+/**
+ * Decides whether a HashJoin can be replaced by a FilterJoin (a bit-vector
+ * semi/anti-join filter on the probe side).
+ *
+ * @param hash_join The candidate HashJoin.
+ * @return true iff the join has no residual predicate, joins on exactly one
+ *         attribute pair, outputs only probe-side attributes, is an inner join
+ *         with unique build keys (or a left semi/anti join), and the build-side
+ *         key is an INT/LONG column whose exact value range does not exceed
+ *         kMaxFilterSize.
+ */
+bool InjectJoinFilters::isTransformable(
+    const physical::HashJoinPtr &hash_join) const {
+  // Conditions for replacing a HashJoin with a FilterJoin:
+
+  // No residual predicate.
+  if (hash_join->residual_predicate() != nullptr) {
+    return false;
+  }
+  // Exactly one equi-join attribute pair. The build attribute is read via
+  // front() below, so an empty key list must be rejected as well.
+  if (hash_join->right_join_attributes().size() != 1u) {
+    return false;
+  }
+  // All the output attributes must be from the probe side.
+  if (!E::SubsetOfExpressions(hash_join->getOutputAttributes(),
+                              hash_join->left()->getOutputAttributes())) {
+    return false;
+  }
+  switch (hash_join->join_type()) {
+    case P::HashJoin::JoinType::kInnerJoin: {
+      // In the case of inner join, the build side join attributes must be unique.
+      if (!cost_model_->impliesUniqueAttributes(hash_join->right(),
+                                                hash_join->right_join_attributes())) {
+        return false;
+      }
+      break;
+    }
+    case P::HashJoin::JoinType::kLeftSemiJoin:  // Fall through
+    case P::HashJoin::JoinType::kLeftAntiJoin:
+      break;
+    default:
+      return false;
+  }
+
+  // The build side join attribute has integer type and its values are exactly
+  // within a reasonable range.
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  const bool has_exact_min_max_stats =
+      findExactMinMaxValuesForAttributeHelper(hash_join->right(),
+                                              hash_join->right_join_attributes().front(),
+                                              &min_cpp_value,
+                                              &max_cpp_value);
+  if (!has_exact_min_max_stats) {
+    return false;
+  }
+
+  // TODO(jianqiao): implement SimpleHashSetExactFilter to relax this requirement.
+  // Note that 1G bits = 128MB.
+  if (max_cpp_value < min_cpp_value) {
+    return false;
+  }
+  // Compute the range in unsigned arithmetic: for LONG columns,
+  // (max - min) can exceed INT64_MAX, and signed overflow is undefined
+  // behavior. With max >= min, the unsigned difference is exact.
+  const std::uint64_t value_range =
+      static_cast<std::uint64_t>(max_cpp_value) -
+      static_cast<std::uint64_t>(min_cpp_value);
+  if (value_range > static_cast<std::uint64_t>(kMaxFilterSize)) {
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * Recursively replaces every transformable HashJoin in the tree with an
+ * equivalent FilterJoin. Children are rewritten before their parent; the
+ * original node is returned when nothing in its subtree changed.
+ */
+P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
+    const P::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> rewritten_children;
+  bool any_child_rewritten = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    rewritten_children.push_back(transformHashJoinToFilters(child));
+    if (rewritten_children.back() != child) {
+      any_child_rewritten = true;
+    }
+  }
+
+  P::HashJoinPtr hash_join;
+  const bool replace_with_filter_join =
+      P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join) &&
+      isTransformable(hash_join);
+  if (!replace_with_filter_join) {
+    return any_child_rewritten ? input->copyWithNewChildren(rewritten_children)
+                               : input;
+  }
+
+  DCHECK_EQ(2u, rewritten_children.size());
+  P::PhysicalPtr build_side = rewritten_children[1];
+  E::PredicatePtr build_side_filter_predicate;
+
+  // A Selection directly on the build side whose input still provides the
+  // join key is bypassed; its filter predicate is carried into the FilterJoin
+  // instead.
+  P::SelectionPtr build_selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(build_side, &build_selection) &&
+      E::SubsetOfExpressions(hash_join->right_join_attributes(),
+                             build_selection->input()->getOutputAttributes())) {
+    build_side_filter_predicate = build_selection->filter_predicate();
+    build_side = build_selection->input();
+  }
+
+  const bool is_anti_join =
+      hash_join->join_type() == P::HashJoin::JoinType::kLeftAntiJoin;
+  return P::FilterJoin::Create(rewritten_children[0],
+                               build_side,
+                               hash_join->left_join_attributes(),
+                               hash_join->right_join_attributes(),
+                               hash_join->project_expressions(),
+                               build_side_filter_predicate,
+                               is_anti_join);
+}
+
+/**
+ * Bottom-up pass that tries to move each FilterJoin below its probe-side
+ * child, via pushDownFiltersInternal(). Other nodes are only rebuilt when one
+ * of their children changed.
+ */
+physical::PhysicalPtr InjectJoinFilters::pushDownFilters(
+    const physical::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> rewritten_children;
+  bool any_child_rewritten = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    rewritten_children.push_back(pushDownFilters(child));
+    if (rewritten_children.back() != child) {
+      any_child_rewritten = true;
+    }
+  }
+
+  P::FilterJoinPtr filter_join;
+  if (!P::SomeFilterJoin::MatchesWithConditionalCast(input, &filter_join)) {
+    return any_child_rewritten ? input->copyWithNewChildren(rewritten_children)
+                               : input;
+  }
+
+  DCHECK_EQ(2u, rewritten_children.size());
+  const P::PhysicalPtr &probe_side = rewritten_children[0];
+  const P::PhysicalPtr &build_side = rewritten_children[1];
+  return pushDownFiltersInternal(probe_side, build_side, filter_join);
+}
+
+/**
+ * Places `filter_join` as deep as possible beneath `probe_child`.
+ *
+ * For single-input-style operators (and the probe side of a HashJoin) whose
+ * first child still exposes all probe attributes, recurses into that child and
+ * rebuilds `probe_child` around the result. Otherwise the FilterJoin is
+ * materialized directly on top of `probe_child`.
+ *
+ * @param probe_child Candidate probe-side subtree (possibly already rewritten).
+ * @param build_child Build-side subtree (possibly already rewritten).
+ * @param filter_join The FilterJoin being pushed down.
+ * @return The rewritten subtree; `filter_join` itself if nothing changed.
+ */
+physical::PhysicalPtr InjectJoinFilters::pushDownFiltersInternal(
+    const physical::PhysicalPtr &probe_child,
+    const physical::PhysicalPtr &build_child,
+    const physical::FilterJoinPtr &filter_join) const {
+  switch (probe_child->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:  // Fall through
+    case P::PhysicalType::kHashJoin:
+    case P::PhysicalType::kSample:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kSort:
+    case P::PhysicalType::kWindowAggregate: {
+      DCHECK_GE(probe_child->getNumChildren(), 1u);
+      const P::PhysicalPtr child = probe_child->children()[0];
+      if (E::SubsetOfExpressions(filter_join->probe_attributes(),
+                                 child->getOutputAttributes())) {
+        const P::PhysicalPtr new_child =
+            pushDownFiltersInternal(child, build_child, filter_join);
+        if (new_child != child) {
+          std::vector<P::PhysicalPtr> new_children = probe_child->children();
+          new_children[0] = new_child;
+          return probe_child->copyWithNewChildren(new_children);
+        }
+      }
+      // Explicit break: the filter could not be pushed below this node.
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (probe_child != filter_join->left()) {
+    // TODO(jianqiao): may need to update probe_attributes.
+    return P::FilterJoin::Create(probe_child,
+                                 build_child,
+                                 filter_join->probe_attributes(),
+                                 filter_join->build_attributes(),
+                                 E::ToNamedExpressions(probe_child->getOutputAttributes()),
+                                 filter_join->build_side_filter_predicate(),
+                                 filter_join->is_anti_join());
+  }
+
+  if (build_child != filter_join->right()) {
+    // The probe side is unchanged but the build subtree was rewritten by the
+    // recursive pushdown. Keep that rewrite instead of silently discarding it,
+    // and keep the original projection.
+    return P::FilterJoin::Create(probe_child,
+                                 build_child,
+                                 filter_join->probe_attributes(),
+                                 filter_join->build_attributes(),
+                                 filter_join->project_expressions(),
+                                 filter_join->build_side_filter_predicate(),
+                                 filter_join->is_anti_join());
+  }
+
+  return filter_join;
+}
+
+
+/**
+ * Ensures every FilterJoin has an ancestor that can host (anchor) its LIP
+ * filter. Aggregate, Selection and FilterJoin nodes can anchor the filters
+ * produced in their (probe-side) inputs. A FilterJoin without such an ancestor
+ * gets wrapped in a pass-through Selection that serves as the anchor.
+ *
+ * @param input The subtree to process.
+ * @param ancestor_can_anchor_filter Whether the nearest relevant ancestor can
+ *        host filters coming from `input`.
+ * @return The rewritten subtree, or `input` if nothing changed.
+ */
+physical::PhysicalPtr InjectJoinFilters::addFilterAnchors(
+    const physical::PhysicalPtr &input,
+    const bool ancestor_can_anchor_filter) const {
+  const P::PhysicalType node_type = input->getPhysicalType();
+  std::vector<P::PhysicalPtr> rewritten_children;
+
+  if (node_type == P::PhysicalType::kAggregate) {
+    const P::AggregatePtr aggregate =
+        std::static_pointer_cast<const P::Aggregate>(input);
+    rewritten_children.emplace_back(addFilterAnchors(aggregate->input(), true));
+  } else if (node_type == P::PhysicalType::kSelection) {
+    const P::SelectionPtr selection =
+        std::static_pointer_cast<const P::Selection>(input);
+    rewritten_children.emplace_back(addFilterAnchors(selection->input(), true));
+  } else if (node_type == P::PhysicalType::kFilterJoin) {
+    const P::FilterJoinPtr filter_join =
+        std::static_pointer_cast<const P::FilterJoin>(input);
+    rewritten_children.emplace_back(addFilterAnchors(filter_join->left(), true));
+    rewritten_children.emplace_back(addFilterAnchors(filter_join->right(), true));
+  } else {
+    // NOTE(jianqiao): HashJoin nodes are deliberately NOT treated as anchors.
+    // Some SSB/TPCH queries slow down significantly when converted filters are
+    // attached to a parent HashJoin (e.g. one HashJoin plus one attached
+    // LIPFilter is slower than the original two HashJoins), due to current
+    // HashJoinOperator implementation issues. Instead, a FilterJoin under a
+    // HashJoin gets an extra Selection anchor, which guarantees
+    // non-degrading performance. Re-enabling HashJoin anchoring would mean
+    // recursing into the left child with `true` and the right with `false`.
+    for (const P::PhysicalPtr &child : input->children()) {
+      rewritten_children.emplace_back(addFilterAnchors(child, false));
+    }
+  }
+
+  DCHECK_EQ(rewritten_children.size(), input->children().size());
+  P::PhysicalPtr result = input;
+  if (rewritten_children != input->children()) {
+    result = input->copyWithNewChildren(rewritten_children);
+  }
+
+  const bool needs_own_anchor =
+      node_type == P::PhysicalType::kFilterJoin && !ancestor_can_anchor_filter;
+  if (!needs_own_anchor) {
+    return result;
+  }
+
+  const P::FilterJoinPtr anchored_join =
+      std::static_pointer_cast<const P::FilterJoin>(result);
+  return P::Selection::Create(anchored_join,
+                              anchored_join->project_expressions(),
+                              nullptr);
+}
+
+/**
+ * Walks the plan and registers, for every FilterJoin, a BitVectorExactFilter
+ * build info (built by the FilterJoin) and a probe info (applied at
+ * `anchor_node`) in lip_filter_configuration_.
+ *
+ * @param input The subtree to walk.
+ * @param anchor_node The node on which filters found in `input` are probed,
+ *        or nullptr when no anchor is available. Encountering a FilterJoin with
+ *        a null anchor is a fatal error.
+ */
+void InjectJoinFilters::concretizeAsLIPFilters(
+    const P::PhysicalPtr &input,
+    const P::PhysicalPtr &anchor_node) const {
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(input);
+      concretizeAsLIPFilters(aggregate->input(), aggregate);
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr &selection =
+          std::static_pointer_cast<const P::Selection>(input);
+      concretizeAsLIPFilters(selection->input(), selection);
+      break;
+    }
+    // Currently we disable the attachment of filters to HashJoin nodes. See the
+    // comments in InjectJoinFilters::addFilterAnchors().
+    /*
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(input);
+      concretizeAsLIPFilters(hash_join->left(), hash_join);
+      concretizeAsLIPFilters(hash_join->right(), nullptr);
+      break;
+    }
+    */
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr &filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      DCHECK_EQ(1u, filter_join->build_attributes().size());
+      const E::AttributeReferencePtr &build_attr =
+          filter_join->build_attributes().front();
+
+      // The bit vector is sized from these bounds, so the lookup must succeed
+      // in release builds too. Continuing with uninitialized bounds would
+      // silently build a wrong filter. isTransformable() checked the
+      // originating HashJoin's build side, so failure here indicates an
+      // optimizer inconsistency.
+      std::int64_t min_cpp_value = 0;
+      std::int64_t max_cpp_value = 0;
+      const bool has_exact_min_max_stats =
+          findExactMinMaxValuesForAttributeHelper(filter_join,
+                                                  build_attr,
+                                                  &min_cpp_value,
+                                                  &max_cpp_value);
+      CHECK(has_exact_min_max_stats);
+      DCHECK_GE(max_cpp_value, min_cpp_value);
+      DCHECK_LE(max_cpp_value - min_cpp_value, kMaxFilterSize);
+      CHECK(anchor_node != nullptr);
+
+      lip_filter_configuration_->addBuildInfo(
+          P::BitVectorExactFilterBuildInfo::Create(build_attr,
+                                                   min_cpp_value,
+                                                   max_cpp_value,
+                                                   filter_join->is_anti_join()),
+          filter_join);
+      lip_filter_configuration_->addProbeInfo(
+          P::LIPFilterProbeInfo::Create(filter_join->probe_attributes().front(),
+                                        build_attr,
+                                        filter_join),
+          anchor_node);
+
+      // Probe-side filters share this FilterJoin's anchor; build-side filters
+      // are anchored on this FilterJoin itself.
+      concretizeAsLIPFilters(filter_join->left(), anchor_node);
+      concretizeAsLIPFilters(filter_join->right(), filter_join);
+      break;
+    }
+    default: {
+      for (const P::PhysicalPtr &child : input->children()) {
+        concretizeAsLIPFilters(child, nullptr);
+      }
+    }
+  }
+}
+
+/**
+ * Looks up the min/max statistics of `attribute` in `physical_plan` through
+ * the cost model and converts them to int64.
+ *
+ * @return true, with *min_cpp_value / *max_cpp_value written, only when both
+ *         bounds are non-null, both are reported exact, and the attribute type
+ *         is INT or LONG. Otherwise returns false.
+ */
+bool InjectJoinFilters::findExactMinMaxValuesForAttributeHelper(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    std::int64_t *min_cpp_value,
+    std::int64_t *max_cpp_value) const {
+  bool is_min_exact;
+  bool is_max_exact;
+  const TypedValue min_stat =
+      cost_model_->findMinValueStat(physical_plan, attribute, &is_min_exact);
+  const TypedValue max_stat =
+      cost_model_->findMaxValueStat(physical_plan, attribute, &is_max_exact);
+
+  // Null checks come first so the exactness flags are only read for
+  // non-null statistics.
+  const bool usable_stats = !min_stat.isNull() && !max_stat.isNull() &&
+                            is_min_exact && is_max_exact;
+  if (!usable_stats) {
+    return false;
+  }
+
+  const TypeID type_id = attribute->getValueType().getTypeID();
+  if (type_id == TypeID::kInt) {
+    *min_cpp_value = min_stat.getLiteral<int>();
+    *max_cpp_value = max_stat.getLiteral<int>();
+    return true;
+  }
+  if (type_id == TypeID::kLong) {
+    *min_cpp_value = min_stat.getLiteral<std::int64_t>();
+    *max_cpp_value = max_stat.getLiteral<std::int64_t>();
+    return true;
+  }
+  return false;
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp
new file mode 100644
index 0000000..c5250b3
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ * @{
+ */
+
+/**
+ * @brief Physical-plan rule that strength-reduces eligible HashJoin nodes into
+ *        FilterJoin nodes, which are then realized as LIPFilters attached to
+ *        anchoring operators where the filters get applied.
+ *
+ * A HashJoin qualifies when (1) it joins on a single integer attribute whose
+ * exact build-side min/max values span a reasonably small range, and (2) all
+ * of its output attributes come from the probe side. In that case the hash
+ * table can be replaced by a BitVector built over the build-side attribute and
+ * used to filter the probe-side input.
+ */
+class InjectJoinFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  InjectJoinFilters() = default;
+
+  ~InjectJoinFilters() override = default;
+
+  std::string getName() const override {
+    return "TransformFilterJoins";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Whether the given HashJoin may be replaced by a FilterJoin.
+  bool isTransformable(const physical::HashJoinPtr &hash_join) const;
+
+  // Replaces every transformable HashJoin in the tree with a FilterJoin.
+  physical::PhysicalPtr transformHashJoinToFilters(
+      const physical::PhysicalPtr &input) const;
+
+  // Moves FilterJoin nodes down the probe side so they are evaluated early.
+  physical::PhysicalPtr pushDownFilters(const physical::PhysicalPtr &input) const;
+
+  // Inserts Selection nodes where needed so that every FilterJoin's LIP filter
+  // has an anchoring node.
+  physical::PhysicalPtr addFilterAnchors(const physical::PhysicalPtr &input,
+                                         const bool ancestor_can_anchor_filter) const;
+
+  // Populates lip_filter_configuration_ from the transformed plan tree.
+  void concretizeAsLIPFilters(const physical::PhysicalPtr &input,
+                              const physical::PhysicalPtr &anchor_node) const;
+
+  // Recursive worker for pushDownFilters().
+  physical::PhysicalPtr pushDownFiltersInternal(
+      const physical::PhysicalPtr &probe_child,
+      const physical::PhysicalPtr &build_child,
+      const physical::FilterJoinPtr &filter_join) const;
+
+  // Fetches exact INT/LONG min/max statistics for an attribute, if available.
+  bool findExactMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
+
+  // Largest allowed (max - min) build-side value range, i.e. roughly the bit
+  // vector size in bits. For scale, 1G bits = 128MB.
+  // TODO(jianqiao): Add this threshold as a gflag.
+  static constexpr std::int64_t kMaxFilterSize = 1000000000L;
+
+  DISALLOW_COPY_AND_ASSIGN(InjectJoinFilters);
+};
+
+/** @} */
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/tests/OptimizerTextTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/OptimizerTextTest.cpp b/query_optimizer/tests/OptimizerTextTest.cpp
index e17f5c4..07accf6 100644
--- a/query_optimizer/tests/OptimizerTextTest.cpp
+++ b/query_optimizer/tests/OptimizerTextTest.cpp
@@ -34,6 +34,7 @@ namespace optimizer {
DECLARE_bool(reorder_columns);
DECLARE_bool(reorder_hash_joins);
DECLARE_bool(use_lip_filters);
+DECLARE_bool(use_filter_joins);
}
}
@@ -64,6 +65,7 @@ int main(int argc, char** argv) {
quickstep::optimizer::FLAGS_reorder_columns = false;
quickstep::optimizer::FLAGS_reorder_hash_joins = false;
quickstep::optimizer::FLAGS_use_lip_filters = false;
+ quickstep::optimizer::FLAGS_use_filter_joins = false;
::testing::InitGoogleTest(&argc, argv);
int success = RUN_ALL_TESTS();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
new file mode 100644
index 0000000..f7c09cd
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildLIPFilterOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/**
+ * Generates one BuildLIPFilterWorkOrder per input block. A stored relation is
+ * handled in a single pass on the first call. A streamed relation emits work
+ * orders for any blocks fed since the previous call.
+ *
+ * @return true once no further work orders will be generated.
+ */
+bool BuildLIPFilterOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+
+  const Predicate *build_side_predicate =
+      query_context->getPredicate(build_side_predicate_index_);
+
+  // Every work order receives its own prober/builder created from the
+  // QueryContext for this operator's LIP deployment.
+  const auto enqueue_work_order = [&](const block_id input_block_id) {
+    container->addNormalWorkOrder(
+        new BuildLIPFilterWorkOrder(
+            query_id_,
+            input_relation_,
+            input_block_id,
+            build_side_predicate,
+            storage_manager,
+            CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+            CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+        op_index_);
+  };
+
+  if (!input_relation_is_stored_) {
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      enqueue_work_order(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  if (!started_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      enqueue_work_order(input_block_id);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+/**
+ * Proto-based counterpart of getAllWorkOrders(): emits one serialized work
+ * order per input block, using the same stored/streamed bookkeeping.
+ *
+ * @return true once no further work order protos will be generated.
+ */
+bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!input_relation_is_stored_) {
+    // Streamed input: emit protos for the blocks fed since the last call.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: the block list is known up front, so emit it exactly once.
+  if (!started_) {
+    for (const block_id input_block : input_relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(input_block), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+// Serializes a BUILD_LIP_FILTER work order for `block`. The caller receives
+// ownership of the newly allocated proto.
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *work_order_proto = new serialization::WorkOrder;
+  work_order_proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
+  work_order_proto->set_query_id(query_id_);
+
+  work_order_proto->SetExtension(
+      serialization::BuildLIPFilterWorkOrder::relation_id, input_relation_.getID());
+  work_order_proto->SetExtension(
+      serialization::BuildLIPFilterWorkOrder::build_block_id, block);
+  work_order_proto->SetExtension(
+      serialization::BuildLIPFilterWorkOrder::build_side_predicate_index,
+      build_side_predicate_index_);
+  work_order_proto->SetExtension(
+      serialization::BuildLIPFilterWorkOrder::lip_deployment_index,
+      lip_deployment_index_);
+
+  return work_order_proto;
+}
+
+// Inserts the qualifying tuples of one input block into the target LIP
+// filters. A tuple qualifies if it passes the optional build-side predicate
+// and, when a prober is attached, the LIP filters probed on this input.
+void BuildLIPFilterWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  // Tuples matching the build-side predicate; left null when no predicate is
+  // set.
+  std::unique_ptr<TupleIdSequence> predicate_matches(
+      build_side_predicate_ == nullptr
+          ? nullptr
+          : block->getMatchesForPredicate(build_side_predicate_));
+
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get()));
+
+  // The LIP filters probed here filter this input relation. They are distinct
+  // from the target filters being built. The match sequence is declared before
+  // the adapter so that it outlives the adapter.
+  std::unique_ptr<TupleIdSequence> lip_matches;
+  std::unique_ptr<ValueAccessor> lip_filtered_accessor;
+  ValueAccessor *insertion_source = accessor.get();
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    lip_matches.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(accessor.get()));
+    lip_filtered_accessor.reset(
+        accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_matches));
+    insertion_source = lip_filtered_accessor.get();
+  }
+
+  lip_filter_builder_->insertValueAccessor(insertion_source);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
new file mode 100644
index 0000000..5192b40
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Predicate;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief Relational operator that populates LIPFilters from the tuples of a
+ *        single input relation.
+ **/
+class BuildLIPFilterOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note The LIPFilters to build are not constructor arguments. They are
+   *       attached afterwards through RelationalOperator::deployLIPFilters().
+   *
+   * @param query_id ID of the query that owns this operator.
+   * @param input_relation Relation whose tuples feed the LIP filters.
+   * @param build_side_predicate_index QueryContext index of the predicate
+   *        applied to input_relation before inserting into the filters, or
+   *        kInvalidPredicateId when no predicate applies.
+   * @param input_relation_is_stored Whether input_relation is a stored relation
+   *        whose blocks are all available before work-order generation starts.
+   *        Otherwise blocks arrive through feedInputBlock().
+   **/
+  BuildLIPFilterOperator(const std::size_t query_id,
+                         const CatalogRelation &input_relation,
+                         const QueryContext::predicate_id build_side_predicate_index,
+                         const bool input_relation_is_stored)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_side_predicate_index_(build_side_predicate_index),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored
+                                      ? input_relation.getBlocksSnapshot()
+                                      : std::vector<block_id>()) {}
+
+  ~BuildLIPFilterOperator() override = default;
+
+  /**
+   * @return The relation the LIP filters are built on.
+   */
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  std::string getName() const override {
+    return "BuildLIPFilterOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  // Appends a newly available block of the (streamed) input relation.
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  /**
+   * @brief Builds the serialized WorkOrder for one input block.
+   *
+   * @param block ID of the block the WorkOrder processes.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const QueryContext::predicate_id build_side_predicate_index_;
+  const bool input_relation_is_stored_;
+
+  // Blocks known so far, and how many of them already have work orders
+  // (the latter is used only for streamed input).
+  std::vector<block_id> input_relation_block_ids_;
+  std::vector<block_id>::size_type num_workorders_generated_ = 0;
+
+  // Set once all work orders for a stored input relation have been emitted.
+  bool started_ = false;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterOperator);
+};
+
+/**
+ * @brief WorkOrder generated by BuildLIPFilterOperator. It inserts one block's
+ *        qualifying tuples into the target LIP filters.
+ **/
+class BuildLIPFilterWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id ID of the query that owns this WorkOrder.
+   * @param input_relation Relation the LIP filters are built on.
+   * @param build_block_id ID of the block to process.
+   * @param build_side_predicate Predicate filtering the input before
+   *        insertion, or nullptr when no predicate applies.
+   * @param storage_manager StorageManager used to load the block (non-null).
+   * @param lip_filter_adaptive_prober Optional prober for LIP filters applied
+   *        to the input. Ownership is taken.
+   * @param lip_filter_builder Builder for the target LIP filters (non-null).
+   *        Ownership is taken.
+   **/
+  BuildLIPFilterWorkOrder(const std::size_t query_id,
+                          const CatalogRelationSchema &input_relation,
+                          const block_id build_block_id,
+                          const Predicate *build_side_predicate,
+                          StorageManager *storage_manager,
+                          LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
+                          LIPFilterBuilder *lip_filter_builder)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_side_predicate_(build_side_predicate),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober),
+        lip_filter_builder_(DCHECK_NOTNULL(lip_filter_builder)) {}
+
+  ~BuildLIPFilterWorkOrder() override = default;
+
+  /**
+   * @return The relation the LIP filters are built on.
+   */
+  const CatalogRelationSchema& input_relation() const {
+    return input_relation_;
+  }
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const Predicate *build_side_predicate_;
+
+  StorageManager *storage_manager_;
+
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+  std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c8447f3..c18dc77 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -34,6 +34,7 @@ set_gflags_lib_name ()
# Declare micro-libs:
add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
+add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp)
add_library(quickstep_relationaloperators_DestroyAggregationStateOperator
@@ -113,6 +114,27 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
quickstep_utility_lipfilter_LIPFilterBuilder
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+# Link dependencies of the BuildLIPFilterOperator micro-library
+# (BuildLIPFilterOperator.cpp / BuildLIPFilterOperator.hpp).
+target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleIdSequence
+ quickstep_storage_TupleStorageSubBlock
+ quickstep_storage_ValueAccessor
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+ quickstep_utility_lipfilter_LIPFilterBuilder
+ quickstep_utility_lipfilter_LIPFilterUtil
+ tmb)
target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
glog
quickstep_catalog_CatalogRelation
@@ -483,6 +505,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_queryexecution_QueryContext
quickstep_relationaloperators_AggregationOperator
quickstep_relationaloperators_BuildHashOperator
+ quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_DeleteOperator
quickstep_relationaloperators_DestroyAggregationStateOperator
quickstep_relationaloperators_DestroyHashOperator
@@ -515,6 +538,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_AggregationOperator
quickstep_relationaloperators_BuildHashOperator
+ quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_CreateIndexOperator
quickstep_relationaloperators_CreateTableOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index f8d9246..76753d2 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -24,25 +24,26 @@ import "relational_operators/SortMergeRunOperator.proto";
enum WorkOrderType {
AGGREGATION = 1;
BUILD_HASH = 2;
- CREATE_INDEX = 3; // Placeholder.
- CREATE_TABLE = 4; // Placeholder.
- DELETE = 5;
- DESTROY_HASH = 6;
- DROP_TABLE = 7;
- FINALIZE_AGGREGATION = 8;
- HASH_JOIN = 9;
- INSERT = 10;
- NESTED_LOOP_JOIN = 11;
- SAMPLE = 12;
- SAVE_BLOCKS = 13;
- SELECT = 14;
- SORT_MERGE_RUN = 15;
- SORT_RUN_GENERATION = 16;
- TABLE_GENERATOR = 17;
- TEXT_SCAN = 18;
- UPDATE = 19;
- WINDOW_AGGREGATION = 20;
- DESTROY_AGGREGATION_STATE = 21;
+ BUILD_LIP_FILTER = 3;
+ CREATE_INDEX = 4; // Placeholder.
+ CREATE_TABLE = 5; // Placeholder.
+ DELETE = 6;
+ DESTROY_HASH = 7;
+ DROP_TABLE = 8;
+ FINALIZE_AGGREGATION = 9;
+ HASH_JOIN = 10;
+ INSERT = 11;
+ NESTED_LOOP_JOIN = 12;
+ SAMPLE = 13;
+ SAVE_BLOCKS = 14;
+ SELECT = 15;
+ SORT_MERGE_RUN = 16;
+ SORT_RUN_GENERATION = 17;
+ TABLE_GENERATOR = 18;
+ TEXT_SCAN = 19;
+ UPDATE = 20;
+ WINDOW_AGGREGATION = 21;
+ DESTROY_AGGREGATION_STATE = 22;
}
message WorkOrder {
@@ -77,6 +78,16 @@ message BuildHashWorkOrder {
}
}
+message BuildLIPFilterWorkOrder {
+ extend WorkOrder {
+ // All required.
+ // Id of the catalog relation whose block is read to build the LIP filters.
+ optional int32 relation_id = 48;
+ // Id of the block of that relation to process.
+ optional fixed64 build_block_id = 49;
+ // Index into the QueryContext's predicates for the build-side filter.
+ optional int32 build_side_predicate_index = 50;
+ // Index into the QueryContext's LIP deployments; supplies both the LIP
+ // filter builder and the (optional) adaptive prober of the work order.
+ optional int32 lip_deployment_index = 51;
+ }
+}
+
message DeleteWorkOrder {
extend WorkOrder {
// All required.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index a6cba02..5e8d03d 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -30,6 +30,7 @@
#include "query_execution/QueryContext.hpp"
#include "relational_operators/AggregationOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
+#include "relational_operators/BuildLIPFilterOperator.hpp"
#include "relational_operators/DeleteOperator.hpp"
#include "relational_operators/DestroyAggregationStateOperator.hpp"
#include "relational_operators/DestroyHashOperator.hpp"
@@ -90,6 +91,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
CreateLIPFilterAdaptiveProberHelper(
proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
}
+    case serialization::BUILD_LIP_FILTER: {
+      LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;
+
+      // Extract the raw ids from the proto first, then resolve them against
+      // the catalog database and the query context below.
+      const relation_id input_relation_id =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id);
+      const block_id build_block =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id);
+      const auto predicate_index =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index);
+      const QueryContext::lip_deployment_id lip_deployment_index =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
+
+      // The same LIP deployment provides both the prober and the builder.
+      return new BuildLIPFilterWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(input_relation_id),
+          build_block,
+          query_context->getPredicate(predicate_index),
+          storage_manager,
+          CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context),
+          CreateLIPFilterBuilderHelper(lip_deployment_index, query_context));
+    }
case serialization::BUILD_HASH: {
LOG(INFO) << "Creating BuildHashWorkOrder in Shiftboss " << shiftboss_index;
vector<attribute_id> join_key_attributes;
@@ -541,6 +559,33 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index),
proto.GetExtension(serialization::BuildHashWorkOrder::partition_id));
}
+    case serialization::BUILD_LIP_FILTER: {
+      // Every extension field of BuildLIPFilterWorkOrder is required.
+      if (!proto.HasExtension(serialization::BuildLIPFilterWorkOrder::relation_id) ||
+          !proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_block_id) ||
+          !proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index) ||
+          !proto.HasExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index)) {
+        return false;
+      }
+
+      const relation_id rel_id =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      // Unlike work orders that only probe LIP filters, a BuildLIPFilterWorkOrder
+      // cannot run without a LIP deployment: its constructor DCHECKs that the
+      // LIPFilterBuilder is non-null. Reject kInvalidLIPDeploymentId here rather
+      // than accepting a proto that would fail on reconstruction.
+      // NOTE(review): assumes CreateLIPFilterBuilderHelper() yields nullptr for
+      // kInvalidLIPDeploymentId; a valid deployment without build entries also
+      // yields nullptr and cannot be detected from this check.
+      const QueryContext::lip_deployment_id lip_deployment_index =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
+      if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId ||
+          !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+        return false;
+      }
+
+      return query_context.isValidPredicate(
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index));
+    }
case serialization::DELETE: {
return proto.HasExtension(serialization::DeleteWorkOrder::relation_id) &&
catalog_database.hasRelationWithId(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 8571149..aeff388 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -270,6 +270,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ExprId
+ quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_LIPFilterConfiguration
quickstep_queryoptimizer_physical_Physical
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index df7a20c..f8bf6f8 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -32,6 +32,7 @@
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
@@ -58,6 +59,8 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
color_map_["TableReference"] = "skyblue";
color_map_["Selection"] = "#90EE90";
+ color_map_["FilterJoin"] = "pink";
+ color_map_["FilterJoin(Anti)"] = "pink";
color_map_["HashJoin"] = "red";
color_map_["HashLeftOuterJoin"] = "orange";
color_map_["HashLeftSemiJoin"] = "orange";
@@ -126,7 +129,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
edge_info.dst_node_id = node_id;
edge_info.dashed = false;
- if (input->getPhysicalType() == P::PhysicalType::kHashJoin &&
+ if ((input->getPhysicalType() == P::PhysicalType::kHashJoin ||
+ input->getPhysicalType() == P::PhysicalType::kFilterJoin) &&
child == input->children()[1]) {
edge_info.dashed = true;
}
@@ -165,6 +169,20 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
}
break;
}
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      node_info.labels.emplace_back(input->getName());
+
+      // One "probe = build" label per join key pair; the build attribute list
+      // is walked in lockstep with the probe attribute list.
+      const auto &build_attributes = filter_join->build_attributes();
+      auto build_attr_it = build_attributes.begin();
+      for (const auto &probe_attribute : filter_join->probe_attributes()) {
+        node_info.labels.emplace_back(probe_attribute->attribute_alias() +
+                                      " = " +
+                                      (*build_attr_it)->attribute_alias());
+        ++build_attr_it;
+      }
+      break;
+    }
default: {
node_info.labels.emplace_back(input->getName());
break;
@@ -177,7 +195,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
if (build_it != build_filters.end()) {
for (const auto &build_info : build_it->second) {
node_info.labels.emplace_back(
- std::string("[LIP build] ") + build_info.build_attribute->attribute_alias());
+ std::string("[LIP build] ") + build_info->build_attribute()->attribute_alias());
}
}
const auto &probe_filters = lip_filter_conf_->getProbeInfoMap();
@@ -185,7 +203,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
if (probe_it != probe_filters.end()) {
for (const auto &probe_info : probe_it->second) {
node_info.labels.emplace_back(
- std::string("[LIP probe] ") + probe_info.probe_attribute->attribute_alias());
+ std::string("[LIP probe] ") + probe_info->probe_attribute()->attribute_alias());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
new file mode 100644
index 0000000..6ad0567
--- /dev/null
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief A LIP filter that tests the EXACT memberships of elements, i.e. there
+ *        will be neither false positives nor false negatives. The implementation
+ *        is to simply reinterpret_cast a value's byte stream into the specified
+ *        CppType and use its offset from min_value as the underlying bit
+ *        vector's index. Therefore, to use this filter, the corresponding LIP
+ *        attribute's values must be bounded in a reasonably small integer range.
+ *
+ * @tparam CppType The integer type matching the attribute's byte width
+ *         (LIPFilterFactory instantiates std::int8_t, std::int16_t,
+ *         std::int32_t and std::int64_t).
+ * @tparam is_anti_filter If true, membership tests are inverted: a filtered
+ *         batch keeps the tuples whose values were NOT inserted.
+ */
+template <typename CppType, bool is_anti_filter>
+class BitVectorExactFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param min_value The minimum possible value for this filter to set.
+   * @param max_value The maximum possible value for this filter to set. Must
+   *        be >= min_value; both bounds must be representable in CppType.
+   */
+  explicit BitVectorExactFilter(const std::int64_t min_value,
+                                const std::int64_t max_value)
+      : LIPFilter(LIPFilterType::kBitVectorExactFilter),
+        min_value_(static_cast<CppType>(min_value)),
+        max_value_(static_cast<CppType>(max_value)),
+        bit_array_(GetByteSize(max_value - min_value + 1)) {
+    DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
+    DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
+    DCHECK_GE(max_value_, min_value_);
+
+    // Clear every bit: no value has been inserted yet.
+    std::memset(bit_array_.data(),
+                0x0,
+                sizeof(std::atomic<std::uint8_t>) * bit_array_.size());
+  }
+
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        this->insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        this->insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    DCHECK(batch != nullptr);
+    DCHECK_LE(batch_size, batch->size());
+
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+ private:
+  /**
+   * @brief Round up bit_size to multiples of 8.
+   */
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7u) / 8u;
+  }
+
+  /**
+   * @brief Bit position of \p value in bit_array_, i.e. value - min_value_.
+   *
+   * @note The subtraction is done in 64-bit unsigned arithmetic. Computing it
+   *       in CppType overflows for narrow signed types (e.g. std::int8_t with
+   *       range [-128, 127]: 127 - (-128) = 255 does not fit and would become
+   *       a negative index), and is signed-overflow UB for std::int64_t.
+   *       Requires value >= min_value_.
+   */
+  inline std::uint64_t getBitOffset(const CppType value) const {
+    return static_cast<std::uint64_t>(static_cast<std::int64_t>(value)) -
+           static_cast<std::uint64_t>(static_cast<std::int64_t>(min_value_));
+  }
+
+  /**
+   * @brief Iterate through the accessor and set the bit of every non-NULL
+   *        value of attribute \p attr_id.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  /**
+   * @brief Filter the given batch of tuples from a ValueAccessor. Write the
+   *        tuple ids which survive in the filtering back to \p batch.
+   *
+   * @note Tuples whose attribute is NULL are dropped, for both regular and
+   *       anti filters.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    // batch_size <= batch->size() is DCHECKed by filterBatch(), so unchecked
+    // indexing is safe in this hot loop.
+    std::vector<tuple_id> &tids = *batch;
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = tids[i];
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition<is_attr_nullable>(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        // out_size <= i, so this never overwrites an id not yet read.
+        tids[out_size] = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  /**
+   * @brief Inserts a given value into the exact filter.
+   */
+  inline void insert(const void *key_begin) {
+    const CppType value = *reinterpret_cast<const CppType *>(key_begin);
+    DCHECK_GE(value, min_value_);
+    DCHECK_LE(value, max_value_);
+
+    const std::uint64_t loc = getBitOffset(value);
+    bit_array_[loc >> 3u].fetch_or(static_cast<std::uint8_t>(1u << (loc & 7u)),
+                                   std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the exact filter.
+   *
+   * @return Whether the value was inserted (inverted when is_anti_filter).
+   */
+  inline bool contains(const void *key_begin) const {
+    const CppType value = *reinterpret_cast<const CppType *>(key_begin);
+    if (value < min_value_ || value > max_value_) {
+      // Out of range, so it can never have been inserted.
+      return is_anti_filter;
+    }
+
+    const std::uint64_t loc = getBitOffset(value);
+    const bool is_bit_set =
+        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+
+    if (is_anti_filter) {
+      return !is_bit_set;
+    } else {
+      return is_bit_set;
+    }
+  }
+
+  const CppType min_value_;
+  const CppType max_value_;
+  // NOTE(review): alignas applies to this std::vector object inside the
+  // filter, not to the heap-allocated bit storage it manages.
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 23b3763..edd0d24 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
LIPFilter.proto)
# Declare micro-libs:
+add_library(quickstep_utility_lipfilter_BitVectorExactFilter ../../empty_src.cpp BitVectorExactFilter.hpp)
add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
@@ -31,6 +32,15 @@ add_library(quickstep_utility_lipfilter_LIPFilter_proto
add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
# Link dependencies:
+# Link dependencies of the header-only BitVectorExactFilter micro-library.
+target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_utility_lipfilter_LIPFilter
+ quickstep_utility_Macros)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
@@ -56,6 +66,7 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
quickstep_utility_lipfilter_LIPFilterBuilder
quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+ quickstep_utility_lipfilter_BitVectorExactFilter
quickstep_utility_lipfilter_LIPFilter_proto
quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_utility_Macros)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 682d69f..ba38264 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -37,8 +37,8 @@ class ValueAccessor;
*/
enum class LIPFilterType {
+ kBitVectorExactFilter,
kBloomFilter,
- kExactFilter,
kSingleIdentityHashFilter
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
index def13dd..45843f3 100644
--- a/utility/lip_filter/LIPFilter.proto
+++ b/utility/lip_filter/LIPFilter.proto
@@ -22,8 +22,8 @@ package quickstep.serialization;
import "types/Type.proto";
enum LIPFilterType {
- BLOOM_FILTER = 1;
- EXACT_FILTER = 2;
+ BIT_VECTOR_EXACT_FILTER = 1;
+ BLOOM_FILTER = 2;
SINGLE_IDENTITY_HASH_FILTER = 3;
}
@@ -33,17 +33,22 @@ message LIPFilter {
extensions 16 to max;
}
-message SingleIdentityHashFilter {
+message BitVectorExactFilter {
extend LIPFilter {
// All required
- optional uint64 filter_cardinality = 16;
- optional uint64 attribute_size = 17;
+ optional int64 min_value = 16;
+ optional int64 max_value = 17;
+ optional uint64 attribute_size = 18;
+ optional bool is_anti_filter = 19;
}
}
-enum LIPFilterActionType {
- BUILD = 1;
- PROBE = 2;
+message SingleIdentityHashFilter {
+ extend LIPFilter {
+ // All required
+ optional uint64 filter_cardinality = 24;
+ optional uint64 attribute_size = 25;
+ }
}
message LIPFilterDeployment {
@@ -53,6 +58,6 @@ message LIPFilterDeployment {
required Type attribute_type = 3;
}
- required LIPFilterActionType action_type = 1;
- repeated Entry entries = 2;
+ repeated Entry build_entries = 1;
+ repeated Entry probe_entries = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
index cd4d90f..5721496 100644
--- a/utility/lip_filter/LIPFilterDeployment.cpp
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -28,45 +28,49 @@
#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
-#include "glog/logging.h"
-
namespace quickstep {
LIPFilterDeployment::LIPFilterDeployment(
const serialization::LIPFilterDeployment &proto,
const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
- switch (proto.action_type()) {
- case serialization::LIPFilterActionType::BUILD:
- action_type_ = LIPFilterActionType::kBuild;
- break;
- case serialization::LIPFilterActionType::PROBE:
- action_type_ = LIPFilterActionType::kProbe;
- break;
- default:
- LOG(FATAL) << "Unsupported LIPFilterActionType: "
- << serialization::LIPFilterActionType_Name(proto.action_type());
+ if (proto.build_entries_size() > 0) {
+ build_.reset(new LIPFilterDeploymentInfo());
+ for (int i = 0; i < proto.build_entries_size(); ++i) {
+ const auto &entry_proto = proto.build_entries(i);
+ build_->lip_filters_.emplace_back(
+ lip_filters.at(entry_proto.lip_filter_id()).get());
+ build_->attr_ids_.emplace_back(entry_proto.attribute_id());
+ build_->attr_types_.emplace_back(
+ &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+ }
}
- for (int i = 0; i < proto.entries_size(); ++i) {
- const auto &entry_proto = proto.entries(i);
- lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
- attr_ids_.emplace_back(entry_proto.attribute_id());
- attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+ if (proto.probe_entries_size() > 0) {
+ probe_.reset(new LIPFilterDeploymentInfo());
+ for (int i = 0; i < proto.probe_entries_size(); ++i) {
+ const auto &entry_proto = proto.probe_entries(i);
+ probe_->lip_filters_.emplace_back(
+ lip_filters.at(entry_proto.lip_filter_id()).get());
+ probe_->attr_ids_.emplace_back(entry_proto.attribute_id());
+ probe_->attr_types_.emplace_back(
+ &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+ }
}
}
bool LIPFilterDeployment::ProtoIsValid(
const serialization::LIPFilterDeployment &proto) {
- if (proto.action_type() != serialization::LIPFilterActionType::BUILD &&
- proto.action_type() != serialization::LIPFilterActionType::PROBE) {
- LOG(FATAL) << "Unsupported LIPFilterActionType: "
- << serialization::LIPFilterActionType_Name(proto.action_type());
- }
- if (proto.entries_size() == 0) {
+ if (proto.build_entries_size() == 0 && proto.probe_entries_size() == 0) {
return false;
}
- for (int i = 0; i < proto.entries_size(); ++i) {
- const auto &entry_proto = proto.entries(i);
+ for (int i = 0; i < proto.build_entries_size(); ++i) {
+ const auto &entry_proto = proto.build_entries(i);
+ if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
+ return false;
+ }
+ }
+ for (int i = 0; i < proto.probe_entries_size(); ++i) {
+ const auto &entry_proto = proto.probe_entries(i);
if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
return false;
}
@@ -75,13 +79,23 @@ bool LIPFilterDeployment::ProtoIsValid(
}
LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
- DCHECK(action_type_ == LIPFilterActionType::kBuild);
- return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+ if (build_ == nullptr) {
+ return nullptr;
+ }
+
+ return new LIPFilterBuilder(build_->lip_filters_,
+ build_->attr_ids_,
+ build_->attr_types_);
}
LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
- DCHECK(action_type_ == LIPFilterActionType::kProbe);
- return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+ if (probe_ == nullptr) {
+ return nullptr;
+ }
+
+ return new LIPFilterAdaptiveProber(probe_->lip_filters_,
+ probe_->attr_ids_,
+ probe_->attr_types_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
index 9b37f88..ab1259b 100644
--- a/utility/lip_filter/LIPFilterDeployment.hpp
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -39,11 +39,6 @@ class Type;
* @{
*/
-enum class LIPFilterActionType {
- kBuild = 0,
- kProbe
-};
-
/**
* @brief Helper class for organizing a group of LIPFilters in the backend.
* Each LIPFilterDeployment object is attached to a RelationalOperator.
@@ -69,16 +64,6 @@ class LIPFilterDeployment {
static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
/**
- * @brief Get the action type for this group of LIPFilters (i.e. whether
- * to build or probe the filters).
- *
- * @return The action type.
- */
- LIPFilterActionType getActionType() const {
- return action_type_;
- }
-
- /**
* @brief Create a LIPFilterBuilder for this group of LIPFilters.
*
* @return A new LIPFilterBuilder object for this group of LIPFilters.
@@ -95,11 +80,14 @@ class LIPFilterDeployment {
LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
private:
- LIPFilterActionType action_type_;
-
- std::vector<LIPFilter *> lip_filters_;
- std::vector<attribute_id> attr_ids_;
- std::vector<const Type *> attr_types_;
+ // A group of LIP filters for one action (build or probe). The three vectors
+ // are parallel: entry i of each describes the same proto Entry.
+ struct LIPFilterDeploymentInfo {
+ // Filters, not owned: they point into the lip_filters vector passed to the
+ // constructor.
+ std::vector<LIPFilter *> lip_filters_;
+ // Attribute each filter is built from / probed with.
+ std::vector<attribute_id> attr_ids_;
+ // Type of that attribute, as reconstructed by TypeFactory.
+ std::vector<const Type *> attr_types_;
+ };
+
+ // nullptr when the proto has no build (resp. probe) entries; in that case
+ // createLIPFilterBuilder() (resp. createLIPFilterAdaptiveProber()) returns
+ // nullptr.
+ std::unique_ptr<LIPFilterDeploymentInfo> build_;
+ std::unique_ptr<LIPFilterDeploymentInfo> probe_;
DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
index ebc4a0e..f69d8b0 100644
--- a/utility/lip_filter/LIPFilterFactory.cpp
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -23,6 +23,7 @@
#include <cstdint>
#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/BitVectorExactFilter.hpp"
#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
#include "glog/logging.h"
@@ -31,6 +32,46 @@ namespace quickstep {
LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::int64_t min_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::min_value);
+      const std::int64_t max_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::max_value);
+      const bool is_anti_filter =
+          proto.GetExtension(serialization::BitVectorExactFilter::is_anti_filter);
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+
+      // Select the integer CppType matching the attribute's byte width, then
+      // the regular or anti specialization.
+      switch (attr_size) {
+        case 1:
+          if (!is_anti_filter) {
+            return new BitVectorExactFilter<std::int8_t, false>(min_value, max_value);
+          }
+          return new BitVectorExactFilter<std::int8_t, true>(min_value, max_value);
+        case 2:
+          if (!is_anti_filter) {
+            return new BitVectorExactFilter<std::int16_t, false>(min_value, max_value);
+          }
+          return new BitVectorExactFilter<std::int16_t, true>(min_value, max_value);
+        case 4:
+          if (!is_anti_filter) {
+            return new BitVectorExactFilter<std::int32_t, false>(min_value, max_value);
+          }
+          return new BitVectorExactFilter<std::int32_t, true>(min_value, max_value);
+        case 8:
+          if (!is_anti_filter) {
+            return new BitVectorExactFilter<std::int64_t, false>(min_value, max_value);
+          }
+          return new BitVectorExactFilter<std::int64_t, true>(min_value, max_value);
+        default:
+          LOG(FATAL) << "Invalid attribute size for BitVectorExactFilter: "
+                     << attr_size;
+      }
+    }
case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
const std::size_t attr_size =
proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
@@ -57,6 +98,15 @@ LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter
bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+      const std::int64_t min_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::min_value);
+      const std::int64_t max_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::max_value);
+      if (max_value < min_value) {
+        return false;
+      }
+      // Mirror ReconstructFromProto(): only 1/2/4/8-byte attributes are
+      // supported (any other size hits LOG(FATAL) there), and both bounds must
+      // be representable in the selected integer type (the filter constructor
+      // only DCHECKs this). Since max_value >= min_value, checking the lower
+      // bound against the type's minimum and the upper bound against its
+      // maximum suffices.
+      switch (attr_size) {
+        case 1:
+          return min_value >= INT8_MIN && max_value <= INT8_MAX;
+        case 2:
+          return min_value >= INT16_MIN && max_value <= INT16_MAX;
+        case 4:
+          return min_value >= INT32_MIN && max_value <= INT32_MAX;
+        case 8:
+          return true;
+        default:
+          return false;
+      }
+    }
case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
const std::size_t attr_size =
proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);