You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/24 18:09:01 UTC

[2/2] incubator-quickstep git commit: Fixes

Fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/11b01099
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/11b01099
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/11b01099

Branch: refs/heads/LIP-for-tpch-merged
Commit: 11b01099e40e342b809e8b36ed399581a86b5d7c
Parents: a9cfdd1
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Aug 24 13:08:55 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Aug 24 13:08:55 2016 -0500

----------------------------------------------------------------------
 query_optimizer/PhysicalGenerator.cpp           |  6 +-
 query_optimizer/rules/SwapProbeBuild.cpp        | 13 +++-
 relational_operators/HashJoinOperator.cpp       | 36 +++-------
 storage/InsertDestination.cpp                   | 28 ++++++++
 storage/InsertDestination.hpp                   |  5 ++
 types/containers/ColumnVector.hpp               | 72 --------------------
 types/containers/ColumnVectorsValueAccessor.hpp | 14 ----
 utility/BitVector.hpp                           | 27 --------
 8 files changed, 58 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index e093272..c8928d7 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -97,12 +97,12 @@ P::PhysicalPtr PhysicalGenerator::generateInitialPlan(
 
 P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   std::vector<std::unique_ptr<Rule<P::Physical>>> rules;
-  if (FLAGS_reorder_hash_joins) {
     rules.emplace_back(new PruneColumns());
+  if (FLAGS_reorder_hash_joins) {
     rules.emplace_back(new StarSchemaHashJoinOrderOptimization());
+  } else {
+    rules.emplace_back(new SwapProbeBuild());
   }
-  rules.emplace_back(new PruneColumns());
-  // rules.emplace_back(new SwapProbeBuild());
   rules.emplace_back(new FuseJoinSelect());
   rules.emplace_back(new PruneColumns());
   rules.emplace_back(new AttachBloomFilters());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/query_optimizer/rules/SwapProbeBuild.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/SwapProbeBuild.cpp b/query_optimizer/rules/SwapProbeBuild.cpp
index cc3f1e2..572888f 100644
--- a/query_optimizer/rules/SwapProbeBuild.cpp
+++ b/query_optimizer/rules/SwapProbeBuild.cpp
@@ -45,7 +45,18 @@ P::PhysicalPtr SwapProbeBuild::applyToNode(const P::PhysicalPtr &input) {
     std::size_t left_cardinality = cost_model_->estimateCardinality(left);
     std::size_t right_cardinality = cost_model_->estimateCardinality(right);
 
-    if (right_cardinality > left_cardinality) {
+    const bool left_unique =
+        left->impliesUniqueAttributes(hash_join->left_join_attributes());
+    const bool right_unique =
+        right->impliesUniqueAttributes(hash_join->right_join_attributes());
+
+    if (!left_unique && right_unique) {
+      LOG_IGNORING_RULE(input);
+      return input;
+    }
+
+    if ((left_unique && !right_unique) ||
+        right_cardinality > left_cardinality) {
       std::vector<E::AttributeReferencePtr> left_join_attributes = hash_join->left_join_attributes();
       std::vector<E::AttributeReferencePtr> right_join_attributes = hash_join->right_join_attributes();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index a45eb24..3dc9aae 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -471,8 +471,7 @@ void HashInnerJoinWorkOrder::execute() {
 
 //  materialize_line->emplace_back();
 //  iterate_line->emplace_back();
-//  std::cout << "here!\n";
-  ColumnVectorsValueAccessor temp_result;
+  MutableBlockReference output_block;
   for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
 //    iterate_line->back().endEvent();
@@ -530,40 +529,25 @@ void HashInnerJoinWorkOrder::execute() {
     // benefit (probably only a real performance win when there are very few
     // matching tuples in each individual inner block but very many inner
     // blocks with at least one match).
-    std::vector<std::unique_ptr<ColumnVector>> columns;
+    ColumnVectorsValueAccessor temp_result;
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
-      columns.emplace_back(
-          std::unique_ptr<ColumnVector>(
-              (*selection_cit)->getAllValuesForJoin(build_relation_id,
-                                                    build_accessor.get(),
-                                                    probe_relation_id,
-                                                    probe_accessor.get(),
-                                                    build_block_entry.second)));
+      temp_result.addColumn(
+          (*selection_cit)->getAllValuesForJoin(build_relation_id,
+                                                build_accessor.get(),
+                                                probe_relation_id,
+                                                probe_accessor.get(),
+                                                build_block_entry.second));
     }
-
-    temp_result.appendColumns(&columns, build_block_entry.second.size());
-
-//    ColumnVectorsValueAccessor temp_result;
-//    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
-//         selection_cit != selection_.end();
-//         ++selection_cit) {
-//      temp_result.addColumn(
-//          (*selection_cit)->getAllValuesForJoin(build_relation_id,
-//                                                build_accessor.get(),
-//                                                probe_relation_id,
-//                                                probe_accessor.get(),
-//                                                build_block_entry.second));
-//    }
-//    output_destination_->bulkInsertTuples(&temp_result);
+    output_destination_->bulkInsertTuples(&temp_result, &output_block);
 
 //    iterate_line->emplace_back();
   }
 //  iterate_line->back().endEvent();
 //  iterate_line->back().setPayload(getOperatorIndex(), 0);
 
-  output_destination_->bulkInsertTuples(&temp_result);
+  output_destination_->returnBlock(&output_block);
 
 //  materialize_line->back().endEvent();
 //  materialize_line->back().setPayload(getOperatorIndex(), collector.getJoinedTuples()->size());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 5e83453..bca9dff 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -221,6 +221,32 @@ void InsertDestination::bulkInsertTuples(ValueAccessor *accessor, bool always_ma
   });
 }
 
+void InsertDestination::bulkInsertTuples(ValueAccessor *accessor,
+                                         MutableBlockReference *output_block) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    accessor->beginIteration();
+    while (!accessor->iterationFinished()) {
+      // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
+      if (!output_block->valid()) {
+        *output_block = this->getBlockForInsertion();
+      }
+      if ((*output_block)->bulkInsertTuples(accessor) == 0 ||
+          !accessor->iterationFinished()) {
+        // output_block is full.
+        this->returnBlock(std::move(*output_block), true);
+      }
+    }
+  });
+}
+
+void InsertDestination::returnBlock(MutableBlockReference *output_block) {
+  if (output_block->valid()) {
+    this->returnBlock(std::move(*output_block), false);
+  }
+}
+
 void InsertDestination::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor,
