You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/04/12 19:36:05 UTC

[2/5] incubator-quickstep git commit: Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 0e75411..cd376c1 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -32,6 +32,7 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -532,6 +533,7 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
     }
 
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (auto selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -539,8 +541,10 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
                                                                   build_accessor.get(),
                                                                   probe_relation_id,
                                                                   probe_accessor,
-                                                                  build_block_entry.second));
+                                                                  build_block_entry.second,
+                                                                  scalar_cache.get()));
     }
+    scalar_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }
@@ -649,12 +653,14 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
         zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
       }
 
+      ScalarCache scalar_cache;
       for (const Scalar *scalar : non_trivial_expressions) {
         temp_result.addColumn(scalar->getAllValuesForJoin(build_relation_id,
                                                           build_accessor.get(),
                                                           probe_relation_id,
                                                           probe_accessor,
-                                                          zipped_joined_tuple_ids));
+                                                          zipped_joined_tuple_ids,
+                                                          &scalar_cache));
       }
     }
 
@@ -765,13 +771,16 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_store.createValueAccessor(&filter));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -828,12 +837,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -886,12 +898,15 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -976,14 +991,18 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn(
         (*selection_it)->getAllValues(probe_accessor_with_filter.get(),
-                                      &sub_blocks_ref));
+                                      &sub_blocks_ref,
+                                      scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -1032,12 +1051,11 @@ void HashOuterJoinWorkOrder::execute() {
            &build_block_entry : *collector.getJoinedTupleMap()) {
     const BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
-    const TupleStorageSubBlock &build_store =
-        build_block->getTupleStorageSubBlock();
+    const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
 
-    std::unique_ptr<ValueAccessor> build_accessor(
-        build_store.createValueAccessor());
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (auto selection_it = selection_.begin();
          selection_it != selection_.end();
          ++selection_it) {
@@ -1047,8 +1065,11 @@ void HashOuterJoinWorkOrder::execute() {
               build_accessor.get(),
               probe_relation_id,
               probe_accessor.get(),
-              build_block_entry.second));
+              build_block_entry.second,
+              scalar_cache.get()));
     }
+    scalar_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 
@@ -1061,8 +1082,9 @@ void HashOuterJoinWorkOrder::execute() {
   if (num_tuples_without_matches > 0) {
     std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
         probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
-    ColumnVectorsValueAccessor temp_result;
 
+    ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (std::size_t i = 0; i < selection_.size(); ++i) {
       if (is_selection_on_build_[i]) {
         // NOTE(harshad, jianqiao): The assumption here is that any operation
@@ -1090,9 +1112,12 @@ void HashOuterJoinWorkOrder::execute() {
       } else {
         temp_result.addColumn(
             selection_[i]->getAllValues(probe_accessor_with_filter.get(),
-                                        &sub_blocks_ref));
+                                        &sub_blocks_ref,
+                                        scalar_cache.get()));
       }
     }
+    scalar_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index f17402f..a6bacc7 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogRelationSchema.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -417,6 +418,7 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
     // evaluation and data movement, but low enough that temporary memory
     // requirements don't get out of hand).
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -424,8 +426,10 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
                                                                   left_accessor.get(),
                                                                   right_input_relation_id,
                                                                   right_accessor.get(),
-                                                                  joined_tuple_ids));
+                                                                  joined_tuple_ids,
+                                                                  scalar_cache.get()));
     }
+    scalar_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 90543c4..facc7fa 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,6 +38,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTableFactory.hpp"
@@ -491,9 +492,10 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
     SubBlocksReference sub_blocks_ref(tuple_store,
                                       block->getIndices(),
                                       block->getIndicesConsistent());
+    ScalarCache scalar_cache;
     for (const auto &expression : non_trivial_expressions_) {
       non_trivial_results->addColumn(
-          expression->getAllValues(accessor, &sub_blocks_ref));
+          expression->getAllValues(accessor, &sub_blocks_ref, &scalar_cache));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index cb1f098..0a1d484 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -276,6 +276,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_storage_AggregationOperationState_proto
                       quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTableBase
@@ -936,6 +937,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
@@ -1086,6 +1088,7 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_expressions_windowaggregation_WindowAggregationHandle

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index e91c1ac..d724317 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -30,6 +30,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
 #include "storage/BloomFilterIndexSubBlock.hpp"
 #include "storage/CSBTreeIndexSubBlock.hpp"
@@ -369,15 +370,18 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
                                       indices_,
                                       indices_consistent_);
 
