You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/06/30 15:59:40 UTC
incubator-quickstep git commit: Created WindowAggregationOperationState
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation c6ed1c306 -> 678d94dc3
Created WindowAggregationOperationState
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/678d94dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/678d94dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/678d94dc
Branch: refs/heads/SQL-window-aggregation
Commit: 678d94dc306e9a85dc9b32f9d5a8f630a08ce78d
Parents: c6ed1c3
Author: shixuan-fan <sh...@apache.org>
Authored: Thu Jun 30 15:59:21 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Thu Jun 30 15:59:21 2016 +0000
----------------------------------------------------------------------
storage/CMakeLists.txt | 33 +++-
storage/WindowAggregationOperationState.cpp | 179 +++++++++++++++++++++
storage/WindowAggregationOperationState.hpp | 141 ++++++++++++++++
storage/WindowAggregationOperationState.proto | 33 ++++
4 files changed, 385 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b536411..47262a1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -131,6 +131,9 @@ QS_PROTOBUF_GENERATE_CPP(storage_InsertDestination_proto_srcs
QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
storage_StorageBlockLayout_proto_hdrs
StorageBlockLayout.proto)
+# Generated C++ sources/headers for the serialized window aggregation state.
+QS_PROTOBUF_GENERATE_CPP(storage_WindowAggregationOperationState_proto_srcs
+                         storage_WindowAggregationOperationState_proto_hdrs
+                         WindowAggregationOperationState.proto)
if (ENABLE_DISTRIBUTED)
GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
@@ -254,6 +257,11 @@ add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp
add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
+# Sources listed .cpp first, matching the other storage libraries above.
+add_library(quickstep_storage_WindowAggregationOperationState
+            WindowAggregationOperationState.cpp
+            WindowAggregationOperationState.hpp)
+add_library(quickstep_storage_WindowAggregationOperationState_proto
+            ${storage_WindowAggregationOperationState_proto_srcs})
+
# Link dependencies:
target_link_libraries(quickstep_storage_AggregationOperationState
@@ -1038,6 +1046,27 @@ target_link_libraries(quickstep_storage_ValueAccessorUtil
quickstep_storage_ValueAccessor
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros)
+# Operation-state library: one entry per header included by
+# WindowAggregationOperationState.{hpp,cpp}, plus its generated proto library.
+target_link_libraries(quickstep_storage_WindowAggregationOperationState
+                      glog
+                      quickstep_catalog_CatalogDatabaseLite
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_ExpressionFactories
+                      quickstep_expressions_Expressions_proto
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregateFunctionFactory
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_utility_Macros)
+# Generated proto library: depends on the protos it imports.
+target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_expressions_aggregation_AggregateFunction_proto
+                      quickstep_expressions_Expressions_proto
+                      ${PROTOBUF_LIBRARY})
# Module all-in-one library: an umbrella target built from an empty source
# file plus StorageModule.hpp; the storage libraries are linked into it below.
add_library(quickstep_storage ../empty_src.cpp StorageModule.hpp)
@@ -1096,7 +1125,9 @@ target_link_libraries(quickstep_storage
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
- quickstep_storage_ValueAccessorUtil)
+ quickstep_storage_ValueAccessorUtil
+ quickstep_storage_WindowAggregationOperationState
+ quickstep_storage_WindowAggregationOperationState_proto)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage
quickstep_storage_FileManagerHdfs)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
new file mode 100644
index 0000000..4434b01
--- /dev/null
+++ b/storage/WindowAggregationOperationState.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "storage/WindowAggregationOperationState.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabaseLite.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/ExpressionFactories.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationOperationState::WindowAggregationOperationState(
+    const CatalogRelationSchema &input_relation,
+    const AggregateFunction *window_aggregate_function,
+    std::vector<std::unique_ptr<const Scalar>> &&arguments,
+    std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following,
+    StorageManager *storage_manager)
+    : input_relation_(input_relation),
+      arguments_(std::move(arguments)),
+      partition_by_attributes_(std::move(partition_by_attributes)),
+      is_row_(is_row),
+      num_preceding_(num_preceding),
+      num_following_(num_following),
+      storage_manager_(storage_manager) {
+  // The argument Types are what the aggregate function needs in order to
+  // build a concrete AggregationHandle.
+  // TODO(Shixuan): Next step: New handles for window aggregation function.
+  std::vector<const Type*> arg_types;
+  for (std::size_t i = 0; i < arguments_.size(); ++i) {
+    arg_types.emplace_back(&arguments_[i]->getType());
+  }
+
+  // Applicability of the function to these types is only checked in debug
+  // builds (DCHECK).
+  DCHECK(window_aggregate_function->canApplyToTypes(arg_types));
+
+  // Handle first, then a fresh accumulation state obtained from that handle.
+  // The function pointer itself is not retained past this point.
+  window_aggregation_handle_.reset(window_aggregate_function->createHandle(arg_types));
+  window_aggregation_state_.reset(window_aggregation_handle_->createInitialState());
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // Copy elision is possible only when *every* argument is a plain attribute
+  // of the input relation. A single non-attribute argument (id == -1) leaves
+  // arguments_as_attributes_ empty, which disables elision.
+  arguments_as_attributes_.reserve(arguments_.size());
+  for (const std::unique_ptr<const Scalar> &arg : arguments_) {
+    const attribute_id attr_id = arg->getAttributeIdForValueAccessor();
+    if (attr_id == -1) {
+      arguments_as_attributes_.clear();
+      break;
+    }
+    DCHECK_EQ(input_relation_.getID(), arg->getRelationIdForValueAccessor());
+    arguments_as_attributes_.push_back(attr_id);
+  }
+#endif
+}
+
+// Rebuilds a WindowAggregationOperationState from its serialized form.
+// 'proto' is validated with ProtoIsValid() in debug builds only (DCHECK).
+// The returned object is allocated with 'new'; the caller takes ownership.
+WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
+    const serialization::WindowAggregationOperationState &proto,
+    const CatalogDatabaseLite &database,
+    StorageManager *storage_manager) {
+  DCHECK(ProtoIsValid(proto, database));
+
+  // Rebuild constructor arguments from their representation in 'proto'.
+  const AggregateFunction *aggregate_function
+      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+
+  std::vector<std::unique_ptr<const Scalar>> arguments;
+  arguments.reserve(proto.arguments_size());
+  for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
+    arguments.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.arguments(argument_idx),
+        database));
+  }
+
+  // The final size is known up front, so reserve exactly as for 'arguments'.
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+  partition_by_attributes.reserve(proto.partition_by_attributes_size());
+  for (int attribute_idx = 0;
+       attribute_idx < proto.partition_by_attributes_size();
+       ++attribute_idx) {
+    partition_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.partition_by_attributes(attribute_idx),
+        database));
+  }
+
+  // Window frame: ROWS vs. RANGE, and bounds where -1 means UNBOUNDED.
+  const bool is_row = proto.is_row();
+  const std::int64_t num_preceding = proto.num_preceding();
+  const std::int64_t num_following = proto.num_following();
+
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
+                                             aggregate_function,
+                                             std::move(arguments),
+                                             std::move(partition_by_attributes),
+                                             is_row,
+                                             num_preceding,
+                                             num_following,
+                                             storage_manager);
+}
+
+bool WindowAggregationOperationState::ProtoIsValid(
+    const serialization::WindowAggregationOperationState &proto,
+    const CatalogDatabaseLite &database) {
+  // All required fields must be set, the input relation must exist in
+  // 'database', and the serialized aggregate function must itself be valid.
+  // Short-circuit evaluation keeps the checks in exactly this order.
+  if (!proto.IsInitialized()
+      || !database.hasRelationWithId(proto.relation_id())
+      || !AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+    return false;
+  }
+
+  // TODO(chasseur): We may also want to check that the specified
+  // AggregateFunction is applicable to the specified arguments, but that
+  // requires partial deserialization and may be too heavyweight for this
+  // method.
+  // TODO(Shixuan): The TODO for AggregateFunction could also be applied here.
+  const int num_arguments = proto.arguments_size();
+  for (int i = 0; i < num_arguments; ++i) {
+    if (!ScalarFactory::ProtoIsValid(proto.arguments(i), database)) {
+      return false;
+    }
+  }
+
+  const int num_partition_keys = proto.partition_by_attributes_size();
+  for (int i = 0; i < num_partition_keys; ++i) {
+    if (!ScalarFactory::ProtoIsValid(proto.partition_by_attributes(i), database)) {
+      return false;
+    }
+  }
+
+  // Frame bounds: -1 encodes UNBOUNDED; any smaller value is invalid.
+  return proto.num_preceding() >= -1 && proto.num_following() >= -1;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
new file mode 100644
index 0000000..195218b
--- /dev/null
+++ b/storage/WindowAggregationOperationState.hpp
@@ -0,0 +1,141 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+#define QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class CatalogDatabaseLite;
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Helper class for maintaining the state of window aggregation.
+ **/
+class WindowAggregationOperationState {
+ public:
+  /**
+   * @brief Constructor for window aggregation operation state.
+   *
+   * @param input_relation Input relation on which window aggregation is computed.
+   * @param window_aggregate_function The window aggregate function to be
+   *        computed. It is used during construction to create the aggregation
+   *        handle and is not retained afterwards.
+   * @param arguments A list of argument expressions to that aggregate.
+   * @param partition_by_attributes A list of window partition keys.
+   * @param is_row True if the window frame is calculated by ROW, false if it is
+   *        calculated by RANGE.
+   * @param num_preceding The number of rows/range for the tuples preceding the
+   *        current row. -1 means UNBOUNDED PRECEDING.
+   * @param num_following The number of rows/range for the tuples following the
+   *        current row. -1 means UNBOUNDED FOLLOWING.
+   * @param storage_manager The StorageManager to use. NOTE(review): currently
+   *        only stored; no code in this class uses it yet.
+   **/
+  WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+                                  const AggregateFunction *window_aggregate_function,
+                                  std::vector<std::unique_ptr<const Scalar>> &&arguments,
+                                  std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following,
+                                  StorageManager *storage_manager);
+
+  ~WindowAggregationOperationState() {}
+
+  /**
+   * @brief Generate the window aggregation operation state from the serialized
+   *        Protocol Buffer representation.
+   *
+   * @param proto A serialized protocol buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the
+   *        optimizer. Its validity is only checked in debug builds.
+   * @param database The database for resolving relation and attribute
+   *        references.
+   * @param storage_manager The StorageManager to use.
+   * @return A newly heap-allocated WindowAggregationOperationState; the caller
+   *         takes ownership.
+   **/
+  static WindowAggregationOperationState* ReconstructFromProto(
+      const serialization::WindowAggregationOperationState &proto,
+      const CatalogDatabaseLite &database,
+      StorageManager *storage_manager);
+
+  /**
+   * @brief Check whether a serialization::WindowAggregationOperationState is
+   *        fully-formed and all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the
+   *        optimizer.
+   * @param database The Database to resolve relation and attribute references
+   *        in.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+                           const CatalogDatabaseLite &database);
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+
+  // TODO(Shixuan): Handle and State for window aggregation will be needed for
+  // actual calculation.
+  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
+  std::unique_ptr<AggregationState> window_aggregation_state_;
+  std::vector<std::unique_ptr<const Scalar>> arguments_;
+
+  // We don't add order_by_attributes here since it is not needed after sorting.
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
+
+  // Window framing information (-1 bounds mean UNBOUNDED).
+  const bool is_row_;
+  const std::int64_t num_preceding_;
+  const std::int64_t num_following_;
+
+  StorageManager *storage_manager_;
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // If all of the aggregate's argument expressions are simply attributes in
+  // 'input_relation_', then this caches the attribute IDs of those arguments;
+  // otherwise it is empty.
+  std::vector<attribute_id> arguments_as_attributes_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
new file mode 100644
index 0000000..cf92fe2
--- /dev/null
+++ b/storage/WindowAggregationOperationState.proto
@@ -0,0 +1,33 @@
+// Copyright 2011-2015 Quickstep Technologies LLC.
+// Copyright 2015-2016 Pivotal Software, Inc.
+// Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+// University of Wisconsin—Madison.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/Expressions.proto";
+
+// Serialized form of a WindowAggregationOperationState; checked by
+// ProtoIsValid() and rebuilt by ReconstructFromProto().
+message WindowAggregationOperationState {
+ // ID of the input relation, resolved against the CatalogDatabaseLite.
+ required int32 relation_id = 1;
+ // The window aggregate function to compute.
+ required AggregateFunction function = 2;
+ // Argument expressions passed to 'function'.
+ repeated Scalar arguments = 3;
+ // PARTITION BY key expressions.
+ repeated Scalar partition_by_attributes = 4;
+ // Frame mode: true for ROWS, false for RANGE.
+ required bool is_row = 5;
+ // Frame extent before the current row; -1 means UNBOUNDED PRECEDING,
+ // values below -1 are rejected as invalid.
+ required int64 num_preceding = 6;
+ // Frame extent after the current row; -1 means UNBOUNDED FOLLOWING,
+ // values below -1 are rejected as invalid.
+ required int64 num_following = 7;
+}