@@ -312,6 +338,7 @@ void AlwaysCreateBlockInsertDestination::returnBlock(MutableBlockReference &&blo
   // Due to the nature of this InsertDestination, a block will always be
   // streamed no matter if it's full or not.
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
@@ -389,6 +416,7 @@ void BlockPoolInsertDestination::returnBlock(MutableBlockReference &&block, cons
   }
   // Note that the block will only be sent if it's full (true).
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInternal() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 408e76b..5ae35b4 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -147,6 +147,11 @@ class InsertDestination : public InsertDestinationInterface {
 
   void bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full = false) override;
 
+  void bulkInsertTuples(ValueAccessor *accessor,
+                        MutableBlockReference *output_block);
+
+  void returnBlock(MutableBlockReference *block);
+
   void bulkInsertTuplesWithRemappedAttributes(
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index c1cbff6..0d37004 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -107,8 +107,6 @@ class ColumnVector {
    **/
   virtual bool isNative() const = 0;
 
-  virtual bool append(const ColumnVector *column_vector) = 0;
-
  protected:
   const Type &type_;
 
@@ -401,49 +399,6 @@ class NativeColumnVector : public ColumnVector {
     }
   }
 
-  bool append(const ColumnVector *column_vector) override {
-    // Other ColumnVector also has to be native.
-    if (!column_vector->isNative()) {
-      return false;
-    }
-    const NativeColumnVector *casted_column_vector =
-        static_cast<const NativeColumnVector*>(column_vector);
-    // Both ColumnVectors has to have same type to be appended.
-    if (!type_.equals(casted_column_vector->type_)
-            || type_length_ != casted_column_vector->type_length_) {
-      return false;
-    }
-    // Let's be generous about new reserved space.
-    std::size_t new_actual_length = actual_length_ + casted_column_vector->actual_length_;
-    std::size_t new_reserved_length = 0;
-    if (new_actual_length > reserved_length_) {
-      new_reserved_length = 2 * new_actual_length;
-    } else {
-      new_reserved_length = reserved_length_;
-    }
-
-    void *new_buffer = std::realloc(values_,
-                                    type_length_ * new_reserved_length);
-
-    if (new_buffer == nullptr) {
-      return false;
-    }
-    std::swap(values_, new_buffer);
-    std::memcpy(static_cast<char*>(values_)
-                    + (type_length_ * actual_length_), // First empty position of this' buffer
-                casted_column_vector->values_,         // First postion of other's buffer
-                type_length_ * casted_column_vector->actual_length_);  // Number of bytes
-
-    reserved_length_ = new_reserved_length;
-    actual_length_ = new_actual_length;
-
-    if (null_bitmap_) {
-      return null_bitmap_->append((casted_column_vector->null_bitmap_).get());
-    }
-
-    return true;
-  }
-
  private:
   const std::size_t type_length_;
   void *values_;
@@ -636,33 +591,6 @@ class IndirectColumnVector : public ColumnVector {
     values_[position] = std::move(value);
   }
 
-  bool append(const ColumnVector *column_vector) override {
-    if (column_vector->isNative()) {
-      return false;
-    }
-    const IndirectColumnVector *casted_column_vector =
-        static_cast<const IndirectColumnVector*>(column_vector);
-    // Both ColumnVectors has to have same type to be appended.
-    if (!type_.equals(casted_column_vector->type_)
-        || type_is_nullable_ != casted_column_vector->type_is_nullable_) {
-      return false;
-    }
-
-    std::size_t new_actual_length = values_.size() + casted_column_vector->values_.size();
-    std::size_t new_reserved_length
-        = (new_actual_length > reserved_length_)
-          ? (new_actual_length * 2)
-          : (reserved_length_);
-
-    values_.reserve(new_reserved_length);
-    values_.insert(values_.end(),
-                   casted_column_vector->values_.begin(),
-                   casted_column_vector->values_.end());
-    reserved_length_ = new_reserved_length;
-
-    return true;
-  }
-
  private:
   const bool type_is_nullable_;
   std::size_t reserved_length_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index ed2ee76..abf99ab 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -93,20 +93,6 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
           : static_cast<const IndirectColumnVector*>(column)->size();
   }
 
-  void appendColumns(std::vector<std::unique_ptr<ColumnVector>> *columns,
-                     const std::size_t length) {
-    if (columns_.empty()) {
-      for (auto &column : *columns) {
-        addColumn(column.release(), true);
-      }
-    } else {
-      for (std::size_t i = 0; i < columns_.size(); ++i) {
-        columns_[i]->append(columns->at(i).get());
-      }
-      column_length_ += length;
-    }
-  }
-
   void increaseColumnLength(const std::size_t delta_length) {
     column_length_ += delta_length;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/11b01099/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index c47d150..6af0961 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -829,33 +829,6 @@ class BitVector {
     return num_bits_;
   }
 
-  bool append(BitVector *other) {
-    std::size_t total_data_array_size = data_array_size_ + other->data_array_size_;
-
-    std::size_t *new_data_array_ = reinterpret_cast<std::size_t*>(std::realloc(data_array_, total_data_array_size));
-    if (new_data_array_ == nullptr) {
-      return false;
-    }
-    // Swap pointers.
-    std::swap(data_array_, new_data_array_);
-
-    // Copy other BitVector's data.
-    std::memcpy(reinterpret_cast<std::uint8_t*>(data_array_) + data_array_size_,
-                other->data_array_,
-                other->data_array_size_);
-
-    // Not complete (it is working only under vector<std::size_t> impl.)
-    const std::size_t excess_bits_at_original = num_bits_ % kSizeTBits;
-    const std::size_t old_num_bits = num_bits_;
-    // Update private fields to make shiftTailForward work correctly.
-    data_array_size_ = total_data_array_size;
-    num_bits_ = num_bits_ + other->num_bits_;
-
-    shiftTailForward(old_num_bits, excess_bits_at_original);
-
-    return true;
-  }
-
  private:
   // This works as long as the bit-width of size_t is power of 2:
   static const std::size_t kLowerOrderMask = (sizeof(std::size_t) << 3) - 1;