You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/05/04 04:13:29 UTC

[1/2] incubator-quickstep git commit: Dispatch BuildHashJoin and Aggregation based on block locality. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/fix-copy-group 21559eb61 -> 4677e7d48 (forced update)


Dispatch BuildHashJoin and Aggregation based on block locality.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ece7e424
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ece7e424
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ece7e424

Branch: refs/heads/fix-copy-group
Commit: ece7e424b5c43e91dbe6e52b1ca95312696e57d2
Parents: 0572f40
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Apr 11 00:26:40 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed May 3 12:50:03 2017 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  2 ++
 query_execution/ForemanDistributed.cpp        |  9 +++++++--
 query_execution/PolicyEnforcerDistributed.cpp |  8 ++++++++
 query_execution/PolicyEnforcerDistributed.hpp | 15 ++++++++++++--
 query_execution/QueryManagerDistributed.hpp   | 23 ++++++++++++++++++----
 5 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index eeed791..c74fa36 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -267,6 +267,7 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
                         quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_QueryContext
                         quickstep_queryexecution_QueryContext_proto
                         quickstep_queryexecution_QueryExecutionMessages_proto
@@ -277,6 +278,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkOrderProtosContainer
                         quickstep_relationaloperators_RelationalOperator
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_StorageBlockInfo
                         quickstep_utility_DAG
                         quickstep_utility_Macros
                         tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 81b5ec1..e5e0eee 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,10 +243,12 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
                                                        size_t *shiftboss_index_for_aggregation) {
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::aggregation_state_id aggr_state_index;
+  block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+      block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
       break;
     case S::FINALIZE_AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
@@ -259,7 +261,8 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
-      proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
+      proto.query_id(), aggr_state_index, block_locator_, block, next_shiftboss_index_to_schedule,
+      shiftboss_index_for_aggregation);
 
   return true;
 }
@@ -270,11 +273,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::join_hash_table_id join_hash_table_index;
   partition_id part_id;
+  block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::BUILD_HASH:
       join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
       part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
+      block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
       break;
     case S::HASH_JOIN:
       join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
@@ -289,7 +294,7 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
-      proto.query_id(), join_hash_table_index, part_id, next_shiftboss_index_to_schedule,
+      proto.query_id(), join_hash_table_index, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
       shiftboss_index_for_hash_join);
 
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 46a0972..36becf2 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,11 +192,15 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
     const std::size_t query_id,
     const QueryContext::aggregation_state_id aggr_state_index,
+    const BlockLocator &block_locator,
+    const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
     std::size_t *shiftboss_index) {
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 block_locator,
+                                                 block,
                                                  next_shiftboss_index_to_schedule,
                                                  shiftboss_index);
 }
@@ -205,12 +209,16 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
     const std::size_t query_id,
     const QueryContext::join_hash_table_id join_hash_table_index,
     const partition_id part_id,
