You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/06/30 15:59:40 UTC

incubator-quickstep git commit: Created WindowAggregationOperationState

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation c6ed1c306 -> 678d94dc3


Created WindowAggregationOperationState


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/678d94dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/678d94dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/678d94dc

Branch: refs/heads/SQL-window-aggregation
Commit: 678d94dc306e9a85dc9b32f9d5a8f630a08ce78d
Parents: c6ed1c3
Author: shixuan-fan <sh...@apache.org>
Authored: Thu Jun 30 15:59:21 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Thu Jun 30 15:59:21 2016 +0000

----------------------------------------------------------------------
 storage/CMakeLists.txt                        |  33 +++-
 storage/WindowAggregationOperationState.cpp   | 179 +++++++++++++++++++++
 storage/WindowAggregationOperationState.hpp   | 141 ++++++++++++++++
 storage/WindowAggregationOperationState.proto |  33 ++++
 4 files changed, 385 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b536411..47262a1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -131,6 +131,9 @@ QS_PROTOBUF_GENERATE_CPP(storage_InsertDestination_proto_srcs
 QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
                          storage_StorageBlockLayout_proto_hdrs
                          StorageBlockLayout.proto)
+# Sources for the serialized window aggregation state; the _srcs variable is
+# consumed by the quickstep_storage_WindowAggregationOperationState_proto
+# library target.
+QS_PROTOBUF_GENERATE_CPP(storage_WindowAggregationOperationState_proto_srcs
+                         storage_WindowAggregationOperationState_proto_hdrs
+                         WindowAggregationOperationState.proto)
 
 if (ENABLE_DISTRIBUTED)
   GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
@@ -254,6 +257,11 @@ add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
 add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
 add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
