You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/04/12 19:36:05 UTC
[2/5] incubator-quickstep git commit: Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 0e75411..cd376c1 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -32,6 +32,7 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
@@ -532,6 +533,7 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
}
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (auto selection_cit = selection_.begin();
selection_cit != selection_.end();
++selection_cit) {
@@ -539,8 +541,10 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
build_accessor.get(),
probe_relation_id,
probe_accessor,
- build_block_entry.second));
+ build_block_entry.second,
+ scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -649,12 +653,14 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
}
+ ScalarCache scalar_cache;
for (const Scalar *scalar : non_trivial_expressions) {
temp_result.addColumn(scalar->getAllValuesForJoin(build_relation_id,
build_accessor.get(),
probe_relation_id,
probe_accessor,
- zipped_joined_tuple_ids));
+ zipped_joined_tuple_ids,
+ &scalar_cache));
}
}
@@ -765,13 +771,16 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
probe_store.createValueAccessor(&filter));
+
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end();
++selection_it) {
temp_result.addColumn((*selection_it)->getAllValues(
- probe_accessor_with_filter.get(), &sub_blocks_ref));
+ probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -828,12 +837,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
temp_result.addColumn((*selection_it)->getAllValues(
- probe_accessor_with_filter.get(), &sub_blocks_ref));
+ probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -886,12 +898,15 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
temp_result.addColumn((*selection_it)->getAllValues(
- probe_accessor_with_filter.get(), &sub_blocks_ref));
+ probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -976,14 +991,18 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end();
++selection_it) {
temp_result.addColumn(
(*selection_it)->getAllValues(probe_accessor_with_filter.get(),
- &sub_blocks_ref));
+ &sub_blocks_ref,
+ scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -1032,12 +1051,11 @@ void HashOuterJoinWorkOrder::execute() {
&build_block_entry : *collector.getJoinedTupleMap()) {
const BlockReference build_block =
storage_manager_->getBlock(build_block_entry.first, build_relation_);
- const TupleStorageSubBlock &build_store =
- build_block->getTupleStorageSubBlock();
+ const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
- std::unique_ptr<ValueAccessor> build_accessor(
- build_store.createValueAccessor());
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (auto selection_it = selection_.begin();
selection_it != selection_.end();
++selection_it) {
@@ -1047,8 +1065,11 @@ void HashOuterJoinWorkOrder::execute() {
build_accessor.get(),
probe_relation_id,
probe_accessor.get(),
- build_block_entry.second));
+ build_block_entry.second,
+ scalar_cache.get()));
}
+ scalar_cache.reset();
+
output_destination_->bulkInsertTuples(&temp_result);
}
@@ -1061,8 +1082,9 @@ void HashOuterJoinWorkOrder::execute() {
if (num_tuples_without_matches > 0) {
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
- ColumnVectorsValueAccessor temp_result;
+ ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (std::size_t i = 0; i < selection_.size(); ++i) {
if (is_selection_on_build_[i]) {
// NOTE(harshad, jianqiao): The assumption here is that any operation
@@ -1090,9 +1112,12 @@ void HashOuterJoinWorkOrder::execute() {
} else {
temp_result.addColumn(
selection_[i]->getAllValues(probe_accessor_with_filter.get(),
- &sub_blocks_ref));
+ &sub_blocks_ref,
+ scalar_cache.get()));
}
}
+ scalar_cache.reset();
+
output_destination_->bulkInsertTuples(&temp_result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index f17402f..a6bacc7 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -27,6 +27,7 @@
#include "catalog/CatalogRelationSchema.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
@@ -417,6 +418,7 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
// evaluation and data movement, but low enough that temporary memory
// requirements don't get out of hand).
ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
selection_cit != selection_.end();
++selection_cit) {
@@ -424,8 +426,10 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
left_accessor.get(),
right_input_relation_id,
right_accessor.get(),
- joined_tuple_ids));
+ joined_tuple_ids,
+ scalar_cache.get()));
}
+ scalar_cache.reset();
output_destination_->bulkInsertTuples(&temp_result);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 90543c4..facc7fa 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,6 +38,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/CollisionFreeVectorTable.hpp"
#include "storage/HashTableFactory.hpp"
@@ -491,9 +492,10 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
SubBlocksReference sub_blocks_ref(tuple_store,
block->getIndices(),
block->getIndicesConsistent());
+ ScalarCache scalar_cache;
for (const auto &expression : non_trivial_expressions_) {
non_trivial_results->addColumn(
- expression->getAllValues(accessor, &sub_blocks_ref));
+ expression->getAllValues(accessor, &sub_blocks_ref, &scalar_cache));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index cb1f098..0a1d484 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -276,6 +276,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarCache
quickstep_storage_AggregationOperationState_proto
quickstep_storage_CollisionFreeVectorTable
quickstep_storage_HashTableBase
@@ -936,6 +937,7 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_catalog_CatalogTypedefs
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarCache
quickstep_storage_BasicColumnStoreTupleStorageSubBlock
quickstep_storage_BloomFilterIndexSubBlock
quickstep_storage_CSBTreeIndexSubBlock
@@ -1086,6 +1088,7 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
quickstep_expressions_Expressions_proto
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_scalar_ScalarCache
quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_expressions_windowaggregation_WindowAggregationHandle
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index e91c1ac..d724317 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -30,6 +30,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
#include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
#include "storage/BloomFilterIndexSubBlock.hpp"
#include "storage/CSBTreeIndexSubBlock.hpp"
@@ -369,15 +370,18 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
indices_,
indices_consistent_);
- std::unique_ptr<ValueAccessor> accessor(
- tuple_store_->createValueAccessor(filter));
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
+ ScalarCache scalar_cache;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
selection_cit != selection.end();
++selection_cit) {
// TODO(chasseur): Can probably elide some copies for parts of the
// selection that are ScalarAttribute or ScalarLiteral.
- temp_result.addColumn((*selection_cit)->getAllValues(accessor.get(), &sub_blocks_ref));
+ temp_result.addColumn(
+ (*selection_cit)->getAllValues(accessor.get(),
+ &sub_blocks_ref,
+ &scalar_cache));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 58bdf18..2c571ef 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -33,6 +33,7 @@
#include "expressions/Expressions.pb.h"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
@@ -236,11 +237,16 @@ void WindowAggregationOperationState::windowAggregateBlocks(
argument_accessor = new ColumnVectorsValueAccessor();
}
+ std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
for (const std::unique_ptr<const Scalar> &argument : arguments_) {
argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
- &sub_block_ref));
+ &sub_block_ref,
+ scalar_cache.get()));
}
+ // Release common subexpression cache as early as possible.
+ scalar_cache.reset();
+
InvokeOnAnyValueAccessor(tuple_accessor,
[&] (auto *tuple_accessor) -> void { // NOLINT(build/c++11)
tuple_accessor->beginIteration();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index fc65656..430a844 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -43,6 +43,9 @@ namespace quickstep {
// TODO(chasseur): Look into ways to allocate ColumnVector memory from the
// StorageManager.
+class ColumnVector;
+typedef std::shared_ptr<const ColumnVector> ColumnVectorPtr;
+
/**
* @brief A vector of values of the same type. Two implementations exist:
* NativeColumnVector (an array of fixed-size data elements) and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index 6dc1124..d9cf49d 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -74,22 +74,23 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
* this value-accessor is responsible for freeing this column
* vector.
**/
- void addColumn(ColumnVector *column, const bool owns = true) {
+ void addColumn(ColumnVectorPtr column) {
// If this is not the first column to be added, make sure it is the same
// length as the others.
DCHECK(columns_.empty()
|| (column->isNative()
- ? (static_cast<const NativeColumnVector*>(column)->size() == column_length_)
- : (static_cast<const IndirectColumnVector*>(column)->size() == column_length_)));
+ ? (static_cast<const NativeColumnVector*>(column.get())->size() == column_length_)
+ : (static_cast<const IndirectColumnVector*>(column.get())->size() == column_length_)));
columns_.push_back(column);
column_native_.push_back(column->isNative());
- if (owns) {
- deleter_.addObject(column);
- }
- column_length_
- = column->isNative()
- ? static_cast<const NativeColumnVector*>(column)->size()
- : static_cast<const IndirectColumnVector*>(column)->size();
+ column_length_ =
+ column->isNative()
+ ? static_cast<const NativeColumnVector*>(column.get())->size()
+ : static_cast<const IndirectColumnVector*>(column.get())->size();
+ }
+
+ void addColumn(ColumnVector *column) {  // Raw-pointer convenience overload.
+ addColumn(ColumnVectorPtr(column));  // The shared pointer built here takes ownership of 'column'.
}
inline void beginIteration() {
@@ -309,11 +310,10 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
&& (static_cast<std::vector<ColumnVector*>::size_type>(attr_id) < columns_.size());
}
- std::vector<ColumnVector*> columns_;
+ std::vector<ColumnVectorPtr> columns_;
std::vector<bool> column_native_;
std::size_t column_length_;
std::size_t current_position_;
- ScopedDeleter deleter_;
DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..ea9ee43 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -182,6 +182,7 @@ add_library(quickstep_utility_ExecutionDAGVisualizer
ExecutionDAGVisualizer.cpp
ExecutionDAGVisualizer.hpp)
add_library(quickstep_utility_Glob Glob.cpp Glob.hpp)
+add_library(quickstep_utility_HashError ../empty_src.cpp HashError.hpp)
add_library(quickstep_utility_HashPair ../empty_src.cpp HashPair.hpp)
add_library(quickstep_utility_Macros ../empty_src.cpp Macros.hpp)
add_library(quickstep_utility_MemStream ../empty_src.cpp MemStream.hpp)
@@ -350,6 +351,7 @@ target_link_libraries(quickstep_utility
quickstep_utility_EqualsAnyConstant
quickstep_utility_ExecutionDAGVisualizer
quickstep_utility_Glob
+ quickstep_utility_HashError
quickstep_utility_HashPair
quickstep_utility_Macros
quickstep_utility_MemStream
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/utility/HashError.hpp
----------------------------------------------------------------------
diff --git a/utility/HashError.hpp b/utility/HashError.hpp
new file mode 100644
index 0000000..3a59979
--- /dev/null
+++ b/utility/HashError.hpp
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+#define QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+
+#include <exception>
+#include <string>
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief Exception that carries a caller-supplied error message.
+ *
+ * @note Judging by the name, it signals that hashing is unsupported for some
+ *       input; the throwing sites are not part of this header.
+ **/
+class HashNotSupported : public std::exception {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param message The error message. It is copied, so the pointer returned
+   *        by what() stays valid for the lifetime of this exception object.
+   **/
+  explicit HashNotSupported(const std::string &message)
+      : message_(message) {}
+
+  ~HashNotSupported() noexcept override {}
+
+  /**
+   * @brief Get the error message.
+   *
+   * @return The message passed to the constructor, as a C string.
+   **/
+  const char* what() const noexcept override {
+    return message_.c_str();
+  }
+
+ private:
+  const std::string message_;
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_HASH_ERROR_HPP_