+    const BlockLocator &block_locator,
+    const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
     std::size_t *shiftboss_index) {
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
                                               part_id,
+                                              block_locator,
+                                              block,
                                               next_shiftboss_index_to_schedule,
                                               shiftboss_index);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index fb46b39..cd3a434 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -47,6 +47,7 @@ class TaggedMessage;
 
 namespace quickstep {
 
+class BlockLocator;
 class CatalogDatabaseLite;
 class CatalogRelation;
 class QueryProcessor;
@@ -125,31 +126,39 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
-   * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set based on block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first Aggregation.
    *
    * @param query_id The query id.
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const BlockLocator &block_locator,
+      const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,
       std::size_t *shiftboss_index);
 
   /**
    * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
    * If it is the first BuildHash on <join_hash_table_index, part_id>,
-   * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set to block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first BuildHash.
    *
    * @param query_id The query id.
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
@@ -157,6 +166,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
       const std::size_t query_id,
       const QueryContext::join_hash_table_id join_hash_table_index,
       const partition_id part_id,
+      const BlockLocator &block_locator,
+      const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,
       std::size_t *shiftboss_index);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 3ebc434..6a47ce8 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -25,11 +25,13 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/address.h"
@@ -100,17 +102,23 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   /**
    * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
-   * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   * the Shiftboss index is not found, set using the block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const BlockLocator &block_locator,
+                                       const block_id block,
                                        const std::size_t next_shiftboss_index_to_schedule,
                                        std::size_t *shiftboss_index) {
     DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
-    if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex) {
+    if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex &&
+        !block_locator.getBlockLocalityInfo(block, &shiftboss_indexes_for_aggrs_[aggr_state_index])) {
       shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule;
     }
 
@@ -119,21 +127,28 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   /**
    * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
-   * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   * Shiftboss index is not found, set using the block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
                                     const partition_id part_id,
+                                    const BlockLocator &block_locator,
+                                    const block_id block,
                                     const std::size_t next_shiftboss_index_to_schedule,
                                     std::size_t *shiftboss_index) {
     DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size());
     DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size());
 
-    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex) {
+    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex &&
+        !block_locator.getBlockLocalityInfo(block,
+                                            &shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id])) {
       shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule;
     }
 


[2/2] incubator-quickstep git commit: Fix a problem in CopyGroupList + minor style fixes.

Posted by ji...@apache.org.
Fix a problem in CopyGroupList + minor style fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4677e7d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4677e7d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4677e7d4

Branch: refs/heads/fix-copy-group
Commit: 4677e7d48fe9d17451feb748c698fd1b12b7eb4e
Parents: ece7e42
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue May 2 23:55:52 2017 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed May 3 23:13:17 2017 -0500

----------------------------------------------------------------------
 storage/SplitRowStoreTupleStorageSubBlock.cpp   | 169 ++++++++++---------
 storage/SplitRowStoreTupleStorageSubBlock.hpp   | 127 +++++++-------
 ...litRowStoreTupleStorageSubBlock_unittest.cpp |  80 ++++++---
 3 files changed, 206 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index ad583eb..0e5cfe6 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -244,14 +244,12 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
 
   CopyGroupList copy_groups;
   getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
-  auto impl = accessor->getImplementationType();
-  const bool is_rowstore_source = impl == ValueAccessor::Implementation::kSplitRowStore;
-  if (is_rowstore_source) {
-    copy_groups.merge_contiguous();
+  if (accessor->getImplementationType() == ValueAccessor::Implementation::kSplitRowStore) {
+    copy_groups.mergeContiguous();
   }
 
-  const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
-  const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
+  const bool copy_nulls = copy_groups.nullable_attrs.size() > 0;
+  const bool copy_varlen = copy_groups.varlen_attrs.size() > 0;
 
   if (fill_to_capacity) {
     if (relation_.hasNullableAttributes()) {
@@ -311,96 +309,101 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
   std::size_t num_tuples_inserted = 0;
 
   // We only append to the end of the block to cut down on complexity.
-  char *tuple_slot = static_cast<char *>(tuple_storage_) +  header_->num_tuples * tuple_slot_bytes_;
+  char *tuple_slot = static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_;
 
   std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
   std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
 
-  BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
-  char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
-
-
-
   std::size_t storage_available = tuple_storage_bytes_ -
                                     (header_->variable_length_bytes_allocated +
                                      header_->num_tuples * tuple_slot_bytes_);
 
+  const std::vector<ContiguousAttrs> &contiguous_attrs = copy_groups.contiguous_attrs;
+  const std::vector<VarLenAttr> &varlen_attrs = copy_groups.varlen_attrs;
+  const std::vector<NullableAttr> &nullable_attrs = copy_groups.nullable_attrs;
+
   // The number of bytes that must be reserved per tuple inserted due to gaps.
   std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
   if (fill_to_capacity) {
-    for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
+    for (std::size_t vattr_idx = 0; vattr_idx < varlen_attrs.size(); vattr_idx++) {
       varlen_reserve -= relation_.getAttributeById(
-        copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
+          varlen_attrs[vattr_idx].dst_attr_id)->getType().maximumByteLength();
     }
     DCHECK_GE(relation_.getMaximumVariableByteLength(), varlen_reserve);
   }
 
   InvokeOnAnyValueAccessor(
-    accessor,
-    [&](auto *accessor) -> void {  // NOLINT(build/c++11
-      do {
-        const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
-        const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
-        const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
-
-        const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
-
-        while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
-          for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
-            const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
-            fixed_len_cursor += cattr.bytes_to_advance_;
-            const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
-            std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
-          }
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11
+    BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
+    const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
+
+    const std::size_t num_c_attr = contiguous_attrs.size();
+    const std::size_t num_n_attr = nullable_attrs.size();
+    const std::size_t num_v_attr = varlen_attrs.size();
+
+    do {
+      while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
+        char *attr_cursor = tuple_slot + nullmap_size;
+        for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
+          const ContiguousAttrs &cattr = contiguous_attrs[cattr_idx];
+          attr_cursor += cattr.bytes_to_advance;
+          const void *attr_value =
+              accessor->template getUntypedValue<false>(cattr.src_attr_id);
+          std::memcpy(attr_cursor, attr_value, cattr.bytes_to_copy);
+        }
 
-          if (copy_nulls) {
-            tuple_null_bitmap.setMemory(tuple_slot);
-            for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
-              const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
-              const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
-              if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
-              }
+        if (copy_nulls) {
+          tuple_null_bitmap.setMemory(tuple_slot);
+          for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
+            const NullableAttr &nattr = nullable_attrs[nattr_idx];
+            const void *attr_value =
+                accessor->template getUntypedValue<true>(nattr.src_attr_id);
+            if (attr_value == nullptr) {
+              tuple_null_bitmap.setBit(nattr.nullable_attr_idx, true);
             }
           }
+        }
 
-          if (copy_varlen) {
-            for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
-              const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
-              fixed_len_cursor += vattr.bytes_to_advance_;
-              // Typed value is necessary as we need the length.
-              const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
-              if (attr_value.isNull()) {
-                continue;
-              }
-              const std::size_t attr_size = attr_value.getDataSize();
-              varlen_heap_offset -= attr_size;
-              std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
-                          attr_size);
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
+        if (copy_varlen) {
+          for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
+            const VarLenAttr &vattr = varlen_attrs[vattr_idx];
+            attr_cursor += vattr.bytes_to_advance;
+            // Typed value is necessary as we need the length.
+            const TypedValue &attr_value =
+                accessor->template getTypedValue(vattr.src_attr_id);
+            if (attr_value.isNull()) {
+              continue;
             }
+            const std::size_t attr_size = attr_value.getDataSize();
+            varlen_heap_offset -= attr_size;
+            std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset,
+                        attr_value.getDataPtr(),
+                        attr_size);
+            reinterpret_cast<std::uint32_t *>(attr_cursor)[0] = varlen_heap_offset;
+            reinterpret_cast<std::uint32_t *>(attr_cursor)[1] = static_cast<std::uint32_t>(attr_size);
           }
-          tuple_slot += tuple_slot_bytes_;
-          fixed_len_cursor = tuple_slot + nullmap_size;
-          num_tuples_inserted++;
         }
-        if (fill_to_capacity) {
-          std::int64_t remaining_storage_after_inserts = storage_available -
-                                                         (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
-                                                          (varlen_heap_offset_orig - varlen_heap_offset));
-          DCHECK_LE(0, remaining_storage_after_inserts);
-          std::size_t additional_tuples_insert =
-            remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
-          // We want to avoid a situation where we have several short insert iterations
-          // near the end of an insertion cycle.
-          if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
-            max_num_tuples_to_insert += additional_tuples_insert;
-          }
+        tuple_slot += tuple_slot_bytes_;
+        num_tuples_inserted++;
+      }
+      if (fill_to_capacity) {
+        const std::int64_t remaining_storage_after_inserts =
+            storage_available -
+                (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
+                    (varlen_heap_offset_orig - varlen_heap_offset));
+        DCHECK_LE(0, remaining_storage_after_inserts);
+        const std::size_t additional_tuples_insert =
+          remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
+        // We want to avoid a situation where we have several short insert iterations
+        // near the end of an insertion cycle.
+        if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
+          max_num_tuples_to_insert += additional_tuples_insert;
         }
-      } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
-               num_tuples_inserted < max_num_tuples_to_insert);
-    });
+      }
+    } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
+             num_tuples_inserted < max_num_tuples_to_insert);
+  });
 
   if (copy_varlen) {
     header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
@@ -879,16 +882,14 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
 // the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
 // copy varlen into the heap is the main difference from copying fixed length.
 void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
-  const std::vector<attribute_id> &attribute_map,
-  CopyGroupList *copy_groups) {
+    const std::vector<attribute_id> &attribute_map,
+    CopyGroupList *copy_groups) {
   DCHECK_EQ(attribute_map.size(), relation_.size());
 
-  attribute_id num_attrs = attribute_map.size();
-
   std::size_t contig_adv = 0;
   std::size_t varlen_adv = 0;
-  for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
-    attribute_id src_attr = attribute_map[attr_id];
+  for (std::size_t attr_id = 0; attr_id < attribute_map.size(); ++attr_id) {
+    const attribute_id src_attr = attribute_map[attr_id];
 
     // Attribute doesn't exist in src.
     if (src_attr == kInvalidCatalogId) {
@@ -906,23 +907,23 @@ void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
     // Attribute exists in src.
     if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
       // fixed len
-      copy_groups->contiguous_attrs_.push_back(
-        ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
+      copy_groups->contiguous_attrs.emplace_back(
+          src_attr, fixed_len_attr_sizes_[attr_id], contig_adv);
       contig_adv = fixed_len_attr_sizes_[attr_id];
     } else {
       // var len
-      copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
+      copy_groups->varlen_attrs.emplace_back(src_attr, attr_id, varlen_adv);
       varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
     }
 
     if (relation_.getNullableAttributeIndex(attr_id) != -1) {
-      copy_groups->nullable_attrs_.push_back(
-        NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
+      copy_groups->nullable_attrs.emplace_back(
+          src_attr, relation_.getNullableAttributeIndex(attr_id));
     }
   }
   // This will point us to the beginning of the varlen zone.
-  if (copy_groups->varlen_attrs_.size() > 0) {
-    copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
+  if (copy_groups->varlen_attrs.size() > 0) {
+    copy_groups->varlen_attrs[0].bytes_to_advance += contig_adv;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index 67d36fe..ee2dc27 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -79,7 +79,7 @@ namespace splitrow_internal {
 // we will create the following ContiguousAttrs copy groups
 //
 //  ----------------------------------------------------
-//  |src_id_      |bytes_to_advance_| bytes_to_copy_   |
+//  |src_id       |bytes_to_advance |bytes_to_copy     |
 //  |-------------|-----------------|------------------|
 //  |            0|                4|                 4|
 //  |            5|                4|                12|
@@ -87,7 +87,7 @@ namespace splitrow_internal {
 //  |            4|                4|                 4|
 //  |            9|                4|                 8|
 //  ----------------------------------------------------
-// and two NullableAttrs with src_attr_id_ set to 4 and 7.
+// and two NullableAttrs with src_attr_id set to 4 and 7.
 //
 // In this example, we do 6 memcpy calls and 6 address calculations
 // as well as 2 bitvector lookups for each tuple. A naive copy algorithm
@@ -105,86 +105,91 @@ namespace splitrow_internal {
 // we do all the variable length copies after the fixed length copies.
 //
 struct CopyGroup {
-  attribute_id src_attr_id_;  // The attr_id of starting input attribute for run.
-
   explicit CopyGroup(const attribute_id source_attr_id)
-    : src_attr_id_(source_attr_id) {}
+      : src_attr_id(source_attr_id) {}
+
+  // The id of starting input attribute for run.
+  const attribute_id src_attr_id;
 };
 
 struct ContiguousAttrs : public CopyGroup {
-  std::size_t bytes_to_advance_;  // Number of bytes to advance destination ptr
-                                  // to get to the location where we copy THIS attribute.
-  std::size_t bytes_to_copy_;     // Number of bytes to copy from source.
-
-  ContiguousAttrs(
-    const attribute_id source_attr_id,
-    const std::size_t bytes_to_copy,
-    const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      bytes_to_copy_(bytes_to_copy) { }
+  ContiguousAttrs(const attribute_id source_attr_id,
+                  const std::size_t bytes_to_copy_in,
+                  const std::size_t bytes_to_advance_in)
+      : CopyGroup(source_attr_id),
+        bytes_to_advance(bytes_to_advance_in),
+        bytes_to_copy(bytes_to_copy_in) {}
+
+  // Number of bytes to advance destination ptr to get to the location where we
+  // copy THIS attribute.
+  std::size_t bytes_to_advance;
+
+  // Number of bytes to copy from source.
+  std::size_t bytes_to_copy;
 };
 
 struct VarLenAttr : public CopyGroup {
-  std::size_t bytes_to_advance_;
-  attribute_id dst_attr_id_;
   VarLenAttr(const attribute_id source_attr_id,
-             const attribute_id dst_attr_id,
-             const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      dst_attr_id_(dst_attr_id) {}
+             const attribute_id destination_attr_id,
+             const std::size_t bytes_to_advance_in)
+      : CopyGroup(source_attr_id),
+        dst_attr_id(destination_attr_id),
+        bytes_to_advance(bytes_to_advance_in) {}
+
+  const attribute_id dst_attr_id;
+  std::size_t bytes_to_advance;
 };
 
 struct NullableAttr : public CopyGroup {
-  int nullable_attr_idx_;  // index into null bitmap
+  NullableAttr(const attribute_id source_attr_id,
+               const std::size_t nullable_attr_idx_in)
+      : CopyGroup(source_attr_id),
+        nullable_attr_idx(nullable_attr_idx_in) {}
 
-  NullableAttr(attribute_id source_attr_id_,
-               int nullable_attr_idx)
-    : CopyGroup(source_attr_id_),
-      nullable_attr_idx_(nullable_attr_idx) {}
+  // Index into null bitmap.
+  const std::size_t nullable_attr_idx;
 };
 
 struct CopyGroupList {
-  CopyGroupList()
-    : contiguous_attrs_(),
-      nullable_attrs_(),
-      varlen_attrs_() {}
+  CopyGroupList() {}
 
   /**
    * @brief Attributes which are exactly sequential are merged to a single copy.
    */