-    std::unique_ptr<ValueAccessor> accessor(
-        tuple_store_->createValueAccessor(filter));
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
+    ScalarCache scalar_cache;
 
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
          selection_cit != selection.end();
          ++selection_cit) {
       // TODO(chasseur): Can probably elide some copies for parts of the
       // selection that are ScalarAttribute or ScalarLiteral.
-      temp_result.addColumn((*selection_cit)->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(
+          (*selection_cit)->getAllValues(accessor.get(),
+                                         &sub_blocks_ref,
+                                         &scalar_cache));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 58bdf18..2c571ef 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -33,6 +33,7 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "expressions/window_aggregation/WindowAggregationHandle.hpp"
@@ -236,11 +237,16 @@ void WindowAggregationOperationState::windowAggregateBlocks(
       argument_accessor = new ColumnVectorsValueAccessor();
     }
 
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (const std::unique_ptr<const Scalar> &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
-                                                          &sub_block_ref));
+                                                          &sub_block_ref,
+                                                          scalar_cache.get()));
     }
 
+    // Release common subexpression cache as early as possible.
+    scalar_cache.reset();
+
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index fc65656..430a844 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -43,6 +43,9 @@ namespace quickstep {
 // TODO(chasseur): Look into ways to allocate ColumnVector memory from the
 // StorageManager.
 
+class ColumnVector;
+typedef std::shared_ptr<const ColumnVector> ColumnVectorPtr;
+
 /**
  * @brief A vector of values of the same type. Two implementations exist:
  *        NativeColumnVector (an array of fixed-size data elements) and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index 6dc1124..d9cf49d 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -74,22 +74,23 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
    *             this value-accessor is responsible for freeing this column
    *             vector.
    **/
-  void addColumn(ColumnVector *column, const bool owns = true) {
+  void addColumn(ColumnVectorPtr column) {
     // If this is not the first column to be added, make sure it is the same
     // length as the others.
     DCHECK(columns_.empty()
            || (column->isNative()
-               ? (static_cast<const NativeColumnVector*>(column)->size() == column_length_)
-               : (static_cast<const IndirectColumnVector*>(column)->size() == column_length_)));
+               ? (static_cast<const NativeColumnVector*>(column.get())->size() == column_length_)
+               : (static_cast<const IndirectColumnVector*>(column.get())->size() == column_length_)));
     columns_.push_back(column);
     column_native_.push_back(column->isNative());
-    if (owns) {
-      deleter_.addObject(column);
-    }
-    column_length_
-        = column->isNative()
-          ? static_cast<const NativeColumnVector*>(column)->size()
-          : static_cast<const IndirectColumnVector*>(column)->size();
+    column_length_ =
+        column->isNative()
+            ? static_cast<const NativeColumnVector*>(column.get())->size()
+            : static_cast<const IndirectColumnVector*>(column.get())->size();
+  }
+
+  void addColumn(ColumnVector *column) {
+    addColumn(ColumnVectorPtr(column));
   }
 
   inline void beginIteration() {
@@ -309,11 +310,10 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
            && (static_cast<std::vector<ColumnVector*>::size_type>(attr_id) < columns_.size());
   }
 
-  std::vector<ColumnVector*> columns_;
+  std::vector<ColumnVectorPtr> columns_;
   std::vector<bool> column_native_;
   std::size_t column_length_;
   std::size_t current_position_;
-  ScopedDeleter deleter_;
 
   DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..ea9ee43 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -182,6 +182,7 @@ add_library(quickstep_utility_ExecutionDAGVisualizer
             ExecutionDAGVisualizer.cpp
             ExecutionDAGVisualizer.hpp)
 add_library(quickstep_utility_Glob Glob.cpp Glob.hpp)
+add_library(quickstep_utility_HashError ../empty_src.cpp HashError.hpp)
 add_library(quickstep_utility_HashPair ../empty_src.cpp HashPair.hpp)
 add_library(quickstep_utility_Macros ../empty_src.cpp Macros.hpp)
 add_library(quickstep_utility_MemStream ../empty_src.cpp MemStream.hpp)
@@ -350,6 +351,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob
+                      quickstep_utility_HashError
                       quickstep_utility_HashPair
                       quickstep_utility_Macros
                       quickstep_utility_MemStream

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cd01af24/utility/HashError.hpp
----------------------------------------------------------------------
diff --git a/utility/HashError.hpp b/utility/HashError.hpp
new file mode 100644
index 0000000..3a59979
--- /dev/null
+++ b/utility/HashError.hpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+#define QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+
+#include <exception>
+#include <string>
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Exception signalling that hashing is not supported. The message
+ *        text is supplied by the thrower (presumably naming the offending
+ *        type or operation -- TODO confirm against throw sites).
+ **/
+class HashNotSupported : public std::exception {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param message The error message; what() returns it verbatim.
+   **/
+  explicit HashNotSupported(const std::string &message)
+      : message_(message) {}
+
+  /**
+   * @brief Get the message passed to the constructor.
+   *
+   * @return The stored message as a C string; the pointer stays valid for
+   *         the lifetime of this exception object.
+   **/
+  const char* what() const noexcept override {
+    return message_.c_str();
+  }
+
+ private:
+  const std::string message_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_HASH_ERROR_HPP_