+add_library(quickstep_storage_WindowAggregationOperationState
+            WindowAggregationOperationState.cpp
+            WindowAggregationOperationState.hpp)
+add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_WindowAggregationOperationState_proto_srcs})
 
 # Link dependencies:
 target_link_libraries(quickstep_storage_AggregationOperationState
@@ -1038,6 +1046,27 @@ target_link_libraries(quickstep_storage_ValueAccessorUtil
                       quickstep_storage_ValueAccessor
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros)
+# Mirrors the #includes of WindowAggregationOperationState.{hpp,cpp}.
+target_link_libraries(quickstep_storage_WindowAggregationOperationState
+                      glog
+                      quickstep_catalog_CatalogDatabaseLite
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_ExpressionFactories
+                      quickstep_expressions_Expressions_proto
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregateFunctionFactory
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_utility_Macros)
+# The proto library links the protos imported by
+# WindowAggregationOperationState.proto (AggregateFunction.proto and
+# Expressions.proto).
+target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_expressions_aggregation_AggregateFunction_proto
+                      quickstep_expressions_Expressions_proto
+                      ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:
 add_library(quickstep_storage ../empty_src.cpp StorageModule.hpp)
@@ -1096,7 +1125,9 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil)
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_storage_WindowAggregationOperationState
+                      quickstep_storage_WindowAggregationOperationState_proto)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerHdfs)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
new file mode 100644
index 0000000..4434b01
--- /dev/null
+++ b/storage/WindowAggregationOperationState.cpp
@@ -0,0 +1,179 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "storage/WindowAggregationOperationState.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabaseLite.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/ExpressionFactories.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationOperationState::WindowAggregationOperationState(
+    const CatalogRelationSchema &input_relation,
+    const AggregateFunction *window_aggregate_function,
+    std::vector<std::unique_ptr<const Scalar>> &&arguments,
+    std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following,
+    StorageManager *storage_manager)
+    : input_relation_(input_relation),
+      arguments_(std::move(arguments)),
+      partition_by_attributes_(std::move(partition_by_attributes)),
+      is_row_(is_row),
+      num_preceding_(num_preceding),
+      num_following_(num_following),
+      storage_manager_(storage_manager) {
+  // The aggregate function builds a type-specialized handle, so first collect
+  // the Type of every (already moved-in) argument expression.
+  // TODO(Shixuan): Next step: New handles for window aggregation function.
+  std::vector<const Type*> argument_types;
+  for (std::size_t arg_pos = 0; arg_pos < arguments_.size(); ++arg_pos) {
+    argument_types.push_back(&arguments_[arg_pos]->getType());
+  }
+
+  // Applicability of the function to these types is only asserted in debug
+  // builds.
+  DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+
+  // The handle must exist before the initial state, since the handle creates
+  // it.
+  window_aggregation_handle_.reset(
+      window_aggregate_function->createHandle(argument_types));
+  window_aggregation_state_.reset(
+      window_aggregation_handle_->createInitialState());
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // Copy elision is only usable when EVERY argument is a plain attribute of the
+  // input relation. A single non-attribute argument (id == -1) disables it and
+  // leaves 'arguments_as_attributes_' empty.
+  arguments_as_attributes_.reserve(arguments_.size());
+  for (const std::unique_ptr<const Scalar> &arg : arguments_) {
+    const attribute_id attr_id = arg->getAttributeIdForValueAccessor();
+    if (attr_id == -1) {
+      arguments_as_attributes_.clear();
+      break;
+    }
+    DCHECK_EQ(input_relation_.getID(), arg->getRelationIdForValueAccessor());
+    arguments_as_attributes_.push_back(attr_id);
+  }
+#endif
+}
+
+// Decodes 'proto' into a freshly heap-allocated state object; the caller takes
+// ownership of the returned pointer. The returned object keeps a reference to
+// the relation schema owned by 'database', so 'database' must outlive it.
+WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
+    const serialization::WindowAggregationOperationState &proto,
+    const CatalogDatabaseLite &database,
+    StorageManager *storage_manager) {
+  DCHECK(ProtoIsValid(proto, database));
+
+  // Rebuild constructor arguments from their representation in 'proto'.
+  const AggregateFunction *aggregate_function
+      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+
+  std::vector<std::unique_ptr<const Scalar>> arguments;
+  arguments.reserve(proto.arguments_size());
+  for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
+    arguments.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.arguments(argument_idx),
+        database));
+  }
+
+  // Size is known up front, so reserve just like 'arguments' above.
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+  partition_by_attributes.reserve(proto.partition_by_attributes_size());
+  for (int attribute_idx = 0;
+       attribute_idx < proto.partition_by_attributes_size();
+       ++attribute_idx) {
+    partition_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.partition_by_attributes(attribute_idx),
+        database));
+  }
+
+  // Window frame specification; -1 in either bound means UNBOUNDED.
+  const bool is_row = proto.is_row();
+  const std::int64_t num_preceding = proto.num_preceding();
+  const std::int64_t num_following = proto.num_following();
+
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
+                                             aggregate_function,
+                                             std::move(arguments),
+                                             std::move(partition_by_attributes),
+                                             is_row,
+                                             num_preceding,
+                                             num_following,
+                                             storage_manager);
+}
+
+bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+                                                   const CatalogDatabaseLite &database) {
+  // All required fields must be set before any of them is inspected.
+  if (!proto.IsInitialized()) {
+    return false;
+  }
+
+  // The input relation must exist in the catalog.
+  if (!database.hasRelationWithId(proto.relation_id())) {
+    return false;
+  }
+
+  if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+    return false;
+  }
+
+  // TODO(chasseur): We may also want to check that the specified
+  // AggregateFunction is applicable to the specified arguments, but that
+  // requires partial deserialization and may be too heavyweight for this
+  // method.
+  // TODO(Shixuan): The TODO for AggregateFunction could also be applied here.
+  for (const serialization::Scalar &arg_proto : proto.arguments()) {
+    if (!ScalarFactory::ProtoIsValid(arg_proto, database)) {
+      return false;
+    }
+  }
+
+  for (const serialization::Scalar &key_proto : proto.partition_by_attributes()) {
+    if (!ScalarFactory::ProtoIsValid(key_proto, database)) {
+      return false;
+    }
+  }
+
+  // Frame bounds: -1 encodes UNBOUNDED; any smaller value is malformed.
+  return proto.num_preceding() >= -1 && proto.num_following() >= -1;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
new file mode 100644
index 0000000..195218b
--- /dev/null
+++ b/storage/WindowAggregationOperationState.hpp
@@ -0,0 +1,141 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+#define QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class CatalogDatabaseLite;
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief Helper class for maintaining the state of window aggregation.
+ **/
+class WindowAggregationOperationState {
+ public:
+  /**
+   * @brief Constructor for window aggregation operation state.
+   *
+   * @param input_relation Input relation on which window aggregation is
+   *        computed. Only a reference is kept, so it must outlive this object.
+   * @param window_aggregate_function The window aggregate function to be
+   *        computed; it is used to create the aggregation handle.
+   * @param arguments A list of argument expressions to that aggregate.
+   * @param partition_by_attributes A list of window partition keys.
+   * @param is_row True if the window frame is calculated by ROW, false if it is
+   *        calculated by RANGE.
+   * @param num_preceding The number of rows/range for the tuples preceding the
+   *        current row. -1 means UNBOUNDED PRECEDING.
+   * @param num_following The number of rows/range for the tuples following the
+   *        current row. -1 means UNBOUNDED FOLLOWING.
+   * @param storage_manager The StorageManager to use (currently only stored).
+   */
+  WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+                                  const AggregateFunction *window_aggregate_function,
+                                  std::vector<std::unique_ptr<const Scalar>> &&arguments,
+                                  std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following,
+                                  StorageManager *storage_manager);
+
+  ~WindowAggregationOperationState() {}
+
+  /**
+   * @brief Generate the window aggregation operation state from the serialized
+   *        Protocol Buffer representation.
+   *
+   * @param proto A serialized protocol buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the
+   *        optimizer.
+   * @param database The database for resolving relation and attribute
+   *        references.
+   * @param storage_manager The StorageManager to use.
+   * @return A newly heap-allocated WindowAggregationOperationState; the caller
+   *         takes ownership.
+   **/
+  static WindowAggregationOperationState* ReconstructFromProto(
+      const serialization::WindowAggregationOperationState &proto,
+      const CatalogDatabaseLite &database,
+      StorageManager *storage_manager);
+
+  /**
+   * @brief Check whether a serialization::WindowAggregationOperationState is
+   *        fully-formed and all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the
+   *        optimizer.
+   * @param database The Database to resolve relation and attribute references
+   *        in.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+                           const CatalogDatabaseLite &database);
+
+ private:
+  // Held by reference; the schema must outlive this object.
+  const CatalogRelationSchema &input_relation_;
+
+  // TODO(Shixuan): Handle and State for window aggregation will be needed for
+  //                actual calculation.
+  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
+  std::unique_ptr<AggregationState> window_aggregation_state_;
+  std::vector<std::unique_ptr<const Scalar>> arguments_;
+
+  // We don't add order_by_attributes here since it is not needed after sorting.
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
+
+  // Window framing information (-1 in a bound means UNBOUNDED).
+  const bool is_row_;
+  const std::int64_t num_preceding_;
+  const std::int64_t num_following_;
+
+  StorageManager *storage_manager_;
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // If all an aggregate's argument expressions are simply attributes in
+  // 'input_relation_', then this caches the attribute IDs of those arguments.
+  std::vector<attribute_id> arguments_as_attributes_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/678d94dc/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
new file mode 100644
index 0000000..cf92fe2
--- /dev/null
+++ b/storage/WindowAggregationOperationState.proto
@@ -0,0 +1,33 @@
+//   Copyright 2011-2015 Quickstep Technologies LLC.
+//   Copyright 2015-2016 Pivotal Software, Inc.
+//   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+//     University of Wisconsin—Madison.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/Expressions.proto";
+
+// Serialized form of quickstep::WindowAggregationOperationState, generated by
+// the optimizer and decoded by
+// WindowAggregationOperationState::ReconstructFromProto().
+message WindowAggregationOperationState {
+  // ID of the input relation in the catalog database.
+  required int32 relation_id = 1;
+  // The window aggregate function to compute.
+  required AggregateFunction function = 2;
+  // Argument expressions to the aggregate function.
+  repeated Scalar arguments = 3;
+  // PARTITION BY key expressions.
+  repeated Scalar partition_by_attributes = 4;
+  // True if the window frame is defined by ROWS, false if by RANGE.
+  required bool is_row = 5;
+  // Frame extent before the current row; -1 means UNBOUNDED PRECEDING.
+  required int64 num_preceding = 6;
+  // Frame extent after the current row; -1 means UNBOUNDED FOLLOWING.
+  required int64 num_following = 7;
+}