-  void merge_contiguous() {
-    if (contiguous_attrs_.size() < 2) {
+  void mergeContiguous() {
+    if (contiguous_attrs.size() < 2) {
       return;
     }
 
     int add_to_advance = 0;
-    for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
-      ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
-      ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
-      if (add_to_advance > 0) {
-        current_attr->bytes_to_advance_ += add_to_advance;
+
+    std::vector<ContiguousAttrs> merged_attrs;
+    merged_attrs.emplace_back(contiguous_attrs.front());
+    for (std::size_t idx = 1; idx < contiguous_attrs.size(); ++idx) {
+      const ContiguousAttrs &current_attr = contiguous_attrs[idx];
+      const ContiguousAttrs &previous_attr = contiguous_attrs[idx - 1];
+
+      if (previous_attr.src_attr_id + 1 == current_attr.src_attr_id &&
+          previous_attr.bytes_to_copy == current_attr.bytes_to_advance) {
+        merged_attrs.back().bytes_to_copy += current_attr.bytes_to_copy;
+        add_to_advance += current_attr.bytes_to_advance;
+      } else {
+        merged_attrs.emplace_back(current_attr.src_attr_id,
+                                  current_attr.bytes_to_copy,
+                                  current_attr.bytes_to_advance + add_to_advance);
         add_to_advance = 0;
       }
-      // The merge step:
-      if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
-            previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
-        previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
-        add_to_advance += current_attr->bytes_to_advance_;
-        contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
-        idx--;
-      }
     }
+    contiguous_attrs = std::move(merged_attrs);
 
-    if (varlen_attrs_.size() > 0) {
-      varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
+    if (varlen_attrs.size() > 0) {
+      varlen_attrs[0].bytes_to_advance += add_to_advance;
     }
   }
 
-  std::vector<ContiguousAttrs> contiguous_attrs_;
-  std::vector<NullableAttr> nullable_attrs_;
-  std::vector<VarLenAttr> varlen_attrs_;
+  std::vector<ContiguousAttrs> contiguous_attrs;
+  std::vector<NullableAttr> nullable_attrs;
+  std::vector<VarLenAttr> varlen_attrs;
 };
 
 }  // namespace splitrow_internal
@@ -368,19 +373,19 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
 
   template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
   tuple_id bulkInsertPartialTuplesImpl(
-    const splitrow_internal::CopyGroupList &copy_groups,
-    ValueAccessor *accessor,
-    std::size_t max_num_tuples_to_insert);
+      const splitrow_internal::CopyGroupList &copy_groups,
+      ValueAccessor *accessor,
+      std::size_t max_num_tuples_to_insert);
 
   tuple_id bulkInsertDispatcher(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    tuple_id max_num_tuples_to_insert,
-    bool finalize);
+      const std::vector<attribute_id> &attribute_map,
+      ValueAccessor *accessor,
+      tuple_id max_num_tuples_to_insert,
+      bool finalize);
 
   void getCopyGroupsForAttributeMap(
-    const std::vector<attribute_id> &attribute_map,
-    splitrow_internal::CopyGroupList *copy_groups);
+      const std::vector<attribute_id> &attribute_map,
+      splitrow_internal::CopyGroupList *copy_groups);
 
   std::size_t getInsertLowerBound() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4677e7d4/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
