You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/05/04 04:13:29 UTC
[1/2] incubator-quickstep git commit: Dispatch BuildHashJoin and
Aggregation based on block locality. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/fix-copy-group 21559eb61 -> 4677e7d48 (forced update)
Dispatch BuildHashJoin and Aggregation based on block locality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ece7e424
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ece7e424
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ece7e424
Branch: refs/heads/fix-copy-group
Commit: ece7e424b5c43e91dbe6e52b1ca95312696e57d2
Parents: 0572f40
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Apr 11 00:26:40 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed May 3 12:50:03 2017 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 2 ++
query_execution/ForemanDistributed.cpp | 9 +++++++--
query_execution/PolicyEnforcerDistributed.cpp | 8 ++++++++
query_execution/PolicyEnforcerDistributed.hpp | 15 ++++++++++++--
query_execution/QueryManagerDistributed.hpp | 23 ++++++++++++++++++----
5 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index eeed791..c74fa36 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -267,6 +267,7 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_BlockLocator
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
@@ -277,6 +278,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_relationaloperators_RelationalOperator
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlockInfo
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 81b5ec1..e5e0eee 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,10 +243,12 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
size_t *shiftboss_index_for_aggregation) {
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::aggregation_state_id aggr_state_index;
+ block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+ block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
break;
case S::FINALIZE_AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
@@ -259,7 +261,8 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
- proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
+ proto.query_id(), aggr_state_index, block_locator_, block, next_shiftboss_index_to_schedule,
+ shiftboss_index_for_aggregation);
return true;
}
@@ -270,11 +273,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::join_hash_table_id join_hash_table_index;
partition_id part_id;
+ block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::BUILD_HASH:
join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
+ block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
break;
case S::HASH_JOIN:
join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
@@ -289,7 +294,7 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
- proto.query_id(), join_hash_table_index, part_id, next_shiftboss_index_to_schedule,
+ proto.query_id(), join_hash_table_index, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
shiftboss_index_for_hash_join);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 46a0972..36becf2 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,11 +192,15 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+ block_locator,
+ block,
next_shiftboss_index_to_schedule,
shiftboss_index);
}
@@ -205,12 +209,16 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
const std::size_t query_id,
const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
part_id,
+ block_locator,
+ block,
next_shiftboss_index_to_schedule,
shiftboss_index);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index fb46b39..cd3a434 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -47,6 +47,7 @@ class TaggedMessage;
namespace quickstep {
+class BlockLocator;
class CatalogDatabaseLite;
class CatalogRelation;
class QueryProcessor;
@@ -125,31 +126,39 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
/**
* @brief Get or set the index of Shiftboss for an Aggregation related
* WorkOrder. If it is the first Aggregation on <aggr_state_index>,
- * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+ * <shiftboss_index> will be set based on block locality if found,
+ * otherwise <next_shiftboss_index_to_schedule>.
* Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
* has executed the first Aggregation.
*
* @param query_id The query id.
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param block_locator The BlockLocator to use.
+ * @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index);
/**
* @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
* If it is the first BuildHash on <join_hash_table_index, part_id>,
- * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+ * <shiftboss_index> will be set to block locality if found,
+ * otherwise <next_shiftboss_index_to_schedule>.
* Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
* has executed the first BuildHash.
*
* @param query_id The query id.
* @param join_hash_table_index The Hash Table for the Join.
* @param part_id The partition ID.
+ * @param block_locator The BlockLocator to use.
+ * @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
@@ -157,6 +166,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const std::size_t query_id,
const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 3ebc434..6a47ce8 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -25,11 +25,13 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "tmb/address.h"
@@ -100,17 +102,23 @@ class QueryManagerDistributed final : public QueryManagerBase {
/**
* @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
- * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+ * the Shiftboss index is not found, set using the block locality if found,
+ * otherwise <next_shiftboss_index_to_schedule>.
*
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param block_locator The BlockLocator to use.
+ * @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
- if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex) {
+ if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex &&
+ !block_locator.getBlockLocalityInfo(block, &shiftboss_indexes_for_aggrs_[aggr_state_index])) {
shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule;
}
@@ -119,21 +127,28 @@ class QueryManagerDistributed final : public QueryManagerBase {
/**
* @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
- * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+ * Shiftboss index is not found, set using the block locality if found,
+ * otherwise <next_shiftboss_index_to_schedule>.
*
* @param join_hash_table_index The Hash Table for the Join.
* @param part_id The partition ID.
+ * @param block_locator The BlockLocator to use.
+ * @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size());
DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size());
- if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex) {
+ if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex &&
+ !block_locator.getBlockLocalityInfo(block,
+ &shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id])) {
shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule;
}
[2/2] incubator-quickstep git commit: Fix a problem in CopyGroupList
+ minor style fixes.
Posted by ji...@apache.org.
Fix a problem in CopyGroupList + minor style fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4677e7d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4677e7d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4677e7d4
Branch: refs/heads/fix-copy-group
Commit: 4677e7d48fe9d17451feb748c698fd1b12b7eb4e
Parents: ece7e42
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue May 2 23:55:52 2017 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed May 3 23:13:17 2017 -0500
----------------------------------------------------------------------
storage/SplitRowStoreTupleStorageSubBlock.cpp | 169 ++++++++++---------
storage/SplitRowStoreTupleStorageSubBlock.hpp | 127 +++++++-------
...litRowStoreTupleStorageSubBlock_unittest.cpp | 80 ++++++---
3 files changed, 206 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index ad583eb..0e5cfe6 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -244,14 +244,12 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
CopyGroupList copy_groups;
getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
- auto impl = accessor->getImplementationType();
- const bool is_rowstore_source = impl == ValueAccessor::Implementation::kSplitRowStore;
- if (is_rowstore_source) {
- copy_groups.merge_contiguous();
+ if (accessor->getImplementationType() == ValueAccessor::Implementation::kSplitRowStore) {
+ copy_groups.mergeContiguous();
}
- const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
- const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
+ const bool copy_nulls = copy_groups.nullable_attrs.size() > 0;
+ const bool copy_varlen = copy_groups.varlen_attrs.size() > 0;
if (fill_to_capacity) {
if (relation_.hasNullableAttributes()) {
@@ -311,96 +309,101 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
std::size_t num_tuples_inserted = 0;
// We only append to the end of the block to cut down on complexity.
- char *tuple_slot = static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_;
+ char *tuple_slot = static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_;
std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
- BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
- char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
-
-
-
std::size_t storage_available = tuple_storage_bytes_ -
(header_->variable_length_bytes_allocated +
header_->num_tuples * tuple_slot_bytes_);
+ const std::vector<ContiguousAttrs> &contiguous_attrs = copy_groups.contiguous_attrs;
+ const std::vector<VarLenAttr> &varlen_attrs = copy_groups.varlen_attrs;
+ const std::vector<NullableAttr> &nullable_attrs = copy_groups.nullable_attrs;
+
// The number of bytes that must be reserved per tuple inserted due to gaps.
std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
if (fill_to_capacity) {
- for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
+ for (std::size_t vattr_idx = 0; vattr_idx < varlen_attrs.size(); vattr_idx++) {
varlen_reserve -= relation_.getAttributeById(
- copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
+ varlen_attrs[vattr_idx].dst_attr_id)->getType().maximumByteLength();
}
DCHECK_GE(relation_.getMaximumVariableByteLength(), varlen_reserve);
}
InvokeOnAnyValueAccessor(
- accessor,
- [&](auto *accessor) -> void { // NOLINT(build/c++11
- do {
- const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
- const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
- const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
-
- const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
-
- while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
- for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
- const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
- fixed_len_cursor += cattr.bytes_to_advance_;
- const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
- std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
- }
+ accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11
+ BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
+ const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
+
+ const std::size_t num_c_attr = contiguous_attrs.size();
+ const std::size_t num_n_attr = nullable_attrs.size();
+ const std::size_t num_v_attr = varlen_attrs.size();
+
+ do {
+ while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
+ char *attr_cursor = tuple_slot + nullmap_size;
+ for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
+ const ContiguousAttrs &cattr = contiguous_attrs[cattr_idx];
+ attr_cursor += cattr.bytes_to_advance;
+ const void *attr_value =
+ accessor->template getUntypedValue<false>(cattr.src_attr_id);
+ std::memcpy(attr_cursor, attr_value, cattr.bytes_to_copy);
+ }
- if (copy_nulls) {
- tuple_null_bitmap.setMemory(tuple_slot);
- for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
- const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
- const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
- if (attr_value == nullptr) {
- tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
- }
+ if (copy_nulls) {
+ tuple_null_bitmap.setMemory(tuple_slot);
+ for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
+ const NullableAttr &nattr = nullable_attrs[nattr_idx];
+ const void *attr_value =
+ accessor->template getUntypedValue<true>(nattr.src_attr_id);
+ if (attr_value == nullptr) {
+ tuple_null_bitmap.setBit(nattr.nullable_attr_idx, true);
}
}
+ }
- if (copy_varlen) {
- for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
- const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
- fixed_len_cursor += vattr.bytes_to_advance_;
- // Typed value is necessary as we need the length.
- const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
- if (attr_value.isNull()) {
- continue;
- }
- const std::size_t attr_size = attr_value.getDataSize();
- varlen_heap_offset -= attr_size;
- std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
- attr_size);
- reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
- reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
+ if (copy_varlen) {
+ for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
+ const VarLenAttr &vattr = varlen_attrs[vattr_idx];
+ attr_cursor += vattr.bytes_to_advance;
+ // Typed value is necessary as we need the length.
+ const TypedValue &attr_value =
+ accessor->template getTypedValue(vattr.src_attr_id);
+ if (attr_value.isNull()) {
+ continue;
}
+ const std::size_t attr_size = attr_value.getDataSize();
+ varlen_heap_offset -= attr_size;
+ std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset,
+ attr_value.getDataPtr(),
+ attr_size);
+ reinterpret_cast<std::uint32_t *>(attr_cursor)[0] = varlen_heap_offset;
+ reinterpret_cast<std::uint32_t *>(attr_cursor)[1] = static_cast<std::uint32_t>(attr_size);
}
- tuple_slot += tuple_slot_bytes_;
- fixed_len_cursor = tuple_slot + nullmap_size;
- num_tuples_inserted++;
}
- if (fill_to_capacity) {
- std::int64_t remaining_storage_after_inserts = storage_available -
- (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
- (varlen_heap_offset_orig - varlen_heap_offset));
- DCHECK_LE(0, remaining_storage_after_inserts);
- std::size_t additional_tuples_insert =
- remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
- // We want to avoid a situation where we have several short insert iterations
- // near the end of an insertion cycle.
- if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
- max_num_tuples_to_insert += additional_tuples_insert;
- }
+ tuple_slot += tuple_slot_bytes_;
+ num_tuples_inserted++;
+ }
+ if (fill_to_capacity) {
+ const std::int64_t remaining_storage_after_inserts =
+ storage_available -
+ (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
+ (varlen_heap_offset_orig - varlen_heap_offset));
+ DCHECK_LE(0, remaining_storage_after_inserts);
+ const std::size_t additional_tuples_insert =
+ remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
+ // We want to avoid a situation where we have several short insert iterations
+ // near the end of an insertion cycle.
+ if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
+ max_num_tuples_to_insert += additional_tuples_insert;
}
- } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
- num_tuples_inserted < max_num_tuples_to_insert);
- });
+ }
+ } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
+ num_tuples_inserted < max_num_tuples_to_insert);
+ });
if (copy_varlen) {
header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
@@ -879,16 +882,14 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
// the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
// copy varlen into the heap is the main difference from copying fixed length.
void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
- const std::vector<attribute_id> &attribute_map,
- CopyGroupList *copy_groups) {
+ const std::vector<attribute_id> &attribute_map,
+ CopyGroupList *copy_groups) {
DCHECK_EQ(attribute_map.size(), relation_.size());
- attribute_id num_attrs = attribute_map.size();
-
std::size_t contig_adv = 0;
std::size_t varlen_adv = 0;
- for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
- attribute_id src_attr = attribute_map[attr_id];
+ for (std::size_t attr_id = 0; attr_id < attribute_map.size(); ++attr_id) {
+ const attribute_id src_attr = attribute_map[attr_id];
// Attribute doesn't exist in src.
if (src_attr == kInvalidCatalogId) {
@@ -906,23 +907,23 @@ void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
// Attribute exists in src.
if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
// fixed len
- copy_groups->contiguous_attrs_.push_back(
- ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
+ copy_groups->contiguous_attrs.emplace_back(
+ src_attr, fixed_len_attr_sizes_[attr_id], contig_adv);
contig_adv = fixed_len_attr_sizes_[attr_id];
} else {
// var len
- copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
+ copy_groups->varlen_attrs.emplace_back(src_attr, attr_id, varlen_adv);
varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
}
if (relation_.getNullableAttributeIndex(attr_id) != -1) {
- copy_groups->nullable_attrs_.push_back(
- NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
+ copy_groups->nullable_attrs.emplace_back(
+ src_attr, relation_.getNullableAttributeIndex(attr_id));
}
}
// This will point us to the beginning of the varlen zone.
- if (copy_groups->varlen_attrs_.size() > 0) {
- copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
+ if (copy_groups->varlen_attrs.size() > 0) {
+ copy_groups->varlen_attrs[0].bytes_to_advance += contig_adv;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index 67d36fe..ee2dc27 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -79,7 +79,7 @@ namespace splitrow_internal {
// we will create the following ContiguousAttrs copy groups
//
// ----------------------------------------------------
-// |src_id_ |bytes_to_advance_| bytes_to_copy_ |
+// |src_id |bytes_to_advance |bytes_to_copy |
// |-------------|-----------------|------------------|
// | 0| 4| 4|
// | 5| 4| 12|
@@ -87,7 +87,7 @@ namespace splitrow_internal {
// | 4| 4| 4|
// | 9| 4| 8|
// ----------------------------------------------------
-// and two NullableAttrs with src_attr_id_ set to 4 and 7.
+// and two NullableAttrs with src_attr_id set to 4 and 7.
//
// In this example, we do 6 memcpy calls and 6 address calculations
// as well as 2 bitvector lookups for each tuple. A naive copy algorithm
@@ -105,86 +105,91 @@ namespace splitrow_internal {
// we do all the variable length copies after the fixed length copies.
//
struct CopyGroup {
- attribute_id src_attr_id_; // The attr_id of starting input attribute for run.
-
explicit CopyGroup(const attribute_id source_attr_id)
- : src_attr_id_(source_attr_id) {}
+ : src_attr_id(source_attr_id) {}
+
+ // The id of starting input attribute for run.
+ const attribute_id src_attr_id;
};
struct ContiguousAttrs : public CopyGroup {
- std::size_t bytes_to_advance_; // Number of bytes to advance destination ptr
- // to get to the location where we copy THIS attribute.
- std::size_t bytes_to_copy_; // Number of bytes to copy from source.
-
- ContiguousAttrs(
- const attribute_id source_attr_id,
- const std::size_t bytes_to_copy,
- const std::size_t bytes_to_advance)
- : CopyGroup(source_attr_id),
- bytes_to_advance_(bytes_to_advance),
- bytes_to_copy_(bytes_to_copy) { }
+ ContiguousAttrs(const attribute_id source_attr_id,
+ const std::size_t bytes_to_copy_in,
+ const std::size_t bytes_to_advance_in)
+ : CopyGroup(source_attr_id),
+ bytes_to_advance(bytes_to_advance_in),
+ bytes_to_copy(bytes_to_copy_in) {}
+
+ // Number of bytes to advance destination ptr to get to the location where we
+ // copy THIS attribute.
+ std::size_t bytes_to_advance;
+
+ // Number of bytes to copy from source.
+ std::size_t bytes_to_copy;
};
struct VarLenAttr : public CopyGroup {
- std::size_t bytes_to_advance_;
- attribute_id dst_attr_id_;
VarLenAttr(const attribute_id source_attr_id,
- const attribute_id dst_attr_id,
- const std::size_t bytes_to_advance)
- : CopyGroup(source_attr_id),
- bytes_to_advance_(bytes_to_advance),
- dst_attr_id_(dst_attr_id) {}
+ const attribute_id destination_attr_id,
+ const std::size_t bytes_to_advance_in)
+ : CopyGroup(source_attr_id),
+ dst_attr_id(destination_attr_id),
+ bytes_to_advance(bytes_to_advance_in) {}
+
+ const attribute_id dst_attr_id;
+ std::size_t bytes_to_advance;
};
struct NullableAttr : public CopyGroup {
- int nullable_attr_idx_; // index into null bitmap
+ NullableAttr(const attribute_id source_attr_id,
+ const std::size_t nullable_attr_idx_in)
+ : CopyGroup(source_attr_id),
+ nullable_attr_idx(nullable_attr_idx_in) {}
- NullableAttr(attribute_id source_attr_id_,
- int nullable_attr_idx)
- : CopyGroup(source_attr_id_),
- nullable_attr_idx_(nullable_attr_idx) {}
+ // Index into null bitmap.
+ const std::size_t nullable_attr_idx;
};
struct CopyGroupList {
- CopyGroupList()
- : contiguous_attrs_(),
- nullable_attrs_(),
- varlen_attrs_() {}
+ CopyGroupList() {}
/**
* @brief Attributes which are exactly sequential are merged to a single copy.
*/
- void merge_contiguous() {
- if (contiguous_attrs_.size() < 2) {
+ void mergeContiguous() {
+ if (contiguous_attrs.size() < 2) {
return;
}
int add_to_advance = 0;
- for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
- ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
- ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
- if (add_to_advance > 0) {
- current_attr->bytes_to_advance_ += add_to_advance;
+
+ std::vector<ContiguousAttrs> merged_attrs;
+ merged_attrs.emplace_back(contiguous_attrs.front());
+ for (std::size_t idx = 1; idx < contiguous_attrs.size(); ++idx) {
+ const ContiguousAttrs &current_attr = contiguous_attrs[idx];
+ const ContiguousAttrs &previous_attr = contiguous_attrs[idx - 1];
+
+ if (previous_attr.src_attr_id + 1 == current_attr.src_attr_id &&
+ previous_attr.bytes_to_copy == current_attr.bytes_to_advance) {
+ merged_attrs.back().bytes_to_copy += current_attr.bytes_to_copy;
+ add_to_advance += current_attr.bytes_to_advance;
+ } else {
+ merged_attrs.emplace_back(current_attr.src_attr_id,
+ current_attr.bytes_to_copy,
+ current_attr.bytes_to_advance + add_to_advance);
add_to_advance = 0;
}
- // The merge step:
- if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
- previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
- previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
- add_to_advance += current_attr->bytes_to_advance_;
- contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
- idx--;
- }
}
+ contiguous_attrs = std::move(merged_attrs);
- if (varlen_attrs_.size() > 0) {
- varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
+ if (varlen_attrs.size() > 0) {
+ varlen_attrs[0].bytes_to_advance += add_to_advance;
}
}
- std::vector<ContiguousAttrs> contiguous_attrs_;
- std::vector<NullableAttr> nullable_attrs_;
- std::vector<VarLenAttr> varlen_attrs_;
+ std::vector<ContiguousAttrs> contiguous_attrs;
+ std::vector<NullableAttr> nullable_attrs;
+ std::vector<VarLenAttr> varlen_attrs;
};
} // namespace splitrow_internal
@@ -368,19 +373,19 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
tuple_id bulkInsertPartialTuplesImpl(
- const splitrow_internal::CopyGroupList &copy_groups,
- ValueAccessor *accessor,
- std::size_t max_num_tuples_to_insert);
+ const splitrow_internal::CopyGroupList &copy_groups,
+ ValueAccessor *accessor,
+ std::size_t max_num_tuples_to_insert);
tuple_id bulkInsertDispatcher(
- const std::vector<attribute_id> &attribute_map,
- ValueAccessor *accessor,
- tuple_id max_num_tuples_to_insert,
- bool finalize);
+ const std::vector<attribute_id> &attribute_map,
+ ValueAccessor *accessor,
+ tuple_id max_num_tuples_to_insert,
+ bool finalize);
void getCopyGroupsForAttributeMap(
- const std::vector<attribute_id> &attribute_map,
- splitrow_internal::CopyGroupList *copy_groups);
+ const std::vector<attribute_id> &attribute_map,
+ splitrow_internal::CopyGroupList *copy_groups);
std::size_t getInsertLowerBound() const;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
index 9270d93..bc7e65e 100644
--- a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
+++ b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
@@ -761,40 +761,41 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest)
CopyGroupList copy_groups;
dst_store->getCopyGroupsForAttributeMap(attr_map, &copy_groups);
- std::vector<ContiguousAttrs>& contiguous_attrs = copy_groups.contiguous_attrs_;
- std::vector<VarLenAttr>& varlen_attrs = copy_groups.varlen_attrs_;
+ const std::vector<ContiguousAttrs> &contiguous_attrs = copy_groups.contiguous_attrs;
+ const std::vector<VarLenAttr> &varlen_attrs = copy_groups.varlen_attrs;
const std::size_t size_of_string = dst_store->getRelation().getAttributeById(3)->getType().maximumByteLength();
// Fixed length attributes.
- EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
- EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
- EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy_);
+ EXPECT_EQ(0, contiguous_attrs[0].src_attr_id);
+ EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance);
+ EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy);
- EXPECT_EQ(1, contiguous_attrs[1].src_attr_id_);
- EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance_);
- EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy_);
+ EXPECT_EQ(1, contiguous_attrs[1].src_attr_id);
+ EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance);
+ EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy);
if (testVariableLength()) {
ASSERT_EQ(2, contiguous_attrs.size());
ASSERT_EQ(2, varlen_attrs.size());
- EXPECT_EQ(2, varlen_attrs[0].src_attr_id_);
- EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[0].bytes_to_advance_);
+ EXPECT_EQ(2, varlen_attrs[0].src_attr_id);
+ EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
+ varlen_attrs[0].bytes_to_advance);
- EXPECT_EQ(1, varlen_attrs[1].src_attr_id_);
- EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance_);
+ EXPECT_EQ(1, varlen_attrs[1].src_attr_id);
+ EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance);
} else {
- ASSERT_EQ(4, copy_groups.contiguous_attrs_.size());
- ASSERT_EQ(0, copy_groups.varlen_attrs_.size());
+ ASSERT_EQ(4, copy_groups.contiguous_attrs.size());
+ ASSERT_EQ(0, copy_groups.varlen_attrs.size());
- EXPECT_EQ(2, contiguous_attrs[2].src_attr_id_);
- EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance_);
- EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy_);
+ EXPECT_EQ(2, contiguous_attrs[2].src_attr_id);
+ EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance);
+ EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy);
}
- int null_count = copy_groups.nullable_attrs_.size();
+ int null_count = copy_groups.nullable_attrs.size();
if (testNullable()) {
// The relation contains 6 nullable attributes, but only 3 are inserted.
EXPECT_EQ(4, null_count);
@@ -802,20 +803,49 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest)
EXPECT_EQ(0, null_count);
}
- // test that merging works.
- copy_groups.merge_contiguous();
- EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
- EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+ // Test that merging works.
+ copy_groups.mergeContiguous();
+ EXPECT_EQ(0, contiguous_attrs[0].src_attr_id);
+ EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance);
if (testVariableLength()) {
EXPECT_EQ(1, contiguous_attrs.size());
EXPECT_EQ(sizeof(int) * 2 + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
- varlen_attrs[0].bytes_to_advance_);
+ varlen_attrs[0].bytes_to_advance);
} else {
EXPECT_EQ(3, contiguous_attrs.size());
- EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy_);
- EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance_);
+ EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy);
+ EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance);
}
+
+ // Extra test 1 for merging: three consecutive integer attributes merged into
+ // one copy group.
+ CopyGroupList cg1;
+ dst_store->getCopyGroupsForAttributeMap(
+ { 0, 1, 2, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId },
+ &cg1);
+ cg1.mergeContiguous();
+
+ EXPECT_EQ(1u, cg1.contiguous_attrs.size());
+ EXPECT_EQ(0, cg1.contiguous_attrs[0].src_attr_id);
+ EXPECT_EQ(0u, cg1.contiguous_attrs[0].bytes_to_advance);
+ EXPECT_EQ(12u, cg1.contiguous_attrs[0].bytes_to_copy);
+
+ // Extra test 2 for merging: two consecutive integer attributes a0, a1 followed
+ // by an extra a1 attribute, merged into two copy groups.
+ CopyGroupList cg2;
+ dst_store->getCopyGroupsForAttributeMap(
+ { 0, 1, 1, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId },
+ &cg2);
+ cg2.mergeContiguous();
+
+ EXPECT_EQ(2u, cg2.contiguous_attrs.size());
+ EXPECT_EQ(0, cg2.contiguous_attrs[0].src_attr_id);
+ EXPECT_EQ(0u, cg2.contiguous_attrs[0].bytes_to_advance);
+ EXPECT_EQ(8u, cg2.contiguous_attrs[0].bytes_to_copy);
+ EXPECT_EQ(1, cg2.contiguous_attrs[1].src_attr_id);
+ EXPECT_EQ(8u, cg2.contiguous_attrs[1].bytes_to_advance);
+ EXPECT_EQ(4u, cg2.contiguous_attrs[1].bytes_to_copy);
}
TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTest) {