You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/08 18:08:42 UTC
[1/6] incubator-quickstep git commit: Avoid crash due to an error in
deleting a file in HDFS. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/aggregate-on-left-outer-join d79978ad4 -> 1db1e0887 (forced update)
Avoid crash due to an error in deleting a file in HDFS.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9a95c23b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9a95c23b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9a95c23b
Branch: refs/heads/aggregate-on-left-outer-join
Commit: 9a95c23bdfebe877288f325b3844b3a01c77c08a
Parents: f46ae15
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 7 23:23:50 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Feb 7 23:23:50 2017 -0800
----------------------------------------------------------------------
storage/FileManagerHdfs.cpp | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a95c23b/storage/FileManagerHdfs.cpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.cpp b/storage/FileManagerHdfs.cpp
index 937750a..3a5167c 100644
--- a/storage/FileManagerHdfs.cpp
+++ b/storage/FileManagerHdfs.cpp
@@ -117,12 +117,17 @@ size_t FileManagerHdfs::numSlots(const block_id block) const {
bool FileManagerHdfs::deleteBlockOrBlob(const block_id block) {
const string filename(blockFilename(block));
- if ((hdfsDelete(hdfs_, filename.c_str(), 0) == 0) || (errno == ENOENT)) {
- return true;
- } else {
- LOG(ERROR) << "Failed to delete file " << filename << " with error: " << strerror(errno);
- return false;
+ if (hdfsDelete(hdfs_, filename.c_str(), 0)) {
+ switch (errno) {
+ case EINPROGRESS:
+ case ENOENT:
+ break;
+ default:
+ LOG(ERROR) << "Failed to delete file " << filename << " with error: " << strerror(errno);
+ }
}
+
+ return true;
}
bool FileManagerHdfs::readBlockOrBlob(const block_id block,
[2/6] incubator-quickstep git commit: Fixed a memory leak in the
distributed version.
Posted by ji...@apache.org.
Fixed a memory leak in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3b65b0fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3b65b0fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3b65b0fd
Branch: refs/heads/aggregate-on-left-outer-join
Commit: 3b65b0fdf40efa47862323834917cc19f2478ba3
Parents: 9a95c23
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 00:35:43 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 00:35:52 2017 -0800
----------------------------------------------------------------------
query_execution/PolicyEnforcerDistributed.cpp | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b65b0fd/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 38b8a34..49a1d9a 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -240,6 +240,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kQueryTeardownMessage);
+ free(proto_bytes);
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
<< "') to all Shiftbosses";
[3/6] incubator-quickstep git commit: Removed the temp query result
relation in the distributed version.
Posted by ji...@apache.org.
Removed the temp query result relation in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aa7f6fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aa7f6fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aa7f6fe4
Branch: refs/heads/aggregate-on-left-outer-join
Commit: aa7f6fe4e07804524aca0f1574935ae3f73c985d
Parents: dd8747f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 00:33:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 00:35:53 2017 -0800
----------------------------------------------------------------------
cli/distributed/CMakeLists.txt | 1 +
cli/distributed/Cli.cpp | 20 +++++++++++++++++++-
cli/distributed/Conductor.cpp | 13 ++++++++++++-
cli/distributed/Conductor.hpp | 4 ++++
query_execution/QueryExecutionMessages.proto | 4 ++++
query_execution/QueryExecutionTypedefs.hpp | 2 ++
6 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index b46082f..5804321 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
# Link dependencies:
target_link_libraries(quickstep_cli_distributed_Conductor
glog
+ quickstep_catalog_CatalogDatabase
quickstep_cli_DefaultsConfigurator
quickstep_cli_Flags
quickstep_cli_distributed_Role
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 5af70e6..386654d 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -122,7 +122,10 @@ void Cli::init() {
// Prepare for submitting a query.
bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+ bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
+
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
}
@@ -191,7 +194,7 @@ void Cli::run() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
if (proto.has_result_relation()) {
- CatalogRelation result_relation(proto.result_relation());
+ const CatalogRelation result_relation(proto.result_relation());
PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
@@ -199,6 +202,21 @@ void Cli::run() {
for (const block_id block : blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
+
+ // Notify Conductor to remove the temp query result relation in the Catalog.
+ S::QueryResultTeardownMessage proto_response;
+ proto_response.set_relation_id(result_relation.getID());
+
+ const size_t proto_response_length = proto_response.ByteSize();
+ char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+ CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+ TaggedMessage response_message(static_cast<const void*>(proto_response_bytes),
+ proto_response_length,
+ kQueryResultTeardownMessage);
+ free(proto_response_bytes);
+
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message));
}
std::chrono::duration<double, std::milli> time_in_ms = end - start;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 13d4d57..cf2eb4b 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -28,6 +28,7 @@
#include <string>
#include <utility>
+#include "catalog/CatalogDatabase.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "parser/ParseStatement.hpp"
@@ -73,6 +74,7 @@ void Conductor::init() {
}
query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+ catalog_database_ = query_processor_->getDefaultDatabase();
} catch (const std::exception &e) {
LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
<< "\nIf you intended to create a new database, "
@@ -93,12 +95,14 @@ void Conductor::init() {
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
+
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
foreman_ = make_unique<ForemanDistributed>(*block_locator_,
std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
- query_processor_->getDefaultDatabase());
+ catalog_database_);
foreman_->start();
}
@@ -129,6 +133,13 @@ void Conductor::run() {
processSqlQueryMessage(sender, new string(move(proto.sql_query())));
break;
}
+ case kQueryResultTeardownMessage: {
+ S::QueryResultTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ catalog_database_->dropRelationById(proto.relation_id());
+ break;
+ }
default:
LOG(FATAL) << "Unknown TMB message type";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e8c9582..09bf2b9 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -34,6 +34,8 @@
namespace quickstep {
+class CatalogDatabase;
+
/** \addtogroup CliDistributed
* @{
*/
@@ -63,6 +65,8 @@ class Conductor final : public Role {
SqlParserWrapper parser_wrapper_;
std::unique_ptr<QueryProcessor> query_processor_;
+ // Not owned.
+ CatalogDatabase *catalog_database_;
tmb::client_id conductor_client_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 115a9a3..68f286d 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -138,6 +138,10 @@ message QueryExecutionSuccessMessage {
optional CatalogRelationSchema result_relation = 1;
}
+message QueryResultTeardownMessage {
+ required int32 relation_id = 1;
+}
+
message QueryExecutionErrorMessage {
required string error_message = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9f78302..994bd60 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -105,6 +105,8 @@ enum QueryExecutionMessageType : message_type_id {
kQueryExecutionSuccessMessage,
kQueryExecutionErrorMessage,
+ kQueryResultTeardownMessage, // From CLI to Conductor.
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
[4/6] incubator-quickstep git commit: Revert "A workaround to remove
query result relation in the distributed version."
Posted by ji...@apache.org.
Revert "A workaround to remove query result relation in the distributed version."
This reverts commit aef1c3586580cfa72eb031fafe08700f6d5d9a86.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dd8747fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dd8747fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dd8747fd
Branch: refs/heads/aggregate-on-left-outer-join
Commit: dd8747fd7aba95ce2f0f325297e108bbac5d958d
Parents: 3b65b0f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 00:02:27 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 00:35:53 2017 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dd8747fd/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index e6f22ec..4d95f16 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -201,12 +201,8 @@ void ForemanDistributed::run() {
// TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
- const relation_id result_relation_id = proto.relation_id();
- processSaveQueryResultResponseMessage(proto.cli_id(), result_relation_id);
+ processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
query_result_saved_shiftbosses_.erase(query_id);
-
- // TODO(zuyu): Refactor to clean-up blocks in Shiftbosses.
- catalog_database_->dropRelationById(result_relation_id);
}
break;
}
[5/6] incubator-quickstep git commit: Style fixes for
TextScanOperator.
Posted by ji...@apache.org.
Style fixes for TextScanOperator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a9fe07d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a9fe07d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a9fe07d5
Branch: refs/heads/aggregate-on-left-outer-join
Commit: a9fe07d5b0885d08b4aac328aa7deea81d94bda7
Parents: aa7f6fe
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 01:57:23 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 01:57:23 2017 -0800
----------------------------------------------------------------------
relational_operators/TextScanOperator.cpp | 74 +++++++++++++-------------
relational_operators/TextScanOperator.hpp | 6 +--
2 files changed, 39 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9fe07d5/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 6650319..0a83a85 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -110,8 +110,6 @@ bool TextScanOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !work_generated_) {
for (const std::string &file : files) {
- // Use standard C libary to retrieve the file size.
-
#ifdef QUICKSTEP_HAVE_UNISTD
// Check file permissions before trying to open it.
const int access_result = access(file.c_str(), R_OK);
@@ -255,11 +253,11 @@ void TextScanWorkOrder::execute() {
} else {
vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
if (is_faulty) {
- // Skip faulty rows
- LOG(INFO) << "Faulty row found. Hence switching to next row.";
+ // Skip faulty rows
+ LOG(INFO) << "Faulty row found. Hence switching to next row.";
} else {
- // Convert vector returned to tuple only when a valid row is encountered.
- tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+ // Convert vector returned to tuple only when a valid row is encountered.
+ tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
}
}
}
@@ -297,11 +295,11 @@ void TextScanWorkOrder::execute() {
vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
if (is_faulty) {
- // Skip the faulty row.
- LOG(INFO) << "Faulty row found. Hence switching to next row.";
+ // Skip the faulty row.
+ LOG(INFO) << "Faulty row found. Hence switching to next row.";
} else {
- // Convert vector returned to tuple only when a valid row is encountered.
- tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+ // Convert vector returned to tuple only when a valid row is encountered.
+ tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
}
}
@@ -346,11 +344,11 @@ std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
std::string value_str;
for (const auto &attr : relation) {
if (has_reached_end_of_line) {
- // Do not abort if one of the row is faulty.
- // Set is_faulty to true and SKIP the current row.
- *is_faulty = true;
- LOG(INFO) << "Row has too few fields.";
- return attribute_values;
+ // Do not abort if one of the row is faulty.
+ // Set is_faulty to true and SKIP the current row.
+ *is_faulty = true;
+ LOG(INFO) << "Row has too few fields.";
+ return attribute_values;
}
value_str.clear();
@@ -363,46 +361,46 @@ std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
if (is_null_literal) {
// NULL literal.
if (!attr.getType().isNullable()) {
- *is_faulty = true;
- LOG(INFO) << "NULL literal '\\N' was specified for a column with a "
- "non-nullable Type.";
- skipFaultyRow(row_ptr);
- return attribute_values;
+ *is_faulty = true;
+ LOG(INFO) << "NULL literal '\\N' was specified for a column with a "
+ "non-nullable Type.";
+ skipFaultyRow(row_ptr);
+ return attribute_values;
}
attribute_values.emplace_back(attr.getType().makeNullValue());
} else {
attribute_values.emplace_back();
if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
- // Do not abort if one of the row is faulty.
- *is_faulty = true;
- LOG(INFO) << "Failed to parse value.";
- skipFaultyRow(row_ptr);
- return attribute_values;
+ // Do not abort if one of the row is faulty.
+ *is_faulty = true;
+ LOG(INFO) << "Failed to parse value.";
+ skipFaultyRow(row_ptr);
+ return attribute_values;
}
}
}
if (!has_reached_end_of_line) {
- // Do not abort if one of the row is faulty.
- // Set is_faulty to true and SKIP the current row.
- *is_faulty = true;
- LOG(INFO) << "Row has too many fields.";
- skipFaultyRow(row_ptr);
+ // Do not abort if one of the row is faulty.
+ // Set is_faulty to true and SKIP the current row.
+ *is_faulty = true;
+ LOG(INFO) << "Row has too many fields.";
+ skipFaultyRow(row_ptr);
}
return attribute_values;
}
void TextScanWorkOrder::skipFaultyRow(const char **field_ptr) const {
- const char *cur_ptr = *field_ptr;
- // Move row pointer to the end of faulty row.
- for (;; ++cur_ptr) {
- const char c = *cur_ptr;
- if (c == '\n') {
- break;
- }
+ const char *cur_ptr = *field_ptr;
+ // Move row pointer to the end of faulty row.
+ for (;; ++cur_ptr) {
+ const char c = *cur_ptr;
+ if (c == '\n') {
+ break;
}
- *field_ptr = cur_ptr + 1;
+ }
+ *field_ptr = cur_ptr + 1;
}
void TextScanWorkOrder::extractFieldString(const char **field_ptr,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9fe07d5/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 65863b3..eada190 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -264,9 +264,9 @@ class TextScanWorkOrder : public WorkOrder {
* @param is_faulty OUTPUT parameter. Set to true if the row is faulty,
* @return The tuple parsed from the char stream.
*/
-std::vector<TypedValue> parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation,
- bool *is_faulty) const;
+ std::vector<TypedValue> parseRow(const char **row_ptr,
+ const CatalogRelationSchema &relation,
+ bool *is_faulty) const;
/**
* @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
[6/6] incubator-quickstep git commit: Fuse Aggregate with
LeftOuterJoin to accelerate evaluation.
Posted by ji...@apache.org.
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1db1e088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1db1e088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1db1e088
Branch: refs/heads/aggregate-on-left-outer-join
Commit: 1db1e08878bccaa87934741569aba5c6fff137e8
Parents: a9fe07d
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 8 12:09:06 2017 -0600
----------------------------------------------------------------------
query_optimizer/CMakeLists.txt | 3 +
query_optimizer/ExecutionGenerator.cpp | 148 +++++++++++
query_optimizer/ExecutionGenerator.hpp | 4 +
query_optimizer/PhysicalGenerator.cpp | 3 +
query_optimizer/cost_model/CMakeLists.txt | 2 +
query_optimizer/cost_model/SimpleCostModel.cpp | 9 +
query_optimizer/cost_model/SimpleCostModel.hpp | 5 +
.../cost_model/StarSchemaSimpleCostModel.cpp | 37 ++-
.../cost_model/StarSchemaSimpleCostModel.hpp | 4 +
query_optimizer/physical/CMakeLists.txt | 14 ++
.../CrossReferenceCoalesceAggregate.cpp | 105 ++++++++
.../CrossReferenceCoalesceAggregate.hpp | 176 +++++++++++++
query_optimizer/physical/PatternMatcher.hpp | 3 +
query_optimizer/physical/PhysicalType.hpp | 1 +
query_optimizer/rules/BottomUpRule.hpp | 39 +--
query_optimizer/rules/CMakeLists.txt | 23 ++
query_optimizer/rules/FuseAggregateJoin.cpp | 249 +++++++++++++++++++
query_optimizer/rules/FuseAggregateJoin.hpp | 73 ++++++
.../BuildAggregationExistenceMapOperator.cpp | 148 +++++++++++
.../BuildAggregationExistenceMapOperator.hpp | 146 +++++++++++
relational_operators/CMakeLists.txt | 25 ++
storage/AggregationOperationState.cpp | 13 +-
storage/AggregationOperationState.hpp | 3 +
storage/CollisionFreeVectorTable.hpp | 4 +
utility/lip_filter/BitVectorExactFilter.hpp | 26 +-
utility/lip_filter/CMakeLists.txt | 10 +-
utility/lip_filter/SingleIdentityHashFilter.hpp | 20 +-
27 files changed, 1239 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..90bc19f 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -95,6 +95,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_physical_CopyFrom
quickstep_queryoptimizer_physical_CreateIndex
quickstep_queryoptimizer_physical_CreateTable
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_DeleteTuples
quickstep_queryoptimizer_physical_DropTable
quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_physical_UpdateTable
quickstep_queryoptimizer_physical_WindowAggregate
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_CreateIndexOperator
@@ -213,6 +215,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_rules_AttachLIPFilters
+ quickstep_queryoptimizer_rules_FuseAggregateJoin
quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..5b00eef 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -72,9 +72,11 @@
#include "query_optimizer/expressions/Scalar.hpp"
#include "query_optimizer/expressions/ScalarLiteral.hpp"
#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/DeleteTuples.hpp"
#include "query_optimizer/physical/DropTable.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +98,7 @@
#include "query_optimizer/physical/UpdateTable.hpp"
#include "query_optimizer/physical/WindowAggregate.hpp"
#include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
#include "relational_operators/BuildLIPFilterOperator.hpp"
#include "relational_operators/CreateIndexOperator.hpp"
@@ -266,6 +269,9 @@ void ExecutionGenerator::generatePlanInternal(
case P::PhysicalType::kAggregate:
return convertAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return convertCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kCopyFrom:
return convertCopyFrom(
std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -1730,6 +1736,148 @@ void ExecutionGenerator::convertAggregate(
}
}
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+ const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+ DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+ DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+ const CatalogRelationInfo *left_relation_info =
+ findRelationInfoOutputByPhysical(physical_plan->left_child());
+ const CatalogRelationInfo *right_relation_info =
+ findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+ // Create aggr state proto.
+ const QueryContext::aggregation_state_id aggr_state_index =
+ query_context_proto_->aggregation_states_size();
+ S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+ aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+ // Group by the right join attribute.
+ std::unique_ptr<const Scalar> execution_group_by_expression(
+ physical_plan->right_join_attributes().front()->concretize(
+ attribute_substitution_map_));
+ aggr_state_proto->add_group_by_expressions()->CopyFrom(
+ execution_group_by_expression->getProto());
+
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+ aggr_state_proto->set_estimated_num_entries(
+ physical_plan->hash_table_num_entries());
+
+ if (physical_plan->right_filter_predicate() != nullptr) {
+ std::unique_ptr<const Predicate> predicate(
+ convertPredicate(physical_plan->right_filter_predicate()));
+ aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+ }
+
+ for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+ const E::AggregateFunctionPtr unnamed_aggregate_expression =
+ std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+ // Add a new entry in 'aggregates'.
+ S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+ // Set the AggregateFunction.
+ aggr_proto->mutable_function()->CopyFrom(
+ unnamed_aggregate_expression->getAggregate().getProto());
+
+ // Add each of the aggregate's arguments.
+ for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+ unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+ aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+ }
+
+ // Set whether it is a DISTINCT aggregation.
+ DCHECK(!unnamed_aggregate_expression->is_distinct());
+ aggr_proto->set_is_distinct(false);
+ }
+
+ const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+ execution_plan_->addRelationalOperator(
+ new InitializeAggregationOperator(
+ query_handle_->query_id(),
+ aggr_state_index));
+
+ const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+ execution_plan_->addRelationalOperator(
+ new BuildAggregationExistenceMapOperator(
+ query_handle_->query_id(),
+ *left_relation_info->relation,
+ physical_plan->left_join_attributes().front()->id(),
+ left_relation_info->isStoredRelation(),
+ aggr_state_index));
+
+ if (!left_relation_info->isStoredRelation()) {
+ execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+ left_relation_info->producer_operator_index,
+ false /* is_pipeline_breaker */);
+ }
+
+ const QueryPlan::DAGNodeIndex aggregation_operator_index =
+ execution_plan_->addRelationalOperator(
+ new AggregationOperator(
+ query_handle_->query_id(),
+ *right_relation_info->relation,
+ right_relation_info->isStoredRelation(),
+ aggr_state_index));
+
+ if (!right_relation_info->isStoredRelation()) {
+ execution_plan_->addDirectDependency(aggregation_operator_index,
+ right_relation_info->producer_operator_index,
+ false /* is_pipeline_breaker */);
+ }
+
+ // Build aggregation existence map once initialization is done.
+ execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+ initialize_aggregation_operator_index,
+ true /* is_pipeline_breaker */);
+
+ // Start aggregation after building existence map.
+ execution_plan_->addDirectDependency(aggregation_operator_index,
+ build_aggregation_existence_map_operator_index,
+ true /* is_pipeline_breaker */);
+
+
+ // Create InsertDestination proto.
+ const CatalogRelation *output_relation = nullptr;
+ const QueryContext::insert_destination_id insert_destination_index =
+ query_context_proto_->insert_destinations_size();
+ S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ createTemporaryCatalogRelation(physical_plan,
+ &output_relation,
+ insert_destination_proto);
+
+ const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+ execution_plan_->addRelationalOperator(
+ new FinalizeAggregationOperator(query_handle_->query_id(),
+ aggr_state_index,
+ *output_relation,
+ insert_destination_index));
+
+ insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+ execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+ aggregation_operator_index,
+ true /* is_pipeline_breaker */);
+
+ physical_to_output_relation_map_.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(physical_plan),
+ std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+ temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+ output_relation);
+
+ const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+ execution_plan_->addRelationalOperator(
+ new DestroyAggregationStateOperator(query_handle_->query_id(),
+ aggr_state_index));
+
+ execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+ finalize_aggregation_operator_index,
+ true);
+}
+
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
// Create sort configuration for run generation.
vector<bool> sort_ordering(physical_sort->sort_ascending());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..db9664e 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/DeleteTuples.hpp"
#include "query_optimizer/physical/DropTable.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
@@ -356,6 +357,9 @@ class ExecutionGenerator {
*/
void convertAggregate(const physical::AggregatePtr &physical_plan);
+ void convertCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
/**
* @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
#include "query_optimizer/logical/Logical.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
#include "query_optimizer/rules/InjectJoinFilters.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
rules.emplace_back(new ReorderColumns());
}
+ rules.emplace_back(new FuseAggregateJoin());
+
// NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
// extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
// suggested that all the new rules be placed before this point.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..035a1ac 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
quickstep_catalog_CatalogRelationStatistics
quickstep_queryoptimizer_costmodel_CostModel
quickstep_queryoptimizer_physical_Aggregate
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -62,6 +63,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_queryoptimizer_expressions_PatternMatcher
quickstep_queryoptimizer_expressions_Predicate
quickstep_queryoptimizer_physical_Aggregate
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_NestedLoopsJoin
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
#include "catalog/CatalogRelationStatistics.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
case P::PhysicalType::kAggregate:
return estimateCardinalityForAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return estimateCardinalityForCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kSharedSubplanReference: {
const P::SharedSubplanReferencePtr shared_subplan_reference =
std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
estimateCardinality(physical_plan->input()) / 10);
}
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  // The estimate for this node is exactly the estimate of its left child.
+  const auto &left_input = physical_plan->left_child();
+  return estimateCardinality(left_input);
+}
+
std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
const physical::WindowAggregatePtr &physical_plan) {
return estimateCardinality(physical_plan->input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
#include "query_optimizer/cost_model/CostModel.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ // Returns the cardinality of the left child plan.
+ std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
// Return the estimated cardinality of the input plan.
std::size_t estimateCardinalityForWindowAggregate(
const physical::WindowAggregatePtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..82e01c2 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -37,6 +37,7 @@
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -88,6 +89,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
case P::PhysicalType::kAggregate:
return estimateCardinalityForAggregate(
std::static_pointer_cast<const P::Aggregate>(physical_plan));
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+ return estimateCardinalityForCrossReferenceCoalesceAggregate(
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
case P::PhysicalType::kSharedSubplanReference: {
const P::SharedSubplanReferencePtr shared_subplan_reference =
std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +179,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
}
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  // The estimate for this node is exactly the estimate of its left child.
+  const auto &left_input = physical_plan->left_child();
+  return estimateCardinality(left_input);
+}
+
std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
const P::WindowAggregatePtr &physical_plan) {
return estimateCardinality(physical_plan->input());
@@ -233,6 +242,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
}
break;
}
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::PhysicalPtr left_child = physical_plan->children()[0];
+ if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+ return estimateNumDistinctValues(attribute_id, left_child);
+ }
+ break;
+ }
case P::PhysicalType::kFilterJoin: {
const P::FilterJoinPtr &filter_join =
std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +291,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
double StarSchemaSimpleCostModel::estimateSelectivity(
const physical::PhysicalPtr &physical_plan) {
switch (physical_plan->getPhysicalType()) {
+ case P::PhysicalType::kAggregate: {
+ const P::AggregatePtr &aggregate =
+ std::static_pointer_cast<const P::Aggregate>(physical_plan);
+ return estimateSelectivity(aggregate->input()) *
+ estimateSelectivityForFilterPredicate(aggregate);
+ }
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+ return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+ }
case P::PhysicalType::kSelection: {
const P::SelectionPtr &selection =
std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +358,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
const physical::PhysicalPtr &physical_plan) {
+ P::PhysicalPtr target_plan = physical_plan;
E::PredicatePtr filter_predicate = nullptr;
switch (physical_plan->getPhysicalType()) {
case P::PhysicalType::kSelection:
@@ -340,6 +368,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
case P::PhysicalType::kAggregate:
filter_predicate =
std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+ target_plan = physical_plan->children()[0];
break;
case P::PhysicalType::kHashJoin:
filter_predicate =
@@ -356,7 +385,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
if (filter_predicate == nullptr) {
return 1.0;
} else {
- return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+ return estimateSelectivityForPredicate(filter_predicate, target_plan);
}
}
@@ -443,6 +472,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
std::static_pointer_cast<const P::Aggregate>(physical_plan);
return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
}
+ case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+ const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+ std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+ return E::SubsetOfExpressions(
+ aggregate_on_left_outer_join->left_join_attributes(), attributes);
+ }
case P::PhysicalType::kHashJoin: {
const P::HashJoinPtr &hash_join =
std::static_pointer_cast<const P::HashJoin>(physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..99795a3 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
@@ -170,6 +171,9 @@ class StarSchemaSimpleCostModel : public CostModel {
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+ const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
std::size_t estimateCardinalityForFilterJoin(
const physical::FilterJoinPtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+ CrossReferenceCoalesceAggregate.cpp
+ CrossReferenceCoalesceAggregate.hpp)
add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Cast
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+ quickstep_queryoptimizer_OptimizerTree
+ quickstep_queryoptimizer_expressions_Alias
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_physical_PhysicalType
+ quickstep_utility_Cast
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
glog
quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
quickstep_queryoptimizer_physical_CopyFrom
quickstep_queryoptimizer_physical_CreateIndex
quickstep_queryoptimizer_physical_CreateTable
+ quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
quickstep_queryoptimizer_physical_DeleteTuples
quickstep_queryoptimizer_physical_DropTable
quickstep_queryoptimizer_physical_FilterJoin
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..16700c0
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  // Output schema: the left join attributes, followed by one attribute
+  // reference per aggregate expression (in the order they are stored).
+  std::vector<E::AttributeReferencePtr> result = left_join_attributes_;
+  for (const auto &aggregate_alias : aggregate_expressions_) {
+    result.push_back(E::ToRef(aggregate_alias));
+  }
+  return result;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  // Everything is funnelled through a set so that duplicates are dropped
+  // before the result vector is built.
+  std::unordered_set<E::AttributeReferencePtr> unique_attributes;
+  const auto absorb =
+      [&unique_attributes](const std::vector<E::AttributeReferencePtr> &attributes) {
+        unique_attributes.insert(attributes.begin(), attributes.end());
+      };
+
+  // Join keys of both sides.
+  absorb(left_join_attributes_);
+  absorb(right_join_attributes_);
+
+  // Attributes used by the optional filter on the right side.
+  if (right_filter_predicate_ != nullptr) {
+    absorb(right_filter_predicate_->getReferencedAttributes());
+  }
+
+  // Attributes used inside each aggregate expression.
+  for (const auto &aggregate_alias : aggregate_expressions_) {
+    absorb(aggregate_alias->getReferencedAttributes());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(
+      unique_attributes.begin(), unique_attributes.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  // Inline (name, value) pair.
+  inline_field_names->emplace_back("hash_table_num_entries");
+  inline_field_values->emplace_back(std::to_string(hash_table_num_entries_));
+
+  // Single-node children: both inputs, then the right-side filter if present.
+  non_container_child_field_names->emplace_back("left_child");
+  non_container_child_fields->push_back(left_child_);
+  non_container_child_field_names->emplace_back("right_child");
+  non_container_child_fields->push_back(right_child_);
+  if (right_filter_predicate_ != nullptr) {
+    non_container_child_field_names->emplace_back("right_filter_predicate");
+    non_container_child_fields->push_back(right_filter_predicate_);
+  }
+
+  // List-valued children: join keys of both sides, then the aggregates.
+  container_child_field_names->emplace_back("left_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->emplace_back("right_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+  container_child_field_names->emplace_back("aggregate_expressions");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+} // namespace physical
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..9fb02e2
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ * @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief Physical node that outputs its left join attributes followed by its aggregate expressions.
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  /**
+   * @return The left child plan (registered as the first child).
+   */
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  /**
+   * @return The right child plan (registered as the second child).
+   */
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  /**
+   * @return The join attributes of the left side. They also form the leading
+   *         part of this node's output attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  /**
+   * @return The join attributes of the right side.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  /**
+   * @return The filter predicate for the right side; may be nullptr.
+   */
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;
+  }
+
+  /**
+   * @return The aggregate expressions. One output attribute is produced per
+   *         expression, after the left join attributes.
+   */
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  /**
+   * @return The hash_table_num_entries value given at creation.
+   *         NOTE(review): presumably a size hint for the aggregation hash
+   *         table -- confirm against the code that consumes it.
+   */
+  inline std::size_t hash_table_num_entries() const {
+    return hash_table_num_entries_;
+  }
+
+  /**
+   * @brief Creates a copy of this node with the children replaced; all other
+   *        fields are carried over unchanged.
+   *
+   * @param new_children The new children; new_children[0] becomes the left
+   *        child and new_children[1] the right child.
+   * @return The new node.
+   */
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  hash_table_num_entries_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  /**
+   * @brief This node does not prune expressions: always returns false and
+   *        leaves \p output untouched.
+   */
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  /**
+   * @brief Creates a CrossReferenceCoalesceAggregate node.
+   *
+   * @param left_child The left child plan.
+   * @param right_child The right child plan.
+   * @param left_join_attributes The join attributes of the left side.
+   * @param right_join_attributes The join attributes of the right side.
+   * @param right_filter_predicate Optional filter predicate for the right
+   *        side; may be nullptr.
+   * @param aggregate_expressions The aggregate expressions.
+   * @param hash_table_num_entries Value returned by hash_table_num_entries().
+   * @return An immutable CrossReferenceCoalesceAggregate node.
+   */
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            hash_table_num_entries));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  // Stores all fields and registers the two inputs as children, left first.
+  // The predicate is taken by const reference like every other argument, so
+  // the only pointer copy made is the one into the member.
+  CrossReferenceCoalesceAggregate(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        hash_table_num_entries_(hash_table_num_entries) {
+    addChild(left_child_);
+    addChild(right_child_);
+  }
+
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;  // May be nullptr.
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t hash_table_num_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+} // namespace physical
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
class BinaryJoin;
class CopyFrom;
class CreateTable;
+class CrossReferenceCoalesceAggregate;
class DeleteTuples;
class DropTable;
class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+ PhysicalType::kCrossReferenceCoalesceAggregate>;
using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
kCopyFrom,
kCreateIndex,
kCreateTable,
+ kCrossReferenceCoalesceAggregate,
kDeleteTuples,
kDropTable,
kFilterJoin,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
DCHECK(tree != nullptr);
init(tree);
- std::vector<std::shared_ptr<const TreeType>> new_children;
- bool has_changed_children = false;
- for (const std::shared_ptr<const TreeType> &child : tree->children()) {
- std::shared_ptr<const TreeType> new_child = apply(child);
- if (child != new_child && !has_changed_children) {
- has_changed_children = true;
- }
- new_children.push_back(new_child);
- }
-
- if (has_changed_children) {
- return applyToNode(tree->copyWithNewChildren(new_children));
- } else {
- return applyToNode(tree);
- }
+ return applyInternal(tree);
}
protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
*
* @param input The input tree.
*/
- virtual void init(const TreeNodePtr &input) {
- }
+ virtual void init(const TreeNodePtr &input) {}
private:
+  // Recursively rewrites the subtree rooted at 'tree': every child is
+  // rewritten first, then applyToNode() is invoked on the node itself. The
+  // node is re-created through copyWithNewChildren() only when at least one
+  // child was replaced by a different node.
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {
+    DCHECK(tree != nullptr);
+
+    std::vector<std::shared_ptr<const TreeType>> rewritten_children;
+    bool any_child_replaced = false;
+    for (const std::shared_ptr<const TreeType> &original_child : tree->children()) {
+      std::shared_ptr<const TreeType> rewritten_child = applyInternal(original_child);
+      any_child_replaced = any_child_replaced || (original_child != rewritten_child);
+      rewritten_children.push_back(rewritten_child);
+    }
+
+    if (!any_child_replaced) {
+      return applyToNode(tree);
+    }
+    return applyToNode(tree->copyWithNewChildren(rewritten_children));
+  }
+
DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..a614bde 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
quickstep_queryoptimizer_rules_Rule
quickstep_queryoptimizer_rules_RuleHelper
quickstep_utility_Macros)
+# FuseAggregateJoin.cpp includes glog/logging.h (DCHECK), so glog is linked
+# explicitly. Entries are kept in alphabetical order.
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+                      glog
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
glog
quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
quickstep_queryoptimizer_rules_AttachLIPFilters
quickstep_queryoptimizer_rules_BottomUpRule
quickstep_queryoptimizer_rules_CollapseProject
+ quickstep_queryoptimizer_rules_FuseAggregateJoin
quickstep_queryoptimizer_rules_GenerateJoins
quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_rules_PruneColumns
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..d65ee27
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <unordered_set>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+/**
+ * Rewrites an Aggregate whose input is a left outer HashJoin into a single
+ * CrossReferenceCoalesceAggregate node. The rewrite happens only when every
+ * condition checked below holds; otherwise \p node is returned unchanged.
+ */
+P::PhysicalPtr FuseAggregateJoin::applyToNode(
+    const P::PhysicalPtr &node) {
+  // The node must be an aggregate without a filter predicate.
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;
+  }
+
+  // Its input must be a left outer hash join without a residual predicate.
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;
+  }
+
+  // The join must be on a single left attribute that the cost model reports
+  // as unique in the left child.
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;
+  }
+
+  // The single group-by expression must be that same left join attribute.
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  // Every aggregate function must take exactly one argument, and that
+  // argument must be a plain attribute coming from the right side of the join.
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  std::vector<E::AttributeReferencePtr> aggregate_argument_attributes;
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    if (arguments.size() != 1u) {
+      return node;
+    }
+
+    E::AttributeReferencePtr arg_attr;
+    if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;
+    }
+    aggregate_argument_attributes.push_back(arg_attr);
+  }
+
+  std::size_t max_num_groups = 0;
+  // TODO: not actually aggregate here
+  if (!canUseCollisionFreeAggregation(aggregate,
+                                      cost_model_->estimateNumGroupsForAggregate(aggregate),
+                                      &max_num_groups)) {
+    return node;
+  }
+
+  // Fuse filter predicate.
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    // The Selection can be bypassed only if everything the fused node reads
+    // from the right side is already produced by the Selection's input. This
+    // covers the join attributes and also the aggregate arguments, which would
+    // otherwise dangle if they were introduced by the Selection's projection.
+    const std::vector<E::AttributeReferencePtr> selection_input_attributes =
+        selection->input()->getOutputAttributes();
+    if (E::SubsetOfExpressions(right_join_attributes, selection_input_attributes) &&
+        E::SubsetOfExpressions(aggregate_argument_attributes, selection_input_attributes)) {
+      right_child = selection->input();
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+/**
+ * Creates the cost model used by applyToNode() from the shared subplans of
+ * the top-level plan \p input.
+ */
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  // The rule expects to be started on the root of a complete plan.
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlan &plan = static_cast<const P::TopLevelPlan &>(*input);
+  cost_model_.reset(new cost::StarSchemaSimpleCostModel(plan.shared_subplans()));
+}
+
+/**
+ * Decides whether \p aggregate can be evaluated with a table indexed directly
+ * by its single integer group-by key.
+ *
+ * On success, stores the number of slots needed (exact maximum key value
+ * plus one) in \p max_num_groups and returns true. On failure returns false
+ * and leaves \p max_num_groups untouched.
+ */
+bool FuseAggregateJoin::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) const {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Upper bound (exclusive) on the maximum group-by key value.
+  constexpr std::int64_t kMaxKeyValueBound = 1000000000;
+  // Largest accepted ratio of the maximum key value to the estimated number
+  // of groups.
+  constexpr double kMaxKeyToGroupRatio = 256.0;
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr group_by_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(group_by_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact;
+  bool max_value_stat_is_exact;
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(
+          aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(
+          aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  // Only integer keys can be used directly as table positions.
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding it.
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= kMaxKeyValueBound ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > kMaxKeyToGroupRatio) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    switch (agg_func->getAggregate().getAggregationID()) {
+      case AggregationID::kCount:  // Fall through
+      case AggregationID::kSum:
+        break;
+      default:
+        return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1) {
+      return false;
+    }
+
+    // A single argument must have one of the numeric types listed below.
+    if (arguments.size() == 1) {
+      switch (arguments.front()->getValueType().getTypeID()) {
+        case TypeID::kInt:  // Fall through
+        case TypeID::kLong:
+        case TypeID::kFloat:
+        case TypeID::kDouble:
+          break;
+        default:
+          return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..9e2ec04
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ * @{
+ */
+
+/**
+ * @brief Rule that rewrites an Aggregate placed on top of a left outer
+ *        HashJoin into a single CrossReferenceCoalesceAggregate node when
+ *        the checks performed in applyToNode() pass.
+ */
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() {}
+
+  ~FuseAggregateJoin() override {}
+
+  std::string getName() const override {
+    return "FuseAggregateOnLeftOuterJoin";
+  }
+
+ protected:
+  /**
+   * @brief Tries to fuse the aggregate at \p node with its join input.
+   *
+   * @param node The physical node to inspect.
+   * @return The fused node, or \p node itself if the rewrite does not apply.
+   */
+  physical::PhysicalPtr applyToNode(
+      const physical::PhysicalPtr &node) override;
+
+  /**
+   * @brief Builds the cost model from the top-level plan \p input.
+   *
+   * @param input The root of the plan; must be a TopLevelPlan.
+   */
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  /**
+   * @brief Checks whether \p aggregate qualifies for a table indexed directly
+   *        by its group-by key.
+   *
+   * @param aggregate The aggregate node to check.
+   * @param estimated_num_groups The estimated number of groups.
+   * @param max_num_groups Output: set on success to the maximum group-by key
+   *        value plus one.
+   * @return True if the aggregate qualifies.
+   */
+  bool canUseCollisionFreeAggregation(
+      const physical::AggregatePtr &aggregate,
+      const std::size_t estimated_num_groups,
+      std::size_t *max_num_groups) const;
+
+  // Created in init(); consulted by applyToNode() and
+  // canUseCollisionFreeAggregation().
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..6a77696
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+/**
+ * Walks all tuples of \p accessor and, for each value of attribute
+ * \p attr_id, sets the bit at that value's position in \p existence_map.
+ * When is_attr_nullable is true, null values are skipped.
+ **/
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+        for (typed_accessor->beginIteration(); typed_accessor->next();) {
+          const void *raw_value =
+              typed_accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+          if (is_attr_nullable && raw_value == nullptr) {
+            continue;
+          }
+          // The attribute value itself is the bit position.
+          existence_map->setBit(*static_cast<const CppType *>(raw_value));
+        }
+      });
+}
+
+/**
+ * Turns the runtime nullability flag into the matching compile-time
+ * instantiation of ExecuteBuild().
+ **/
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map) {
+  if (!is_attr_nullable) {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+    return;
+  }
+  ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+}
+
+}
+
+/**
+ * Adds one BuildAggregationExistenceMapWorkOrder per input block to
+ * \p container. For a stored input relation all work orders are generated on
+ * the first call and true is returned. Otherwise work orders are generated
+ * for the blocks fed so far and done_feeding_input_relation_ is returned.
+ */
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Builds the work order for a single block and hands it to the container.
+  const auto enqueue_work_order = [&](const block_id input_block_id) -> void {
+    container->addNormalWorkOrder(
+        new BuildAggregationExistenceMapWorkOrder(
+            query_id_,
+            input_relation_,
+            input_block_id,
+            build_attribute_,
+            query_context->getAggregationState(aggr_state_index_),
+            storage_manager),
+        op_index_);
+  };
+
+  if (!input_relation_is_stored_) {
+    // Only handle the blocks that have arrived since the previous call.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      enqueue_work_order(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  if (!started_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      enqueue_work_order(input_block_id);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+/**
+ * Work order protos are not supported by this operator yet; calling this
+ * method logs a fatal error.
+ */
+bool BuildAggregationExistenceMapOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  LOG(FATAL) << "Not implemented";
+}
+
+/**
+ * Loads the block, creates a value accessor over its tuple storage, and sets
+ * in the aggregation state's existence map the bit for every value of the
+ * build attribute. Only kInt and kLong attributes are supported; any other
+ * type is a fatal error.
+ */
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  const Type &attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  const TypeID attr_type_id = attr_type.getTypeID();
+
+  if (attr_type_id == TypeID::kInt) {
+    ExecuteHelper<int>(build_attribute_,
+                       attr_type.isNullable(),
+                       accessor.get(),
+                       state_->getExistenceMap());
+  } else if (attr_type_id == TypeID::kLong) {
+    ExecuteHelper<std::int64_t>(build_attribute_,
+                                attr_type.isNullable(),
+                                accessor.get(),
+                                state_->getExistenceMap());
+  } else {
+    LOG(FATAL) << "Build attribute type not supported by "
+               << "BuildAggregationExistenceMapOperator: "
+               << attr_type.getName();
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..2d13bda
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator that generates one BuildAggregationExistenceMapWorkOrder
+ *        per block of its input relation, targeting the aggregation state
+ *        identified by aggr_state_index.
+ **/
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The query ID, forwarded to RelationalOperator.
+   * @param input_relation The relation whose blocks are scanned.
+   * @param build_attribute The ID of the attribute whose values are recorded.
+   * @param input_relation_is_stored If true, a snapshot of the relation's
+   *        blocks is taken here; otherwise blocks arrive via feedInputBlock().
+   * @param aggr_state_index The index of the AggregationOperationState in the
+   *        QueryContext.
+   **/
+  BuildAggregationExistenceMapOperator(
+      const std::size_t query_id,
+      const CatalogRelation &input_relation,
+      const attribute_id build_attribute,
+      const bool input_relation_is_stored,
+      const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_attribute_(build_attribute),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        aggr_state_index_(aggr_state_index),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildAggregationExistenceMapOperator() override {}
+
+  std::string getName() const override {
+    return "BuildAggregationExistenceMapOperator";
+  }
+
+  /**
+   * @return The input relation.
+   **/
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  // Records a newly arrived block; the relation and partition IDs are unused.
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  // NOTE(review): declared here, but the accompanying .cpp provides no
+  // definition.
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const attribute_id build_attribute_;
+  const bool input_relation_is_stored_;
+  // Snapshot of the stored relation's blocks, or the blocks fed so far.
+  std::vector<block_id> input_relation_block_ids_;
+  const QueryContext::aggregation_state_id aggr_state_index_;
+
+  // Number of fed blocks already turned into work orders (non-stored input).
+  std::vector<block_id>::size_type num_workorders_generated_;
+  // Whether the work orders of a stored input have been generated.
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator. It
+ *        processes a single block of the input relation.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The query ID, forwarded to WorkOrder.
+   * @param input_relation The schema of the relation the block belongs to.
+   * @param build_block_id The ID of the block to process.
+   * @param build_attribute The ID of the attribute whose values are recorded.
+   * @param state The aggregation state that owns the existence map. Must not
+   *        be null.
+   * @param storage_manager The StorageManager used to load the block. Must
+   *        not be null.
+   **/
+  BuildAggregationExistenceMapWorkOrder(
+      const std::size_t query_id,
+      const CatalogRelationSchema &input_relation,
+      const block_id build_block_id,
+      const attribute_id build_attribute,
+      AggregationOperationState *state,
+      StorageManager *storage_manager)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_attribute_(build_attribute),
+        state_(DCHECK_NOTNULL(state)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~BuildAggregationExistenceMapWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const attribute_id build_attribute_;
+  AggregationOperationState *state_;
+
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..33321d3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
# Declare micro-libs:
add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ BuildAggregationExistenceMapOperator.cpp
+ BuildAggregationExistenceMapOperator.hpp)
add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,27 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_BuildHashOperator
glog
quickstep_catalog_CatalogRelation
@@ -552,6 +576,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_AggregationOperator
+ quickstep_relationaloperators_BuildAggregationExistenceMapOperator
quickstep_relationaloperators_BuildLIPFilterOperator
quickstep_relationaloperators_BuildHashOperator
quickstep_relationaloperators_CreateIndexOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..2d85312 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,23 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
}
}
+// Returns the existence map of the collision-free vector table. Calling this
+// on an aggregation that is not collision-free is a fatal error.
+BarrieredReadWriteConcurrentBitVector*
+    AggregationOperationState::getExistenceMap() const {
+  if (!is_aggregate_collision_free_) {
+    LOG(FATAL) << "AggregationOperationState::getExistenceMap() "
+               << "is not supported by this aggregation";
+  }
+
+  CollisionFreeVectorTable *vector_table =
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
+  return vector_table->getExistenceMap();
+}
+
void AggregationOperationState::initialize(const std::size_t partition_id) {
if (is_aggregate_collision_free_) {
static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->initialize(partition_id);
} else {
- LOG(FATAL) << "AggregationOperationState::initializeState() "
+ LOG(FATAL) << "AggregationOperationState::initialize() "
<< "is not supported by this aggregation";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..23f23b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -39,6 +39,7 @@ namespace quickstep {
namespace serialization { class AggregationOperationState; }
class AggregateFunction;
+class BarrieredReadWriteConcurrentBitVector;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
@@ -167,6 +168,8 @@ class AggregationOperationState {
**/
std::size_t getNumFinalizationPartitions() const;
+ BarrieredReadWriteConcurrentBitVector* getExistenceMap() const;
+
/**
* @brief Initialize the specified partition of this aggregation.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..cd76854 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -104,6 +104,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
return existence_map_->onesCountInRange(start_position, end_position);
}
+  /**
+   * @brief Get the existence map of this table.
+   *
+   * @return A non-owning pointer to the existence map held by this table.
+   */
+  inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+    return existence_map_.get();
+  }
+
/**
* @brief Initialize the specified partition of this aggregation table.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..7c4c238 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,9 +20,8 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
-#include <atomic>
+#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -31,6 +30,7 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -64,14 +64,10 @@ class BitVectorExactFilter : public LIPFilter {
: LIPFilter(LIPFilterType::kBitVectorExactFilter),
min_value_(static_cast<CppType>(min_value)),
max_value_(static_cast<CppType>(max_value)),
- bit_array_(GetByteSize(max_value - min_value + 1)) {
+ bit_vector_(max_value - min_value + 1) {
DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
DCHECK_GE(max_value_, min_value_);
-
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +105,6 @@ class BitVectorExactFilter : public LIPFilter {
private:
/**
- * @brief Round up bit_size to multiples of 8.
- */
- inline static std::size_t GetByteSize(const std::size_t bit_size) {
- return (bit_size + 7u) / 8u;
- }
-
- /**
* @brief Iterate through the accessor and hash values into the internal bit
* array.
*/
@@ -164,8 +153,7 @@ class BitVectorExactFilter : public LIPFilter {
DCHECK_GE(value, min_value_);
DCHECK_LE(value, max_value_);
- const CppType loc = value - min_value_;
- bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+ bit_vector_.setBit(value - min_value_);
}
/**
@@ -177,9 +165,7 @@ class BitVectorExactFilter : public LIPFilter {
return is_anti_filter;
}
- const CppType loc = value - min_value_;
- const bool is_bit_set =
- (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+ const bool is_bit_set = bit_vector_.getBit(value - min_value_);
if (is_anti_filter) {
return !is_bit_set;
@@ -190,7 +176,7 @@ class BitVectorExactFilter : public LIPFilter {
const CppType min_value_;
const CppType max_value_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..0a820cf 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -39,8 +39,9 @@ target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
quickstep_catalog_CatalogTypedefs
quickstep_storage_StorageBlockInfo
@@ -83,5 +84,6 @@ target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_types_Type
- quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_Macros)
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..6eaa93e 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,10 +20,8 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
-#include <atomic>
#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -32,6 +30,7 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
@@ -65,11 +64,8 @@ class SingleIdentityHashFilter : public LIPFilter {
explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
: LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
filter_cardinality_(filter_cardinality),
- bit_array_(GetByteSize(filter_cardinality)) {
+ bit_vector_(filter_cardinality) {
DCHECK_GE(filter_cardinality, 1u);
- std::memset(bit_array_.data(),
- 0x0,
- sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
}
void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +154,9 @@ class SingleIdentityHashFilter : public LIPFilter {
* @brief Inserts a given value into the hash filter.
*/
inline void insert(const void *key_begin) {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ bit_vector_.setBit(hash);
}
/**
@@ -168,12 +165,13 @@ class SingleIdentityHashFilter : public LIPFilter {
* If false is returned, a value is certainly not present in the hash filter.
*/
inline bool contains(const void *key_begin) const {
- const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
- return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+ const CppType hash =
+ *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+ return bit_vector_.getBit(hash);
}
std::size_t filter_cardinality_;
- alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+ BarrieredReadWriteConcurrentBitVector bit_vector_;
DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
};