index 9270d93..bc7e65e 100644
--- a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
+++ b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
@@ -761,40 +761,41 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest)
   CopyGroupList copy_groups;
   dst_store->getCopyGroupsForAttributeMap(attr_map, &copy_groups);
 
-  std::vector<ContiguousAttrs>& contiguous_attrs = copy_groups.contiguous_attrs_;
-  std::vector<VarLenAttr>& varlen_attrs = copy_groups.varlen_attrs_;
+  const std::vector<ContiguousAttrs> &contiguous_attrs = copy_groups.contiguous_attrs;
+  const std::vector<VarLenAttr> &varlen_attrs = copy_groups.varlen_attrs;
 
   const std::size_t size_of_string = dst_store->getRelation().getAttributeById(3)->getType().maximumByteLength();
 
   // Fixed length attributes.
-  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy_);
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy);
 
-  EXPECT_EQ(1, contiguous_attrs[1].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance_);
-  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy_);
+  EXPECT_EQ(1, contiguous_attrs[1].src_attr_id);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy);
 
   if (testVariableLength()) {
     ASSERT_EQ(2, contiguous_attrs.size());
     ASSERT_EQ(2, varlen_attrs.size());
 
-    EXPECT_EQ(2, varlen_attrs[0].src_attr_id_);
-    EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[0].bytes_to_advance_);
+    EXPECT_EQ(2, varlen_attrs[0].src_attr_id);
+    EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
+              varlen_attrs[0].bytes_to_advance);
 
