You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/01 17:12:17 UTC
[1/4] incubator-quickstep git commit: RANGE mode and computation
optimization. - Supported RANGE mode for window aggregation. - Optimized the
AVG calculation time complexity from O(nk) to O(n),
where n is the number of tuples and k is the window size. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/tmb-tagged-msg 126f0a25e -> ccea2ff83 (forced update)
RANGE mode and computation optimization.
- Supported RANGE mode for window aggregation.
- Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0172fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0172fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0172fde
Branch: refs/heads/tmb-tagged-msg
Commit: d0172fde0cffbf10bc858090e11169e11834be89
Parents: e53186e
Author: shixuan <sh...@apache.org>
Authored: Tue Jul 26 11:49:07 2016 -0500
Committer: shixuan <sh...@apache.org>
Committed: Mon Aug 1 09:38:34 2016 -0500
----------------------------------------------------------------------
expressions/window_aggregation/CMakeLists.txt | 15 +-
.../WindowAggregateFunction.hpp | 19 +-
.../WindowAggregateFunctionAvg.cpp | 14 +-
.../WindowAggregateFunctionAvg.hpp | 6 +-
.../WindowAggregateFunctionCount.cpp | 6 +-
.../WindowAggregateFunctionCount.hpp | 6 +-
.../WindowAggregateFunctionMax.cpp | 6 +-
.../WindowAggregateFunctionMax.hpp | 6 +-
.../WindowAggregateFunctionMin.cpp | 6 +-
.../WindowAggregateFunctionMin.hpp | 6 +-
.../WindowAggregateFunctionSum.cpp | 6 +-
.../WindowAggregateFunctionSum.hpp | 6 +-
.../WindowAggregationHandle.cpp | 186 ++++++++++++++++
.../WindowAggregationHandle.hpp | 100 ++++++---
.../WindowAggregationHandleAvg.cpp | 201 ++++++-----------
.../WindowAggregationHandleAvg.hpp | 35 ++-
.../WindowAggregationHandleAvg_unittest.cpp | 220 +++++++++++++++----
query_optimizer/ExecutionGenerator.cpp | 11 +-
query_optimizer/resolver/Resolver.cpp | 19 +-
.../tests/execution_generator/Select.test | 41 +++-
storage/WindowAggregationOperationState.cpp | 69 +++---
storage/WindowAggregationOperationState.hpp | 9 +-
storage/WindowAggregationOperationState.proto | 1 +
23 files changed, 692 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
index 6a16fcc..3a79b7e 100644
--- a/expressions/window_aggregation/CMakeLists.txt
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -44,7 +44,7 @@ add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
WindowAggregateFunctionSum.cpp
WindowAggregateFunctionSum.hpp)
add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
- ../../empty_src.cpp
+ WindowAggregationHandle.cpp
WindowAggregationHandle.hpp)
add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
WindowAggregationHandleAvg.cpp
@@ -130,10 +130,17 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
glog
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_scalar_Scalar
quickstep_storage_StorageBlockInfo
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
quickstep_types_operations_comparisons_Comparison
quickstep_types_operations_comparisons_ComparisonFactory
quickstep_types_operations_comparisons_ComparisonID
@@ -141,8 +148,6 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_scalar_Scalar
- quickstep_expressions_scalar_ScalarAttribute
quickstep_expressions_windowaggregation_WindowAggregationHandle
quickstep_storage_ValueAccessor
quickstep_types_Type
@@ -179,11 +184,13 @@ add_executable(WindowAggregationHandle_tests
target_link_libraries(WindowAggregationHandle_tests
gtest
gtest_main
+ quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarAttribute
quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_expressions_windowaggregation_WindowAggregationHandle
- quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
quickstep_expressions_windowaggregation_WindowAggregationID
quickstep_storage_ValueAccessor
quickstep_types_CharType
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index e40479b..7ffc4ae 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#include <memory>
#include <string>
#include <vector>
@@ -32,6 +33,7 @@
namespace quickstep {
class CatalogRelationSchema;
+class Scalar;
class Type;
class WindowAggregationHandle;
@@ -120,16 +122,23 @@ class WindowAggregateFunction {
*
* @param argument_types A list of zero or more Types (in order) for
* arguments to this WindowAggregateFunction.
- * @param partition_key_types A list or zero or more Types for partition keys
- * to this WindowAggregateFunction.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
+ * @param num_preceding The number of rows/range that precedes the current row.
+ * @param num_following The number of rows/range that follows the current row.
*
* @return A new WindowAggregationHandle that can be used to compute this
- * WindowAggregateFunction over the specified argument_types. Caller
- * is responsible for deleting the returned object.
+ * WindowAggregateFunction over the specified window definition.
+ * Caller is responsible for deleting the returned object.
**/
virtual WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const = 0;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const = 0;
protected:
explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index bc31a53..beb1c7a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,13 +73,21 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
<< " that AVG can not be applied to.";
- return new WindowAggregationHandleAvg(partition_key_types,
- *argument_types.front());
+ return new WindowAggregationHandleAvg(partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following,
+ argument_types[0]);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 32fd9d5..0e50415 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionAvg()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
index 504e000..ccd81ac 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -47,7 +47,11 @@ const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleCount for argument Types "
<< "that COUNT can not be applied to (> 1 argument).";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
index 1b40fdd..2e5506a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionCount : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionCount()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
index f3997c7..acfce82 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
<< "that MAX can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
index 00c788e..a215703 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMax : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionMax()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
index a13e28e..cd845bd 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
<< "that MIN can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
index aeba539..fab88a8 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMin : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionMin()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
index 636c53a..e2aeb60 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -71,7 +71,11 @@ const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
<< "that SUM can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
index 047113c..8d7d61d 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionSum : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionSum()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.cpp b/expressions/window_aggregation/WindowAggregationHandle.cpp
new file mode 100644
index 0000000..835eaff
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.cpp
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationHandle::WindowAggregationHandle(
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following)
+ : is_row_(is_row),
+ num_preceding_(num_preceding),
+ num_following_(num_following) {
+ // IDs and types of partition keys.
+ std::vector<const Type*> partition_key_types;
+ for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+ partition_key_ids_.push_back(
+ partition_by_attribute->getAttributeIdForValueAccessor());
+ partition_key_types.push_back(&partition_by_attribute->getType());
+ }
+
+ // Comparison operators for checking if two tuples belong to the same partition.
+ for (const Type *partition_key_type : partition_key_types) {
+ partition_equal_comparators_.emplace_back(
+ ComparisonFactory::GetComparison(ComparisonID::kEqual)
+ .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
+ }
+
+ // IDs and types of order keys.
+ const Type *first_order_key_type = nullptr;
+ for (const std::unique_ptr<const Scalar> &order_by_attribute : order_by_attributes) {
+ order_key_ids_.push_back(
+ order_by_attribute->getAttributeIdForValueAccessor());
+ if (first_order_key_type == nullptr) {
+ first_order_key_type = &order_by_attribute->getType();
+ }
+ }
+
+ // ID and type of the first order key if in RANGE mode.
+ if (!is_row) {
+ DCHECK(first_order_key_type != nullptr);
+
+ // Comparators and operators to check window frame in RANGE mode.
+ const Type &long_type = TypeFactory::GetType(kLong, false);
+ range_compare_type_ =
+ TypeFactory::GetUnifyingType(*first_order_key_type, long_type);
+
+ range_add_operator_.reset(
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+ .makeUncheckedBinaryOperatorForTypes(*first_order_key_type, long_type));
+ range_comparator_.reset(
+ ComparisonFactory::GetComparison(ComparisonID::kLessOrEqual)
+ .makeUncheckedComparatorForTypes(*range_compare_type_, *range_compare_type_));
+ }
+}
+
+bool WindowAggregationHandle::samePartition(
+ const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const {
+ // If test tuple does not exist.
+ if (test_tuple_id < 0 ||
+ test_tuple_id >= tuple_accessor->getNumTuples()) {
+ return false;
+ }
+
+ // Check all partition by attributes.
+ for (std::size_t partition_by_index = 0;
+ partition_by_index < partition_key_ids_.size();
+ ++partition_by_index) {
+ if (!partition_equal_comparators_[partition_by_index]->compareTypedValues(
+ tuple_accessor->getTypedValue(partition_key_ids_[partition_by_index]),
+ tuple_accessor->getTypedValueAtAbsolutePosition(
+ partition_key_ids_[partition_by_index], test_tuple_id))) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool WindowAggregationHandle::inWindow(
+ const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const {
+ // If test tuple does not exist.
+ if (!samePartition(tuple_accessor, test_tuple_id)) {
+ return false;
+ }
+
+ tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+ // If test tuple is the current tuple, then it is in the window.
+ if (test_tuple_id == current_tuple_id) {
+ return true;
+ }
+
+ // In ROWS mode, check the difference of tuple_id.
+ if (is_row_) {
+ if (num_preceding_ != -1 &&
+ test_tuple_id < current_tuple_id - num_preceding_) {
+ return false;
+ }
+
+ if (num_following_ != -1 &&
+ test_tuple_id > current_tuple_id + num_following_) {
+ return false;
+ }
+ } else {
+ // In RANGE mode, check the difference of first order key value.
+ // Get the test value.
+ const Type &long_type = TypeFactory::GetType(kLong, false);
+ TypedValue test_value =
+ range_add_operator_->applyToTypedValues(
+ tuple_accessor->getTypedValueAtAbsolutePosition(order_key_ids_[0], test_tuple_id),
+ long_type.makeZeroValue());
+
+ // NULL will be considered not in range.
+ if (test_value.isNull() ||
+ tuple_accessor->getTypedValue(order_key_ids_[0]).isNull()) {
+ return false;
+ }
+
+ // Get the boundary value if it is not UNBOUNDED.
+ if (num_preceding_ > -1) {
+ // num_preceding needs to be negated for calculation.
+ std::int64_t neg_num_preceding = -num_preceding_;
+ TypedValue start_boundary_value =
+ range_add_operator_->applyToTypedValues(
+ tuple_accessor->getTypedValue(order_key_ids_[0]),
+ long_type.makeValue(&neg_num_preceding));
+ if (!range_comparator_->compareTypedValues(start_boundary_value, test_value)) {
+ return false;
+ }
+ }
+
+ if (num_following_ > -1) {
+ TypedValue end_boundary_value =
+ range_add_operator_->applyToTypedValues(
+ tuple_accessor->getTypedValue(order_key_ids_[0]),
+ long_type.makeValue(&num_following_));
+ if (!range_comparator_->compareTypedValues(test_value, end_boundary_value)) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 65f95d9..41d1d96 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -27,19 +27,23 @@
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/ComparisonFactory.hpp"
#include "types/operations/comparisons/ComparisonID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
-class InsertDestinationInterface;
class Scalar;
-class StorageManager;
class Type;
class ValueAccessor;
@@ -55,27 +59,29 @@ class ValueAccessor;
*
* A WindowAggregationHandle is created by calling
* WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
- * provides methods that are used to actually compute the window aggregate,
- * storing intermediate results in WindowAggregationState objects.
+ * provides methods that are used to actually compute the window aggregate.
*
* The work flow for computing a window aggregate is:
- * 1. Create an initial state by createInitialState().
- * 2. One thread will handle all the computation, iterating from the first
+ * 1. One thread will handle all the computation, iterating from the first
* tuple to the last tuple. Note there will be two modes that could be
* used upon different situations:
* a. If the window aggregate is defined as accumulative, which are:
* i. Functions applied to whole partition, such as rank(), ntile()
- * and dense_rank().
+ * and dense_rank(). (Not implemented yet).
* ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
* AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
* FOLLOWING".
* Then, for functions except median, we could store some global
- * values in the state without keeping all the tuple values around.
+ * values without keeping all the tuple values around. For simplicity,
+ * in avg(), count() and sum(), we treat the accumulative one as
+ * sliding window since the time complexity does not vary.
* b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
- * 3 FOLLOWING", we have to store all the tuples in the state so that
+ * 3 FOLLOWING", we have to store all the tuples in the state (at
+ * least two pointers to the start tuple and end tuple), so that
* we could know which values should be dropped as the window slides.
- * For each computed value, generate a tuple store in the column vector.
- * 3. Insert the new column into the original relation and return.
+ * For each computed value, generate a TypedValue and store it into a
+ * ColumnVector for window aggregate values.
+ * 2. Return the result ColumnVector.
*
* TODO(Shixuan): Currently we don't support parallelization. The basic idea for
* parallelization is to calculate the partial result inside each block. Each
@@ -96,37 +102,67 @@ class WindowAggregationHandle {
*
* @param block_accessors A pointer to the value accessor of block attributes.
* @param arguments The ColumnVectors of arguments
- * @param partition_by_ids The ids of partition keys.
- * @param is_row True if the frame mode is ROWS, false if it is RANGE.
- * @param num_preceding The number of rows/range that precedes the current row.
- * @param num_following The number of rows/range that follows the current row.
*
* @return A ColumnVector of the calculated window aggregates.
**/
virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const = 0;
+ const std::vector<ColumnVector*> &arguments) const = 0;
protected:
/**
* @brief Constructor.
*
- * @param partition_key_types The Types of the partition key.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
+ * @param num_preceding The number of rows/range that precedes the current row.
+ * @param num_following The number of rows/range that follows the current row.
+ **/
+ WindowAggregationHandle(
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following);
+
+ /**
+ * @brief Check if test tuple is in the same partition as the current
+ * tuple in the accessor.
+ *
+ * @param tuple_accessor The ValueAccessor for tuples.
+ * @param test_tuple_id The id of the test tuple.
+ *
+ * @return True if test tuple is in the same partition as the current tuple in
+ * the accessor, false if not.
**/
- explicit WindowAggregationHandle(
- const std::vector<const Type*> &partition_key_types) {
- // Comparison operators for checking if two tuples belong to the same partition.
- for (const Type *partition_key_type : partition_key_types) {
- equal_comparators_.emplace_back(
- ComparisonFactory::GetComparison(ComparisonID::kEqual)
- .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
- }
- }
-
- std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+ bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const;
+
+ /**
+ * @brief Check if test tuple is in the defined range.
+ *
+ * @param tuple_accessor The ValueAccessor for tuples.
+ * @param test_tuple_id The id of the test tuple.
+ *
+ * @return True if test tuple is in the defined window, false if not.
+ **/
+ bool inWindow(const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const;
+
+ // IDs and comparators for partition keys.
+ std::vector<attribute_id> partition_key_ids_;
+ std::vector<std::unique_ptr<UncheckedComparator>> partition_equal_comparators_;
+
+ // IDs, type, Comparator and operator for frame boundary check in RANGE mode.
+ std::vector<attribute_id> order_key_ids_;
+ std::unique_ptr<UncheckedBinaryOperator> range_add_operator_;
+ std::unique_ptr<UncheckedComparator> range_comparator_; // Less than or Equal
+ const Type* range_compare_type_;
+
+ // Window frame information.
+ const bool is_row_;
+ const std::int64_t num_preceding_;
+ const std::int64_t num_following_;
private:
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index a6a10d4..e6a4b3f 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -24,8 +24,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
@@ -42,14 +41,21 @@
namespace quickstep {
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
- const std::vector<const Type*> &partition_key_types,
- const Type &type)
- : WindowAggregationHandle(partition_key_types),
- argument_type_(type) {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ const Type *argument_type)
+ : WindowAggregationHandle(partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_id;
- switch (type.getTypeID()) {
+ switch (argument_type->getTypeID()) {
case kInt:
case kLong:
type_id = kLong;
@@ -59,7 +65,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
type_id = kDouble;
break;
default:
- type_id = type.getTypeID();
+ type_id = argument_type->getTypeID();
break;
}
@@ -76,7 +82,13 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
// Add operator for summing argument values.
fast_add_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+ .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
+ // Subtract operator for dropping argument values off the window.
+ fast_subtract_operator_.reset(
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract)
+ .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
// Divide operator for dividing sum by count to get final average.
divide_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
@@ -85,11 +97,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
ColumnVector* WindowAggregationHandleAvg::calculate(
ColumnVectorsValueAccessor *tuple_accessor,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const {
+ const std::vector<ColumnVector*> &arguments) const {
DCHECK_EQ(1u, arguments.size());
DCHECK(arguments[0]->isNative());
DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
@@ -98,144 +106,69 @@ ColumnVector* WindowAggregationHandleAvg::calculate(
// Initialize the output column and argument accessor.
NativeColumnVector *window_aggregates =
new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
- ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
argument_accessor->addColumn(arguments[0]);
+ // Initialize the information about the window.
+ TypedValue sum = sum_type_->makeZeroValue();
+ std::uint64_t count = 0;
+ tuple_id start_tuple_id = 0; // The id of the first tuple in the window.
+ tuple_id end_tuple_id = 0; // The id of the tuple that just passed the last
+ // tuple in the window.
+
// Create a window for each tuple and calculate the window aggregate.
tuple_accessor->beginIteration();
argument_accessor->beginIteration();
while (tuple_accessor->next() && argument_accessor->next()) {
- const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
- argument_accessor,
- partition_by_ids,
- is_row,
- num_preceding,
- num_following);
- window_aggregates->appendTypedValue(window_aggregate);
- }
-
- return window_aggregates;
-}
-
-TypedValue WindowAggregationHandleAvg::calculateOneWindow(
- ColumnVectorsValueAccessor *tuple_accessor,
- ColumnVectorsValueAccessor *argument_accessor,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const {
- // Initialize.
- TypedValue sum = sum_type_->makeZeroValue();
- TypedValue current_value = argument_accessor->getTypedValue(0);
- std::uint64_t count = 0;
-
- // Ignore the value if null.
- if (!current_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, current_value);
- count++;
- }
-
- // Get the partition key for the current row.
- std::vector<TypedValue> current_row_partition_key;
- for (attribute_id partition_by_id : partition_by_ids) {
- current_row_partition_key.push_back(
- tuple_accessor->getTypedValue(partition_by_id));
- }
-
- // Get current position.
- tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
-
- // Find preceding tuples.
- int count_preceding = 0;
- tuple_id preceding_tuple_id = current_tuple_id;
- while (num_preceding == -1 || count_preceding < num_preceding) {
- preceding_tuple_id--;
-
- // No more preceding tuples.
- if (preceding_tuple_id < 0) {
- break;
+ tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+ // If current tuple is not in the same partition as the previous tuple,
+ // reset the window.
+ if (!samePartition(tuple_accessor, current_tuple_id - 1)) {
+ start_tuple_id = current_tuple_id;
+ end_tuple_id = current_tuple_id;
+ count = 0;
+ sum = sum_type_->makeZeroValue();
}
- // Get the partition keys and compare. If not the same partition as the
- // current row, stop searching preceding tuples.
- if (!samePartition(tuple_accessor,
- current_row_partition_key,
- preceding_tuple_id,
- partition_by_ids)) {
- break;
+ // Drop tuples that will be out of the window from the beginning.
+ while (!inWindow(tuple_accessor, start_tuple_id)) {
+ TypedValue start_value =
+ argument_accessor->getTypedValueAtAbsolutePosition(0, start_tuple_id);
+ // Ignore the value if NULL.
+ if (!start_value.isNull()) {
+ sum = fast_subtract_operator_->applyToTypedValues(sum, start_value);
+ count--;
+ }
+
+ start_tuple_id++;
}
- // Actually count the element and do the calculation.
- count_preceding++;
- TypedValue preceding_value =
- argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
+ // Add tuples that will be included by the window at the end.
+ while (inWindow(tuple_accessor, end_tuple_id)) {
+ TypedValue end_value =
+ argument_accessor->getTypedValueAtAbsolutePosition(0, end_tuple_id);
- // Ignore the value if null.
- if (!preceding_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
- count++;
- }
- }
-
- // Find following tuples.
- int count_following = 0;
- tuple_id following_tuple_id = current_tuple_id;
- while (num_following == -1 || count_following < num_following) {
- following_tuple_id++;
+ // Ignore the value if NULL.
+ if (!end_value.isNull()) {
+ sum = fast_add_operator_->applyToTypedValues(sum, end_value);
+ count++;
+ }
- // No more following tuples.
- if (following_tuple_id == tuple_accessor->getNumTuples()) {
- break;
+ end_tuple_id++;
}
- // Get the partition keys and compare. If not the same partition as the
- // current row, stop searching preceding tuples.
- if (!samePartition(tuple_accessor,
- current_row_partition_key,
- following_tuple_id,
- partition_by_ids)) {
- break;
+ // If all values are NULLs, return NULL; Otherwise, return the quotient.
+ if (count == 0) {
+ window_aggregates->appendTypedValue(result_type_->makeNullValue());
+ } else {
+ window_aggregates->appendTypedValue(
+ divide_operator_->applyToTypedValues(sum, TypedValue(static_cast<double>(count))));
}
-
- // Actually count the element and do the calculation.
- count_following++;
- TypedValue following_value =
- argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
-
- // Ignore the value if null.
- if (!following_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, following_value);
- count++;
- }
- }
-
- // If all values are NULLs, return NULL; Otherwise, return the quotient.
- if (count == 0) {
- return result_type_->makeNullValue();
- } else {
- return divide_operator_->applyToTypedValues(sum,
- TypedValue(static_cast<double>(count)));
}
-}
-bool WindowAggregationHandleAvg::samePartition(
- const ColumnVectorsValueAccessor *tuple_accessor,
- const std::vector<TypedValue> ¤t_row_partition_key,
- const tuple_id boundary_tuple_id,
- const std::vector<attribute_id> &partition_by_ids) const {
- for (std::size_t partition_by_index = 0;
- partition_by_index < partition_by_ids.size();
- ++partition_by_index) {
- if (!equal_comparators_[partition_by_index]->compareTypedValues(
- current_row_partition_key[partition_by_index],
- tuple_accessor->getTypedValueAtAbsolutePosition(
- partition_by_ids[partition_by_index], boundary_tuple_id))) {
- return false;
- }
- }
-
- return true;
+ return window_aggregates;
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 5b41779..f7f2e4d 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -31,7 +31,6 @@
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/binary_operations/BinaryOperation.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -54,11 +53,7 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
~WindowAggregationHandleAvg() override {}
ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const override;
+ const std::vector<ColumnVector*> &arguments) const override;
private:
friend class WindowAggregateFunctionAvg;
@@ -66,29 +61,25 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
/**
* @brief Constructor.
*
- * @param partition_key_types The Types of the partition key.
- * @param type Type of the avg value.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
+ * @param num_preceding The number of rows/range that precedes the current row.
+ * @param num_following The number of rows/range that follows the current row.
+ * @param argument_type Type of the argument.
**/
- WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
- const Type &type);
-
- TypedValue calculateOneWindow(
- ColumnVectorsValueAccessor *tuple_accessor,
- ColumnVectorsValueAccessor *argument_accessor,
- const std::vector<attribute_id> &partition_by_ids,
+ WindowAggregationHandleAvg(
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
- const std::int64_t num_following) const;
-
- bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
- const std::vector<TypedValue> ¤t_row_partition_key,
- const tuple_id boundary_tuple_id,
- const std::vector<attribute_id> &partition_by_ids) const;
+ const std::int64_t num_following,
+ const Type *argument_type);
- const Type &argument_type_;
const Type *sum_type_;
const Type *result_type_;
std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+ std::unique_ptr<UncheckedBinaryOperator> fast_subtract_operator_;
std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index c044a98..cb58083 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -23,11 +23,13 @@
#include <memory>
#include <vector>
+#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
-#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
#include "expressions/window_aggregation/WindowAggregationID.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/CharType.hpp"
@@ -58,6 +60,9 @@ namespace {
constexpr int kNullInterval = 25;
constexpr int kNumPreceding = 2;
constexpr int kNumFollowing = 2;
+ constexpr int kPartitionKeyIndex = 0;
+ constexpr int kOrderKeyIndex = 1;
+ constexpr int kNumTuplesPerOrderKey = 2;
} // namespace
@@ -65,12 +70,27 @@ namespace {
class WindowAggregationHandleAvgTest : public::testing::Test {
protected:
// Handle initialization.
- void initializeHandle(const Type &argument_type) {
+ WindowAggregationHandle* initializeHandle(const Type &argument_type,
+ const bool is_row = true,
+ const std::int64_t num_preceding = -1,
+ const std::int64_t num_following = 0) {
const WindowAggregateFunction &function =
WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+ const Type &int_type = TypeFactory::GetType(kInt, false);
+ std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+ std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+ partition_by_attributes.emplace_back(
+ new ScalarAttribute(CatalogAttribute(nullptr, "partition_key", int_type, kPartitionKeyIndex)));
+ order_by_attributes.emplace_back(
+ new ScalarAttribute(CatalogAttribute(nullptr, "order_key", int_type, kOrderKeyIndex)));
std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
- handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
- std::move(partition_key_types)));
+
+ return function.createHandle(std::vector<const Type*>(1, &argument_type),
+ partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following);
}
// Test canApplyToTypes().
@@ -117,24 +137,25 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
template <typename GenericType, typename OutputType = DoubleType>
void checkWindowAggregationAvgGeneric() {
- const GenericType &type = GenericType::Instance(true);
- initializeHandle(type);
-
// Create argument, partition key and cpptype vectors.
std::vector<typename GenericType::cpptype*> argument_cpp_vector;
argument_cpp_vector.reserve(kNumTuples);
ColumnVector *argument_type_vector =
createArgumentGeneric<GenericType>(&argument_cpp_vector);
NativeColumnVector *partition_key_vector =
- new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
+ NativeColumnVector *order_key_vector =
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
for (int i = 0; i < kNumTuples; ++i) {
partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+ order_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerOrderKey));
}
// Create tuple ValueAccessor.
ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
tuple_accessor->addColumn(partition_key_vector);
+ tuple_accessor->addColumn(order_key_vector);
tuple_accessor->addColumn(argument_type_vector);
// Test UNBOUNDED PRECEDING AND CURRENT ROW.
@@ -182,45 +203,95 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
std::vector<ColumnVector*> arguments;
arguments.push_back(argument_type_vector);
- // The partition key index is 0.
- std::vector<attribute_id> partition_key(1, 0);
- ColumnVector *result =
- handle_avg_->calculate(tuple_accessor,
- std::move(arguments),
- partition_key,
- true /* is_row */,
- -1 /* num_preceding: UNBOUNDED PRECEDING */,
- 0 /* num_following: CURRENT ROW */);
+ // Check ROWS mode.
+ WindowAggregationHandle *rows_handle =
+ initializeHandle(GenericType::Instance(true),
+ true /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+ ColumnVector *rows_result =
+ rows_handle->calculate(tuple_accessor, arguments);
// Get the cpptype result.
- std::vector<typename OutputType::cpptype*> result_cpp_vector;
- typename GenericType::cpptype sum;
- int count;
+ std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
+ typename GenericType::cpptype rows_sum;
+ int rows_count;
for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
// Start of new partition
if (i % kNumTuplesPerPartition == 0) {
- SetDataType(0, &sum);
- count = 0;
+ SetDataType(0, &rows_sum);
+ rows_count = 0;
}
typename GenericType::cpptype *value = argument_cpp_vector[i];
if (value != nullptr) {
- sum += *value;
- count++;
+ rows_sum += *value;
+ rows_count++;
}
- if (count == 0) {
- result_cpp_vector.push_back(nullptr);
+ if (rows_count == 0) {
+ rows_result_cpp_vector.push_back(nullptr);
} else {
typename OutputType::cpptype *result_cpp_value =
new typename OutputType::cpptype;
- *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
- result_cpp_vector.push_back(result_cpp_value);
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(rows_sum) / rows_count;
+ rows_result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
+
+ CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+ // Check RANGE mode.
+ WindowAggregationHandle *range_handle =
+ initializeHandle(GenericType::Instance(true),
+ false /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+ ColumnVector *range_result =
+ range_handle->calculate(tuple_accessor, arguments);
+
+ // Get the cpptype result.
+ std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+ typename GenericType::cpptype range_sum;
+ int range_count;
+ std::size_t current_tuple = 0;
+ while (current_tuple < kNumTuples) {
+ // Start of new partition
+ if (current_tuple % kNumTuplesPerPartition == 0) {
+ SetDataType(0, &range_sum);
+ range_count = 0;
+ }
+
+ // We have to consider following tuples with the same order key value.
+ std::size_t next_tuple = current_tuple;
+ while (next_tuple < kNumTuples &&
+ next_tuple / kNumTuplesPerPartition == current_tuple / kNumTuplesPerPartition &&
+ next_tuple / kNumTuplesPerOrderKey == current_tuple / kNumTuplesPerOrderKey) {
+ typename GenericType::cpptype *value = argument_cpp_vector[next_tuple];
+ if (value != nullptr) {
+ range_sum += *value;
+ range_count++;
+ }
+
+ next_tuple++;
+ }
+
+ // Calculate the result cpp value.
+ typename OutputType::cpptype *result_cpp_value = nullptr;
+ if (range_count != 0) {
+ result_cpp_value = new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(range_sum) / range_count;
+ }
+
+ // Add the result values to the tuples with in the same order key value.
+ while (current_tuple != next_tuple) {
+ range_result_cpp_vector.push_back(result_cpp_value);
+ current_tuple++;
}
}
- CheckAvgValues(result_cpp_vector, result);
+ CheckAvgValues(range_result_cpp_vector, range_result);
}
template <typename GenericType, typename OutputType>
@@ -229,20 +300,19 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
std::vector<ColumnVector*> arguments;
arguments.push_back(argument_type_vector);
- // The partition key index is 0.
- std::vector<attribute_id> partition_key(1, 0);
- ColumnVector *result =
- handle_avg_->calculate(tuple_accessor,
- std::move(arguments),
- partition_key,
- true /* is_row */,
- kNumPreceding,
- kNumFollowing);
+ // Check ROWS mode.
+ WindowAggregationHandle *rows_handle =
+ initializeHandle(GenericType::Instance(true),
+ true /* is_row */,
+ kNumPreceding,
+ kNumFollowing);
+ ColumnVector *rows_result =
+ rows_handle->calculate(tuple_accessor, arguments);
// Get the cpptype result.
// For each value, calculate all surrounding values in the window.
- std::vector<typename OutputType::cpptype*> result_cpp_vector;
+ std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
typename GenericType::cpptype sum;
@@ -281,19 +351,81 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
}
if (count == 0) {
- result_cpp_vector.push_back(nullptr);
+ rows_result_cpp_vector.push_back(nullptr);
} else {
typename OutputType::cpptype *result_cpp_value =
new typename OutputType::cpptype;
*result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
- result_cpp_vector.push_back(result_cpp_value);
+ rows_result_cpp_vector.push_back(result_cpp_value);
}
}
- CheckAvgValues(result_cpp_vector, result);
- }
+ CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+ // Check RANGE mode.
+ WindowAggregationHandle *range_handle =
+ initializeHandle(GenericType::Instance(true),
+ false /* is_row */,
+ kNumPreceding,
+ kNumFollowing);
+ ColumnVector *range_result =
+ range_handle->calculate(tuple_accessor, arguments);
+
+ // Get the cpptype result.
+ // For each value, calculate all surrounding values in the window.
+ std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+
+ for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ int count = 0;
+
+ if (argument_cpp_vector[i] != nullptr) {
+ sum += *argument_cpp_vector[i];
+ count++;
+ }
+
+ int preceding_bound = i / kNumTuplesPerOrderKey - kNumPreceding;
+ for (std::size_t precede = 1; precede <= kNumTuples; ++precede) {
+ // Not in range or the same partition.
+ if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition ||
+ static_cast<int>((i - precede) / kNumTuplesPerOrderKey) < preceding_bound) {
+ break;
+ }
+
+ if (argument_cpp_vector[i - precede] != nullptr) {
+ sum += *argument_cpp_vector[i - precede];
+ count++;
+ }
+ }
+
+ int following_bound = i / kNumTuplesPerOrderKey + kNumFollowing;
+ for (int follow = 1; follow <= kNumTuples; ++follow) {
+ // Not in range or the same partition.
+ if (i + follow >= kNumTuples ||
+ i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+ static_cast<int>((i + follow) / kNumTuplesPerOrderKey) > following_bound) {
+ break;
+ }
+
+ if (argument_cpp_vector[i + follow] != nullptr) {
+ sum += *argument_cpp_vector[i + follow];
+ count++;
+ }
+ }
+
+ if (count == 0) {
+ rows_result_cpp_vector.push_back(nullptr);
+ } else {
+ typename OutputType::cpptype *result_cpp_value =
+ new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+ range_result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
- std::unique_ptr<WindowAggregationHandle> handle_avg_;
+ CheckAvgValues(range_result_cpp_vector, range_result);
+ }
};
template <>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..88103df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1663,7 +1663,7 @@ void ExecutionGenerator::convertWindowAggregate(
std::static_pointer_cast<const E::WindowAggregateFunction>(
named_window_aggregate_expression->expression());
- // Set the AggregateFunction.
+ // Set the WindowAggregateFunction.
window_aggr_state_proto->mutable_function()->MergeFrom(
window_aggregate_function->window_aggregate().getProto());
@@ -1683,6 +1683,15 @@ void ExecutionGenerator::convertWindowAggregate(
->MergeFrom(concretized_partition_by_attribute->getProto());
}
+ // Set order keys.
+ for (const E::ScalarPtr &order_by_attribute
+ : window_info.order_by_attributes) {
+ unique_ptr<const Scalar> concretized_order_by_attribute(
+ order_by_attribute->concretize(attribute_substitution_map_));
+ window_aggr_state_proto->add_order_by_attributes()
+ ->MergeFrom(concretized_order_by_attribute->getProto());
+ }
+
// Set window frame info.
if (window_info.frame_info == nullptr) {
// If the frame is not specified, use the default setting:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c224388..46808bf 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -814,9 +814,9 @@ L::LogicalPtr Resolver::resolveInsertSelection(
cast_expressions.emplace_back(selection_attributes[aid]);
} else {
// TODO(jianqiao): implement Cast operation for non-numeric types.
- if (destination_type.getSuperTypeID() == Type::kNumeric
- && selection_type.getSuperTypeID() == Type::kNumeric
- && destination_type.isSafelyCoercibleFrom(selection_type)) {
+ if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+ selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+ destination_type.isSafelyCoercibleFrom(selection_type)) {
// Add cast operation
const E::AttributeReferencePtr attr = selection_attributes[aid];
const E::ExpressionPtr cast_expr =
@@ -1691,6 +1691,19 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
// Resolve window frame
if (parse_window.frame_info() != nullptr) {
const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+ // For FRAME mode, the first attribute in ORDER BY must be numeric.
+ // TODO(Shixuan): Time-related types should also be supported. To handle
+ // this, some changes in the parser needs to be done since the time range
+ // should be specified with time units. Also, UNBOUNDED flags might be
+ // needed because -1 might not make sense in this case.
+ if (!parse_frame_info->is_row &&
+ (order_by_attributes.empty() ||
+ order_by_attributes[0]->getValueType().getSuperTypeID() != Type::SuperTypeID::kNumeric)) {
+ THROW_SQL_ERROR_AT(&parse_window)
+ << "A numeric attribute should be specified as the first ORDER BY "
+ << "attribute in FRAME mode";
+ }
+
frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
parse_frame_info->num_preceding,
parse_frame_info->num_following);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 30a3c39..6bada6c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -1025,17 +1025,40 @@ WINDOW w AS
+--------------------+-----------+------------------------+
==
-# Currently this is not supported, an empty table will be returned.
-SELECT int_col, sum(float_col) OVER
-(PARTITION BY char_col, long_col
- ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+SELECT float_col, double_col, avg(double_col) OVER
+(ORDER BY float_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING)
FROM test;
--
-+-----------+------------------------+
-|int_col |sum(float_col) |
-+-----------+------------------------+
-+-----------+------------------------+
++---------------+------------------------+------------------------+
+|float_col |double_col |avg(double_col) |
++---------------+------------------------+------------------------+
+| 4.89897966| 117.57550765359254| -5.2010907233390986|
+| 4.79583168| -110.30412503619254| -3.3458568752518572|
+| 4.69041586| 103.18914671611546| -3.3458568752518572|
+| 4.5825758| -96.234089594072643| -4.2942570191393745|
+| 4.47213602| NULL| -4.2942570191393745|
+| 4.35889912| -82.81907992727281| -3.1771278735018194|
+| 4.2426405| 76.367532368147124| -3.1771278735018194|
+| 4.12310553| -70.092795635500224| -3.6217507631683268|
+| 4| 64| -3.0100796703699935|
+| 3.87298346| -58.094750193111253| -3.0100796703699935|
+| 3.7416575| 52.38320341483518| -3.0100796703699935|
+| 3.60555124| -46.872166581031856| -3.1193833079868254|
+| 3.46410155| 41.569219381653056| -3.1193833079868254|
+| 3.31662488| -36.4828726939094| -2.8361542397614437|
+| 3.1622777| NULL| -2.8361542397614437|
+| 3| -27| -2.7526926834086507|
+| 2.82842708| 22.627416997969522| -8.4826069851706123|
+| 2.64575124| -18.520259177452136| -9.0010404404476727|
+| 2.44948983| 14.696938456699067| -4.1547599319129516|
+| 2.23606801| -11.180339887498949| -4.2708832009567148|
+| 2| 8| 0.11724429467951912|
+| 1.73205078| -5.196152422706632| -4.7108157334609286|
+| 1.41421354| 2.8284271247461903| -5.1226841602152344|
+| 1| -1| -1.638218767582549|
+| 0| NULL| 1.1580686755098886|
++---------------+------------------------+------------------------+
==
SELECT sum(avg(int_col) OVER w) FROM test
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 0cdfc1a..49fa44d 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -56,15 +56,13 @@ WindowAggregationOperationState::WindowAggregationOperationState(
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
arguments_(std::move(arguments)),
- is_row_(is_row),
- num_preceding_(num_preceding),
- num_following_(num_following),
storage_manager_(storage_manager) {
// Get the Types of this window aggregate's arguments so that we can create an
// AggregationHandle.
@@ -76,18 +74,14 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Check if window aggregate function could apply to the arguments.
DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
- // IDs and types of partition keys.
- std::vector<const Type*> partition_by_types;
- for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
- partition_by_ids_.push_back(
- partition_by_attribute->getAttributeIdForValueAccessor());
- partition_by_types.push_back(&partition_by_attribute->getType());
- }
-
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(std::move(argument_types),
- std::move(partition_by_types)));
+ window_aggregate_function->createHandle(argument_types,
+ partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following));
}
WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -113,6 +107,15 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
database));
}
+ std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+ for (int attribute_idx = 0;
+ attribute_idx < proto.order_by_attributes_size();
+ ++attribute_idx) {
+ order_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+ proto.order_by_attributes(attribute_idx),
+ database));
+ }
+
const bool is_row = proto.is_row();
const std::int64_t num_preceding = proto.num_preceding();
const std::int64_t num_following = proto.num_following();
@@ -121,6 +124,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
&WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
std::move(arguments),
partition_by_attributes,
+ order_by_attributes,
is_row,
num_preceding,
num_following,
@@ -160,6 +164,15 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
}
}
+ for (int attribute_idx = 0;
+ attribute_idx < proto.order_by_attributes_size();
+ ++attribute_idx) {
+ if (!ScalarFactory::ProtoIsValid(proto.order_by_attributes(attribute_idx),
+ database)) {
+ return false;
+ }
+ }
+
if (proto.num_preceding() < -1 || proto.num_following() < -1) {
return false;
}
@@ -177,14 +190,6 @@ void WindowAggregationOperationState::windowAggregateBlocks(
return;
}
- // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
- // work. This will need Order Key to be passed so that we know where the
- // window should start and end.
- if (!is_row_) {
- std::cout << "Currently we don't support RANGE frame mode :(\n";
- return;
- }
-
// Get the total number of tuples.
int num_tuples = 0;
for (const block_id block_idx : block_ids) {
@@ -226,7 +231,11 @@ void WindowAggregationOperationState::windowAggregateBlocks(
block->getIndices(),
block->getIndicesConsistent());
ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
- ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = nullptr;
+ if (!arguments_.empty()) {
+ argument_accessor = new ColumnVectorsValueAccessor();
+ }
+
for (const std::unique_ptr<const Scalar> &argument : arguments_) {
argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
&sub_block_ref));
@@ -235,9 +244,15 @@ void WindowAggregationOperationState::windowAggregateBlocks(
InvokeOnAnyValueAccessor(tuple_accessor,
[&] (auto *tuple_accessor) -> void { // NOLINT(build/c++11)
tuple_accessor->beginIteration();
- argument_accessor->beginIteration();
+ if (argument_accessor != nullptr) {
+ argument_accessor->beginIteration();
+ }
+
+ while (tuple_accessor->next()) {
+ if (argument_accessor != nullptr) {
+ argument_accessor->next();
+ }
- while (tuple_accessor->next() && argument_accessor->next()) {
for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
ColumnVector *attr_vec = attribute_vecs[attr_id];
if (attr_vec->isNative()) {
@@ -275,11 +290,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
// Do actual calculation in handle.
ColumnVector *window_aggregates =
window_aggregation_handle_->calculate(all_blocks_accessor,
- std::move(argument_vecs),
- partition_by_ids_,
- is_row_,
- num_preceding_,
- num_following_);
+ argument_vecs);
all_blocks_accessor->addColumn(window_aggregates);
output_destination->bulkInsertTuples(all_blocks_accessor);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 9792a99..726b102 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -57,6 +57,7 @@ class WindowAggregationOperationState {
* computed.
* @param arguments A list of argument expressions to that aggregate.
* @param partition_by_attributes A list of window partition key.
+ * @param order_by_attributes A list of window order key.
* @param is_row True if the window frame is calculated by ROW, false if it is
* calculated by RANGE.
* @param num_preceding The number of rows/range for the tuples preceding the
@@ -69,6 +70,7 @@ class WindowAggregationOperationState {
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
@@ -120,13 +122,6 @@ class WindowAggregationOperationState {
const std::vector<block_id> block_ids_;
std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
std::vector<std::unique_ptr<const Scalar>> arguments_;
- std::vector<attribute_id> partition_by_ids_;
-
- // Frame info.
- const bool is_row_;
- const std::int64_t num_preceding_;
- const std::int64_t num_following_;
-
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index d888461..f879713 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -30,4 +30,5 @@ message WindowAggregationOperationState {
required bool is_row = 5;
required int64 num_preceding = 6; // -1 means UNBOUNDED PRECEDING.
required int64 num_following = 7; // -1 means UNBOUNDED FOLLOWING.
+ repeated Scalar order_by_attributes = 8;
}
[2/4] incubator-quickstep git commit: Update the NOTICE file to
acknowledge all the copyrights
Posted by zu...@apache.org.
Update the NOTICE file to acknowledge all the copyrights
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/260b8624
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/260b8624
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/260b8624
Branch: refs/heads/tmb-tagged-msg
Commit: 260b8624a8a1f2161e8c38fe56bd00ae1bfb579b
Parents: d0172fd
Author: Jignesh Patel <jm...@hotmail.com>
Authored: Mon Aug 1 09:56:44 2016 -0500
Committer: Jignesh Patel <jm...@hotmail.com>
Committed: Mon Aug 1 09:56:44 2016 -0500
----------------------------------------------------------------------
NOTICE | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/260b8624/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9cfd585..32db8b8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -6,3 +6,39 @@ The Apache Software Foundation (http://www.apache.org/).
Portions Copyright (c) 2011-2015, Quickstep Technologies, LLC.
Portions Copyright (c) 2015-2016, Pivotal Software, Inc.
+
+[Copyright for third_party/benchmark]
+Portions Copyright (c) Arne Beer <ar...@twobeer.de>
+Portions Copyright (c) Christopher Seymour <ch...@hotmail.com>
+Portions Copyright (c) David Coeurjolly <da...@liris.cnrs.fr>
+Portions Copyright (c) Dominic Hamon <dm...@stripysock.com>
+Portions Copyright (c) Eugene Zhuk <eu...@gmail.com>
+Portions Copyright (c) Evgeny Safronov <di...@gmail.com>
+Portions Copyright (c) Felix Homann <li...@showlabor.de>
+Portions Copyright (c) Google Inc.
+Portions Copyright (c) JianXiong Zhou <zh...@gmail.com>
+Portions Copyright (c) Lei Xu <ed...@gmail.com>
+Portions Copyright (c) Matt Clarkson <ma...@gmail.com>
+Portions Copyright (c) Oleksandr Sochka <sa...@gmail.com>
+Portions Copyright (c) Paul Redmond <pa...@gmail.com>
+Portions Copyright (c) Shuo Chen <ch...@chenshuo.com>
+Portions Copyright (c) Yusuke Suzuki <ut...@gmail.com>
+
+[Copyright for third_party/cpplint]
+Portions Copyright (c) 2009 Google Inc
+
+[Copyright for third_party/farmhash]
+Copyright (c) 2014 Google, Inc.
+
+[Copyright for third_party/gflags]
+Copyright (c) 2006, Google Inc.
+
+[Copyright for third_party/glog]
+Copyright (c) 2008, Google Inc.
+
+[Copyright for third_party/gpertools]
+Copyright (c) 2005, Google Inc.
+
+[Copyright for third_party/linenoise]
+Copyright (c) 2010-2014, Salvatore Sanfilippo <antirez at gmail dot com>
+Copyright (c) 2010-2013, Pieter Noordhuis <pcnoordhuis at gmail dot com>
[4/4] incubator-quickstep git commit: Cleaned up the messages w/ a
dummy payload.
Posted by zu...@apache.org.
Cleaned up the messages w/ a dummy payload.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccea2ff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccea2ff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccea2ff8
Branch: refs/heads/tmb-tagged-msg
Commit: ccea2ff83ea73e950d52c152cc422a9e93cf6aad
Parents: 52a32a3
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 29 23:52:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 1 10:11:19 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanSingleNode.cpp | 4 +-
query_execution/QueryExecutionMessages.proto | 4 -
query_execution/QueryExecutionUtil.hpp | 5 +-
query_execution/README.md | 110 +++++++++----------
query_execution/WorkerMessage.hpp | 13 +--
query_execution/tests/BlockLocator_unittest.cpp | 11 +-
storage/tests/DataExchange_unittest.cpp | 11 +-
7 files changed, 60 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index cda02a7..d2b56ae 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -167,9 +167,7 @@ void ForemanSingleNode::run() {
if (!policy_enforcer_->hasQueries()) {
// Signal the main thread that there are no queries to be executed.
// Currently the message doesn't have any real content.
- const int dummy_payload = 0;
- TaggedMessage completion_tagged_message(
- &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+ TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..f2219f6 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,10 +20,6 @@ import "catalog/Catalog.proto";
import "query_execution/QueryContext.proto";
import "relational_operators/WorkOrder.proto";
-// Used for any messages that do not carry payloads.
-message EmptyMessage {
-}
-
// Note: There are different types of completion messages for normal work orders
// rebuild work orders. This can be potentially helpful when we want to collect
// different statistics for executing different types of work orders.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 6ea4a29..5994f22 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -123,10 +123,7 @@ class QueryExecutionUtil {
style.Broadcast(true);
Address address;
address.All(true);
- std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
- TaggedMessage poison_tagged_message(poison_message.get(),
- sizeof(*poison_message),
- kPoisonMessage);
+ TaggedMessage poison_tagged_message(kPoisonMessage);
const tmb::MessageBus::SendStatus send_status = bus->Send(
sender_id, address, style, std::move(poison_tagged_message));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/README.md
----------------------------------------------------------------------
diff --git a/query_execution/README.md b/query_execution/README.md
index 22ad91d..12e0f57 100644
--- a/query_execution/README.md
+++ b/query_execution/README.md
@@ -3,19 +3,19 @@
## Types of threads
There are two kinds of threads in Quickstep - Foreman and Worker. The foreman
thread controls the query execution progress, finds schedulable work (called as
-WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
+WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
The Worker threads receive the WorkOrders and execute them. After execution they
-send a completion message (or response message) back to Foreman.
+send a completion message (or response message) back to Foreman.
## High level functionality of Foreman
-Foreman requests all the RelationalOperators in the physical query plan
+Foreman requests all the RelationalOperators in the physical query plan
represented as a DAG to give any schedulable work (in the form of WorkOrders).
While doing so, Foreman has to respect dependencies between operators. There are
-two kinds of dependencies between operators - pipeline breaking (or blocking)
+two kinds of dependencies between operators - pipeline breaking (or blocking)
and pipeline non-breaking (or non-blocking). In the first case, the output of
the producer operator can't be pipelined to the consumer operator. In the second
case, the Foreman will facilitate the pipelining of the intermediate output
-produced by the producer operator to the consumer operator.
+produced by the producer operator to the consumer operator.
## Messages in execution engine
@@ -26,110 +26,110 @@ of the message.
Foreman -> Worker : WorkerMessage which consists of the following things
- A pointer to the WorkOrder to be executed. The WorkOrder could be a normal
WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of
-WorkOrder::execute() method which is overriden by all of the RelationalOperator
-classes. A rebuild WorkOrder has one StorageBlock as input and calls a
+WorkOrder::execute() method which is overriden by all of the RelationalOperator
+classes. A rebuild WorkOrder has one StorageBlock as input and calls a
rebuild() method on the block. More details about rebuild() can be found in the
-storage module.
+storage module.
- The index of the relational operator in the query plan DAG that produced the
-WorkOrder.
-
-Main thread -> Worker : WorkerMessage of type PoisonMessage. This message is
-used to terminate the Worker thread, typically when shutting down the Quickstep
-process.
+WorkOrder.
### ForemanMessage
-Multiple senders are possible for this message. There are multiple types of
+Multiple senders are possible for this message. There are multiple types of
ForemanMessages, each of which indicates the purpose of the message.
-Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
-RebuildCompletion are sent after a Worker finishes executing a respective type
-of WorkOrder. This message helps the Foreman track the progress of individual
-operators as well as the whole query.
+Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
+RebuildCompletion are sent after a Worker finishes executing a respective type
+of WorkOrder. This message helps the Foreman track the progress of individual
+operators as well as the whole query.
Some relational operators and InsertDestination -> Foreman : ForemanMessage of
-types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
+types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
when an output block of a relational operator gets full. Once a block is full,
-it streams the unique block ID of the filled block along with the index of the
-relational operator that produced the block to Foreman with the message type
+it streams the unique block ID of the filled block along with the index of the
+relational operator that produced the block to Foreman with the message type
DataPipeline. Some operators which modify the block in place also send similar
-messages to Foreman.
+messages to Foreman.
### FeedbackMessage
This message is sent from Workers to the Foreman during a WorkOrder execution.
In certain operators, e.g. TextScan (used for bulk loading data from text files)
-and Sort, there is a communication between the relational operator and its
-WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
+and Sort, there is a communication between the relational operator and its
+WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman.
Foreman relays this message to the relational operator that produced the sender
-WorkOrder. The relational operator uses this message to update its internal
-state to potentially generate newer WorkOrders.
+WorkOrder. The relational operator uses this message to update its internal
+state to potentially generate newer WorkOrders.
+
+### PoisonMessage
+This message is used to terminate a thread (i.e., Foreman and Worker), typically
+when shutting down the Quickstep process.
## How does the Foreman react after receiving various messages?
### WorkOrder completion message
* Update the book-keeping of pending WorkOrders per Worker and per operator.
* Fetch new WorkOrders if available for the operator of whose WorkOrder was
-just executed.
+just executed.
* Update the state of an operator - the possible options are:
- Normal WorkOrders are still under execution
- All normal WorkOrders have finished execution and rebuild WorkOrders are yet
- to be generated.
+ to be generated.
- All normal WorkOrders have finished execution, rebuild WorkOrders have been
- generated and issued to Workers.
+ generated and issued to Workers.
- All normal and rebuild WorkOrders have been executed AND all the dependency
- operators for the given operator have finished execution, therefore the given
- operator has finished its execution.
-* Fetch the WorkOrders from the dependents of the given operator.
+ operators for the given operator have finished execution, therefore the given
+ operator has finished its execution.
+* Fetch the WorkOrders from the dependents of the given operator.
### Rebuild WorkOrder completion message
* Update the book-keeping of pending WorkOrders per Worker and per operator.
* If all the rebuild WorkOrders have finished their execution, try to fetch the
WorkOrders of the dependent operators of the operator whose rebuild WorkOrder
-was just executed.
+was just executed.
### Data pipeline message
-* Find the consumer operators (i.e. operators which have a non
-pipeline-breaking link) of the producer operator.
-* Stream the block ID to the eligible consumer operators.
-* Fetch new WorkOrders from these consumer operators which may have become
-available because of the streaming of data.
+* Find the consumer operators (i.e. operators which have a non
+pipeline-breaking link) of the producer operator.
+* Stream the block ID to the eligible consumer operators.
+* Fetch new WorkOrders from these consumer operators which may have become
+available because of the streaming of data.
### WorkOrder available message
* Fetch new WorkOrders that may have become available.
### Feedback message
-* Relay the feedback message to a specified relational operator. The recipient
-operator is specified in the header of the message.
+* Relay the feedback message to a specified relational operator. The recipient
+operator is specified in the header of the message.
## Example
-We look at a sample query to better describe the flow of messages -
+We look at a sample query to better describe the flow of messages -
SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;
-This is an equi-join query which can be implemented using a hash join. We assume
-that S is a larger relation and the build relation is the output of the
+This is an equi-join query which can be implemented using a hash join. We assume
+that S is a larger relation and the build relation is the output of the
selection on R.
The query execution plan involves the following operators:
-* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
-R')
+* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
+R')
* BuildHashOperator to construct a hash table on R'
* HashJoinOperator to probe the hash table, where the probe relation is S
* DestroyHashTableOperator to destroy the hash table after the join is done
-* Multiple DropTableOperators to destroy the temporaray relations produced as
-output.
+* Multiple DropTableOperators to destroy the temporaray relations produced as
+output.
R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4.
-We assume that the SelectOperator produces one filled block and one partially
-filled block as output. Note that in the query plan DAG, the link between
-SelectOperator and BuildHashOperator allows streaming of data. The
-HashJoinOperator's WorkOrder can't be generated unless all of the
+We assume that the SelectOperator produces one filled block and one partially
+filled block as output. Note that in the query plan DAG, the link between
+SelectOperator and BuildHashOperator allows streaming of data. The
+HashJoinOperator's WorkOrder can't be generated unless all of the
BuildHashOperator's WorkOrders have finished their execution. The execution is
-assumed to be performed by a single Worker thread.
+assumed to be performed by a single Worker thread.
-The following table describes the message exchange that happens during the
-query excution. We primarily focus on three operators - Select, BuildHash and
-HashJoin (probe).
+The following table describes the message exchange that happens during the
+query excution. We primarily focus on three operators - Select, BuildHash and
+HashJoin (probe).
| Sender | Receiver | Message | Message Description |
|:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index 560c1ba..a0434de 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -35,7 +35,6 @@ class WorkerMessage {
enum class WorkerMessageType {
kRebuildWorkOrder = 0,
kWorkOrder,
- kPoison
};
/**
@@ -70,15 +69,6 @@ class WorkerMessage {
}
/**
- * @brief A static factory method for generating a poison message.
- *
- * @return The constructed PoisonMessage.
- **/
- static WorkerMessage* PoisonMessage() {
- return new WorkerMessage(nullptr, 0, WorkerMessageType::kPoison);
- }
-
- /**
* @brief Destructor.
**/
~WorkerMessage() {
@@ -128,8 +118,7 @@ class WorkerMessage {
/**
* @brief Constructor.
*
- * @param work_unit The work order to be executed by the worker. A NULL
- * workorder indicates a poison message.
+ * @param work_unit The work order to be executed by the worker.
* @param relational_op_index The index of the relational operator in the
* query plan DAG that generated the given WorkOrder.
* @param type Type of the WorkerMessage.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index fe7b86b..fd25e9e 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -90,16 +90,7 @@ class BlockLocatorTest : public ::testing::Test {
virtual void TearDown() {
storage_manager_.reset();
- serialization::EmptyMessage proto;
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kPoisonMessage);
- free(proto_bytes);
+ TaggedMessage message(kPoisonMessage);
LOG(INFO) << "Worker (id '" << worker_client_id_
<< "') sent PoisonMessage (typed '" << kPoisonMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index 38d12f6..4bad17b 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -105,16 +105,7 @@ class DataExchangeTest : public ::testing::Test {
data_exchanger_expected_.shutdown();
storage_manager_expected_.reset();
- serialization::EmptyMessage proto;
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kPoisonMessage);
- free(proto_bytes);
+ TaggedMessage message(kPoisonMessage);
LOG(INFO) << "Worker (id '" << worker_client_id_
<< "') sent PoisonMessage (typed '" << kPoisonMessage
[3/4] incubator-quickstep git commit: TMB: Added Support for an Empty,
but Typed TaggedMessage.
Posted by zu...@apache.org.
TMB: Added Support for an Empty, but Typed TaggedMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/52a32a37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/52a32a37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/52a32a37
Branch: refs/heads/tmb-tagged-msg
Commit: 52a32a372612a3e8f5ab268886d4b9cbb546c205
Parents: 260b862
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 29 19:57:49 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 1 10:11:19 2016 -0700
----------------------------------------------------------------------
third_party/tmb/include/tmb/tagged_message.h | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52a32a37/third_party/tmb/include/tmb/tagged_message.h
----------------------------------------------------------------------
diff --git a/third_party/tmb/include/tmb/tagged_message.h b/third_party/tmb/include/tmb/tagged_message.h
index 49dcee7..f3a77ee 100644
--- a/third_party/tmb/include/tmb/tagged_message.h
+++ b/third_party/tmb/include/tmb/tagged_message.h
@@ -63,6 +63,15 @@ class TaggedMessage {
}
/**
+ * @brief Constructor which creates an empty, but typed message.
+ **/
+ explicit TaggedMessage(const message_type_id message_type)
+ : payload_inline_(true),
+ message_type_(message_type) {
+ payload_.in_line.size = 0;
+ }
+
+ /**
* @brief Constructor.
*
* @param msg A pointer to the message contents in memory, which will be