-    EXPECT_EQ(1, varlen_attrs[1].src_attr_id_);
-    EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance_);
+    EXPECT_EQ(1, varlen_attrs[1].src_attr_id);
+    EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance);
 
   } else {
-    ASSERT_EQ(4, copy_groups.contiguous_attrs_.size());
-    ASSERT_EQ(0, copy_groups.varlen_attrs_.size());
+    ASSERT_EQ(4, copy_groups.contiguous_attrs.size());
+    ASSERT_EQ(0, copy_groups.varlen_attrs.size());
 
-    EXPECT_EQ(2, contiguous_attrs[2].src_attr_id_);
-    EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance_);
-    EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy_);
+    EXPECT_EQ(2, contiguous_attrs[2].src_attr_id);
+    EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance);
+    EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy);
   }
 
-  int null_count =  copy_groups.nullable_attrs_.size();
+  int null_count =  copy_groups.nullable_attrs.size();
   if (testNullable()) {
     // The relation contains 6 nullable attributes, but only 3 are inserted.
     EXPECT_EQ(4, null_count);
@@ -802,20 +803,49 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest)
     EXPECT_EQ(0, null_count);
   }
 
-  // test that merging works.
-  copy_groups.merge_contiguous();
-  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+  // Test that merging works.
+  copy_groups.mergeContiguous();
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance);
 
   if (testVariableLength()) {
     EXPECT_EQ(1, contiguous_attrs.size());
     EXPECT_EQ(sizeof(int) * 2 + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
-              varlen_attrs[0].bytes_to_advance_);
+              varlen_attrs[0].bytes_to_advance);
   } else {
     EXPECT_EQ(3, contiguous_attrs.size());
-    EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy_);
-    EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance_);
+    EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy);
+    EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance);
   }
+
+  // Extra test 1 for merging: three consecutive integer attributes merged into
+  // one copy group.
+  CopyGroupList cg1;
+  dst_store->getCopyGroupsForAttributeMap(
+      { 0, 1, 2, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId },
+      &cg1);
+  cg1.mergeContiguous();
+
+  EXPECT_EQ(1u, cg1.contiguous_attrs.size());
+  EXPECT_EQ(0, cg1.contiguous_attrs[0].src_attr_id);
+  EXPECT_EQ(0u, cg1.contiguous_attrs[0].bytes_to_advance);
+  EXPECT_EQ(12u, cg1.contiguous_attrs[0].bytes_to_copy);
+
+  // Extra test 2 for merging: two consecutive integer attributes a0, a1 followed
+  // by an extra a1 attribute, merged into two copy groups.
+  CopyGroupList cg2;
+  dst_store->getCopyGroupsForAttributeMap(
+      { 0, 1, 1, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId },
+      &cg2);
+  cg2.mergeContiguous();
+
+  EXPECT_EQ(2u, cg2.contiguous_attrs.size());
+  EXPECT_EQ(0, cg2.contiguous_attrs[0].src_attr_id);
+  EXPECT_EQ(0u, cg2.contiguous_attrs[0].bytes_to_advance);
+  EXPECT_EQ(8u, cg2.contiguous_attrs[0].bytes_to_copy);
+  EXPECT_EQ(1, cg2.contiguous_attrs[1].src_attr_id);
+  EXPECT_EQ(8u, cg2.contiguous_attrs[1].bytes_to_advance);
+  EXPECT_EQ(4u, cg2.contiguous_attrs[1].bytes_to_copy);
 }
 
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTest) {