Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/19 05:52:09 UTC

incubator-quickstep git commit: Refactored bulk insertion to the SplitRow store

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 55480d8d1 -> d3a092059


Refactored bulk insertion to the SplitRow store

The inner loop of the insert algorithm has been changed to make only
the function calls that are strictly necessary. In addition, copies
coming from another rowstore source are now merged, speeding up
insertion.
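
As a rough, standalone sketch of the merging idea (not the committed
code; the real logic lives in splitrow_internal::CopyGroupList::
merge_contiguous in the diff below), consecutive source attributes
whose data is also adjacent in the destination can be folded into one
copy descriptor, so the inner loop issues one memcpy per run instead
of one per attribute:

  #include <cstddef>
  #include <vector>

  // Hypothetical, simplified run descriptor; the committed code uses
  // splitrow_internal::ContiguousAttrs with bytes_to_advance_/bytes_to_copy_.
  struct Run {
    int first_src_attr;  // first source attribute id covered by the run
    std::size_t bytes;   // total bytes copied by the merged run
  };

  // Fold an attribute map of equally sized (attr_size-byte) fixed-length
  // attributes into runs of consecutive source attribute ids.
  std::vector<Run> MergeContiguous(const std::vector<int> &attribute_map,
                                   const std::size_t attr_size) {
    std::vector<Run> runs;
    for (const int src : attribute_map) {
      if (!runs.empty() &&
          src == runs.back().first_src_attr +
                     static_cast<int>(runs.back().bytes / attr_size)) {
        runs.back().bytes += attr_size;    // extend the previous run
      } else {
        runs.push_back({src, attr_size});  // start a new run
      }
    }
    return runs;
  }

  // e.g. {0, 1, 2, 5, 6} with 4-byte ints becomes two runs,
  // {0, 12 bytes} and {5, 8 bytes}: two memcpy calls instead of five.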

Also adds support for 'partial inserts', where only a subset of the
columns is inserted at a time. Partial inserts will be used in a later
commit.
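
As a hedged sketch of how a later commit might drive partial inserts
(the store, accessors, and attribute maps here are assumed for
illustration; only the bulkInsertPartialTuples and
bulkInsertPartialTuplesFinalize signatures come from this diff), two
sources could each fill a disjoint subset of columns of the same rows,
with the occupancy bitmap and header updated once at the end:

  // Destination relation has three attributes. kInvalidCatalogId marks
  // columns this source does not provide (a 'gap' in the attribute map).
  std::vector<attribute_id> left_map  = {0, 1, kInvalidCatalogId};
  std::vector<attribute_id> right_map = {kInvalidCatalogId, kInvalidCatalogId, 0};

  // Fill columns 0 and 1 from the left accessor, then column 2 of the
  // same rows from the right accessor, capped at the number of rows the
  // first call managed to insert.
  const tuple_id inserted =
      store->bulkInsertPartialTuples(left_map, left_accessor, kCatalogMaxID);
  store->bulkInsertPartialTuples(right_map, right_accessor, inserted);

  // Make the new rows visible: set the occupancy bits and bump the
  // header's tuple count only after every column has been written.
  store->bulkInsertPartialTuplesFinalize(inserted);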


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d3a09205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d3a09205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d3a09205

Branch: refs/heads/master
Commit: d3a0920595e4c40ba5e86907359b6e254b5b4958
Parents: 55480d8
Author: cramja <ma...@gmail.com>
Authored: Wed Oct 5 16:40:30 2016 -0500
Committer: jianqiao <ji...@node-2.jianqiao.quickstep-pg0.wisc.cloudlab.us>
Committed: Wed Oct 19 00:32:08 2016 -0500

----------------------------------------------------------------------
 storage/SplitRowStoreTupleStorageSubBlock.cpp   | 691 +++++++++----------
 storage/SplitRowStoreTupleStorageSubBlock.hpp   | 186 +++++
 ...litRowStoreTupleStorageSubBlock_unittest.cpp | 449 ++++++++++--
 utility/BitVector.hpp                           |  14 +
 4 files changed, 906 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index f955c99..068e975 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -41,54 +41,61 @@ namespace quickstep {
 
 QUICKSTEP_REGISTER_TUPLE_STORE(SplitRowStoreTupleStorageSubBlock, SPLIT_ROW_STORE);
 
+using splitrow_internal::CopyGroupList;
+using splitrow_internal::ContiguousAttrs;
+using splitrow_internal::NullableAttr;
+using splitrow_internal::VarLenAttr;
+
+const std::size_t SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize = sizeof(std::uint32_t) * 2;
+
 namespace {
 
-template <typename ValueAccessorT, bool nullable_attrs>
-inline std::size_t CalculateVariableSize(
+  template<typename ValueAccessorT, bool nullable_attrs>
+  inline std::size_t CalculateVariableSize(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor) {
-  std::size_t total_size = 0;
-  attribute_id accessor_attr_id = 0;
-  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-       attr_it != relation.end();
-       ++attr_it, ++accessor_attr_id) {
-    if (!attr_it->getType().isVariableLength()) {
-      continue;
-    }
+    std::size_t total_size = 0;
+    attribute_id accessor_attr_id = 0;
+    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+         attr_it != relation.end();
+         ++attr_it, ++accessor_attr_id) {
+      if (!attr_it->getType().isVariableLength()) {
+        continue;
+      }
 
-    TypedValue value(accessor.getTypedValue(accessor_attr_id));
-    if (nullable_attrs && value.isNull()) {
-      continue;
+      TypedValue value(accessor.getTypedValue(accessor_attr_id));
+      if (nullable_attrs && value.isNull()) {
+        continue;
+      }
+      total_size += value.getDataSize();
     }
-    total_size += value.getDataSize();
+    return total_size;
   }
-  return total_size;
-}
 
-template <typename ValueAccessorT, bool nullable_attrs>
-inline std::size_t CalculateVariableSizeWithRemappedAttributes(
+  template<typename ValueAccessorT, bool nullable_attrs>
+  inline std::size_t CalculateVariableSizeWithRemappedAttributes(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor,
     const std::vector<attribute_id> &attribute_map) {
-  std::size_t total_size = 0;
-  std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-       attr_it != relation.end();
-       ++attr_it, ++attr_map_it) {
-    if (!attr_it->getType().isVariableLength()) {
-      continue;
-    }
+    std::size_t total_size = 0;
+    std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+         attr_it != relation.end();
+         ++attr_it, ++attr_map_it) {
+      if (!attr_it->getType().isVariableLength()) {
+        continue;
+      }
 
-    TypedValue value(accessor.getTypedValue(*attr_map_it));
-    if (nullable_attrs && value.isNull()) {
-      continue;
+      TypedValue value(accessor.getTypedValue(*attr_map_it));
+      if (nullable_attrs && value.isNull()) {
+        continue;
+      }
+      total_size += value.getDataSize();
     }
-    total_size += value.getDataSize();
+    return total_size;
   }
-  return total_size;
-}
 
-}  // namespace
+}  // anonymous namespace
 
 SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
     const CatalogRelationSchema &relation,
@@ -101,7 +108,10 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                            new_block,
                            sub_block_memory,
                            sub_block_memory_size),
-      header_(static_cast<Header*>(sub_block_memory)) {
+      header_(static_cast<Header*>(sub_block_memory)),
+      num_null_attrs_(0),
+      num_fixed_attrs_(0),
+      num_var_attrs_(0) {
   if (!DescriptionIsValid(relation_, description_)) {
     FATAL_ERROR("Attempted to construct a SplitRowStoreTupleStorageSubBlock from an invalid description.");
   }
@@ -143,6 +153,21 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                    + sizeof(Header) + occupancy_bitmap_bytes_;
   tuple_storage_bytes_ = sub_block_memory_size_ - (sizeof(Header) + occupancy_bitmap_bytes_);
 
+  // Some accounting information for bulk inserts.
+  for (attribute_id attr_id = 0;
+       attr_id < static_cast<attribute_id>(relation.size());
+       ++attr_id) {
+    const Type& attr_type = relation.getAttributeById(attr_id)->getType();
+    if (attr_type.isVariableLength()) {
+      fixed_len_attr_sizes_.push_back(kInvalidAttributeID);
+      num_var_attrs_++;
+    } else {
+      fixed_len_attr_sizes_.push_back(attr_type.maximumByteLength());
+      num_fixed_attrs_++;
+    }
+    num_null_attrs_ += attr_type.isNullable();
+  }
+
   if (new_block) {
     // Only need to initialize these fields, the rest of the block will be
     // zeroed-out by the StorageManager.
@@ -194,380 +219,217 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
 }
 
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *accessor) {
-  const tuple_id original_num_tuples = header_->num_tuples;
-  tuple_id pos = 0;
+  std::vector<attribute_id> simple_remap;
+  for (attribute_id attr_id = 0;
+      attr_id < static_cast<attribute_id>(relation_.size());
+      ++attr_id) {
+    simple_remap.push_back(attr_id);
+  }
+  return bulkInsertDispatcher(simple_remap, accessor, kCatalogMaxID, true);
+}
 
-  InvokeOnAnyValueAccessor(
-      accessor,
-      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuples(
+  const std::vector<attribute_id> &attribute_map,
+  ValueAccessor *accessor,
+  const tuple_id max_num_tuples_to_insert) {
+  return bulkInsertDispatcher(attribute_map, accessor, max_num_tuples_to_insert, false);
+}
+
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
+  const std::vector<attribute_id> &attribute_map,
+  ValueAccessor *accessor,
+  tuple_id max_num_tuples_to_insert,
+  bool finalize) {
+  const bool fill_to_capacity = max_num_tuples_to_insert == kCatalogMaxID;
+
+  CopyGroupList copy_groups;
+  getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
+  auto impl = accessor->getImplementationType();
+  const bool is_rowstore_source =
+    (impl == ValueAccessor::Implementation::kPackedRowStore ||
+     impl == ValueAccessor::Implementation::kSplitRowStore);
+  if (is_rowstore_source) {
+    copy_groups.merge_contiguous();
+  }
+
+  const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
+  const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
+
+  if (fill_to_capacity) {
     if (relation_.hasNullableAttributes()) {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          // If packed, insert at the end of the slot array, otherwise find the
-          // first hole.
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSize<decltype(*accessor), true>(relation_, *accessor);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          // Allocate variable-length storage.
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          // Find the slot and locate its sub-structures.
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          // Start writing variable-length data at the beginning of the newly
-          // allocated range.
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
-            if ((nullable_idx != -1) && (attr_value.isNull())) {
-              // Set null bit and move on.
-              tuple_null_bitmap.setBit(nullable_idx, true);
-              continue;
-            }
-            if (variable_idx != -1) {
-              // Write offset and size into the slot, then copy the actual
-              // value into the variable-length storage region.
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              // Copy fixed-length value directly into the slot.
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          // Update occupancy bitmap and header.
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+      // TODO(marc) This is an annoying gotcha: the insertion loop assumes the null
+      // bitmaps are zero'd for a fresh insert. We could clear the bit map on each tuple
+      // iteration, but that'd be costlier.
+      std::int64_t remaining_bytes = tuple_storage_bytes_ -
+                                     (header_->variable_length_bytes_allocated +
+                                      (header_->num_tuples * tuple_slot_bytes_));
+      memset(static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_, 0x0, remaining_bytes);
+    }
+  }
+
+  tuple_id num_inserted = 0;
+  if (max_num_tuples_to_insert == kCatalogMaxID) {
+    max_num_tuples_to_insert = getInsertLowerBound();
+  }
+  if (copy_varlen) {
+    if (copy_nulls) {
+      if (fill_to_capacity) {
+        num_inserted = bulkInsertPartialTuplesImpl<true, true, true>(copy_groups, accessor,
+                                                                     max_num_tuples_to_insert);
       } else {
-        // Same as above, but skip variable-length checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            if (nullable_idx != -1) {
-              const void *attr_value = accessor->template getUntypedValue<true>(accessor_attr_id);
-              if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nullable_idx, true);
-              } else {
-                std::memcpy(fixed_length_attr_storage
-                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                            attr_value,
-                            attr_it->getType().maximumByteLength());
-              }
-            } else {
-              const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
-              std::memcpy(fixed_length_attr_storage
-                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                          attr_value,
-                          attr_it->getType().maximumByteLength());
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+        num_inserted = bulkInsertPartialTuplesImpl<true, true, false>(copy_groups, accessor,
+                                                                      max_num_tuples_to_insert);
       }
     } else {
-      if (relation_.isVariableLength()) {
-        // Same as most general case above, but skip null checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSize<decltype(*accessor), false>(relation_, *accessor);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
-            if (variable_idx != -1) {
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+      if (fill_to_capacity) {
+        num_inserted = bulkInsertPartialTuplesImpl<false, true, true>(copy_groups, accessor,
+                                                                      max_num_tuples_to_insert);
       } else {
-        // Simplest case: skip both null and variable-length checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
-            std::memcpy(fixed_length_attr_storage
-                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                        attr_value,
-                        attr_it->getType().maximumByteLength());
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+        num_inserted = bulkInsertPartialTuplesImpl<false, true, false>(copy_groups, accessor,
+                                                                       max_num_tuples_to_insert);
       }
     }
-  });
+  } else {
+    if (copy_nulls) {
+      num_inserted = bulkInsertPartialTuplesImpl<true, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
+    } else {
+      num_inserted = bulkInsertPartialTuplesImpl<false, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
+    }
+  }
 
-  return header_->num_tuples - original_num_tuples;
+  if (finalize) {
+    bulkInsertPartialTuplesFinalize(num_inserted);
+  }
+  return num_inserted;
 }
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor) {
-  DEBUG_ASSERT(attribute_map.size() == relation_.size());
-  const tuple_id original_num_tuples = header_->num_tuples;
-  tuple_id pos = 0;
+// copy_nulls is true if the incoming attributes include at least one nullable attribute
+// copy_varlen is true if the incoming attributes include at least one varlen attribute
+template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
+  const CopyGroupList &copy_groups,
+  ValueAccessor *accessor,
+  std::size_t max_num_tuples_to_insert) {
+  std::size_t num_tuples_inserted = 0;
+
+  // We only append to the end of the block to cut down on complexity.
+  char *tuple_slot = static_cast<char *>(tuple_storage_) +  header_->num_tuples * tuple_slot_bytes_;
+
+  std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+  std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
+
+  BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
+  char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
+
+
+
+  std::size_t storage_available = tuple_storage_bytes_ -
+                                    (header_->variable_length_bytes_allocated +
+                                     header_->num_tuples * tuple_slot_bytes_);
+
+  // The number of bytes that must be reserved per tuple inserted due to gaps.
+  std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
+  if (fill_to_capacity) {
+    for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
+      varlen_reserve -= relation_.getAttributeById(
+        copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
+    }
+  }
 
   InvokeOnAnyValueAccessor(
-      accessor,
-      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-    if (relation_.hasNullableAttributes()) {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), true>(
-                  relation_, *accessor, attribute_map);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
+    accessor,
+    [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      do {
+        const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
+        const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
+        const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
+
+        const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
+
+        while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
+          for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
+            const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
+            fixed_len_cursor += cattr.bytes_to_advance_;
+            const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
+            std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
           }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
-            if ((nullable_idx != -1) && (attr_value.isNull())) {
-              tuple_null_bitmap.setBit(nullable_idx, true);
-              continue;
-            }
-            if (variable_idx != -1) {
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
-      } else {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            if (nullable_idx != -1) {
-              const void *attr_value = accessor->template getUntypedValue<true>(*attr_map_it);
+
+          if (copy_nulls) {
+            tuple_null_bitmap.setMemory(tuple_slot);
+            for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
+              const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
+              const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
               if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nullable_idx, true);
-              } else {
-                std::memcpy(fixed_length_attr_storage
-                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                            attr_value,
-                            attr_it->getType().maximumByteLength());
+                tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
               }
-            } else {
-              const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
-              std::memcpy(fixed_length_attr_storage
-                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                          attr_value,
-                          attr_it->getType().maximumByteLength());
             }
           }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
-      }
-    } else {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), false>(
-                  relation_, *accessor, attribute_map);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
-            if (variable_idx != -1) {
+
+          if (copy_varlen) {
+            for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
+              const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
+              fixed_len_cursor += vattr.bytes_to_advance_;
+              // Typed value is necessary as we need the length.
+              const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
+              if (attr_value.isNull()) {
+                continue;
+              }
               const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+              varlen_heap_offset -= attr_size;
+              std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
+                          attr_size);
+              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
+              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
             }
           }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
+          tuple_slot += tuple_slot_bytes_;
+          fixed_len_cursor = tuple_slot + nullmap_size;
+          num_tuples_inserted++;
         }
-      } else {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
-            std::memcpy(fixed_length_attr_storage
-                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                        attr_value,
-                        attr_it->getType().maximumByteLength());
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
+        if (fill_to_capacity) {
+          std::int64_t remaining_storage_after_inserts = storage_available -
+                                                         (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
+                                                          (varlen_heap_offset_orig - varlen_heap_offset));
+          DCHECK_LE(0, remaining_storage_after_inserts);
+          std::size_t additional_tuples_insert =
+            remaining_storage_after_inserts / this->relation_.getMaximumByteLength();
+          // We want to avoid a situation where we have several short insert iterations
+          // near the end of an insertion cycle.
+          if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
+            max_num_tuples_to_insert += additional_tuples_insert;
           }
         }
-      }
-    }
-  });
+      } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
+               num_tuples_inserted < max_num_tuples_to_insert);
+    });
+
+  if (copy_varlen) {
+    header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
+  }
 
-  return header_->num_tuples - original_num_tuples;
+  return num_tuples_inserted;
+}
+
+void SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesFinalize(
+    const tuple_id num_tuples_inserted) {
+  occupancy_bitmap_->setBitRange(header_->max_tid + 1, num_tuples_inserted, true);
+  header_->num_tuples += num_tuples_inserted;
+  header_->max_tid += num_tuples_inserted;
+}
+
+std::size_t SplitRowStoreTupleStorageSubBlock::getInsertLowerBound() const {
+  const std::size_t remaining_storage_bytes = tuple_storage_bytes_ -
+                                              (header_->variable_length_bytes_allocated +
+                                               ((header_->max_tid + 1) * tuple_slot_bytes_));
+  const std::size_t tuple_max_size = tuple_slot_bytes_ + relation_.getMaximumVariableByteLength();
+  return remaining_storage_bytes / tuple_max_size;
+}
+
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor) {
+  DCHECK_EQ(relation_.size(), attribute_map.size());
+  return bulkInsertDispatcher(attribute_map, accessor, kCatalogMaxID, true);
 }
 
 const void* SplitRowStoreTupleStorageSubBlock::getAttributeValue(
@@ -1002,4 +864,67 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
   return InsertResult(pos, false);
 }
 
+// Copy groups are used by insert algorithms to efficiently copy attributes from a
+// variety of source schemas with some matching attributes in the destination (this) store.
+// SplitRow has 3 distinct zones which define a physical tuple:
+//    [null_bitmap] [fixed_length_zone] [var_len_pairs]
+// When we do our insert algorithm, we first copy over fixed length attributes. Since there
+// can be gaps, and reorderings in the source schema, we need to know:
+//    * Where to copy the src attr into (ie offset from start of fixed_len_zone)
+//    * How many bytes to copy
+//    * Which src attr we are copying
+// When copying fixed length attributes, we calculate the offset into our tuple and memcpy
+// the source attribute's data for that length.
+//
+// Copying variable-length attribute pairs is similar. Note that there is a heap at the end of
+// the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
+// copy varlen into the heap is the main difference from copying fixed length.
+void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
+  const std::vector<attribute_id> &attribute_map,
+  CopyGroupList *copy_groups) {
+  DCHECK_EQ(attribute_map.size(), relation_.size());
+
+  attribute_id num_attrs = attribute_map.size();
+
+  std::size_t contig_adv = 0;
+  std::size_t varlen_adv = 0;
+  for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
+    attribute_id src_attr = attribute_map[attr_id];
+
+    // Attribute doesn't exist in src.
+    if (src_attr == kInvalidCatalogId) {
+      // create a placeholder for now
+      if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
+        // fixed len
+        contig_adv += fixed_len_attr_sizes_[attr_id];
+      } else {
+        // var len
+        varlen_adv += kVarLenSlotSize;
+      }
+      continue;
+    }
+
+    // Attribute exists in src.
+    if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
+      // fixed len
+      copy_groups->contiguous_attrs_.push_back(
+        ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
+      contig_adv = fixed_len_attr_sizes_[attr_id];
+    } else {
+      // var len
+      copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
+      varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
+    }
+
+    if (relation_.getNullableAttributeIndex(attr_id) != -1) {
+      copy_groups->nullable_attrs_.push_back(
+        NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
+    }
+  }
+  // This will point us to the beginning of the varlen zone.
+  if (copy_groups->varlen_attrs_.size() > 0) {
+    copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index a930103..681001e 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -45,6 +45,150 @@ class ValueAccessor;
 
 QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
 
+namespace splitrow_internal {
+// A CopyGroup contains information about one run of attributes in the source
+// ValueAccessor that can be copied into the output block. The
+// getCopyGroupsForAttributeMap function below takes an attribute map for a source
+// and converts it into a sequence of runs. The goal is to minimize the number
+// of memcpy calls and address calculations that occur during bulk insertion.
+// Contiguous attributes from a rowstore source can be merged into a single copy group.
+//
+// A single ContiguousAttrs CopyGroup consists of contiguous attributes, nullable
+// or not. "Contiguous" here means that their attribute IDs are successive in both
+// the source and destination relations.
+//
+// A NullableAttr refers to exactly one nullable attribute. Nullable columns are
+// represented using fixed length inline data as well as a null bitmap.
+// In a particular tuple, if the attribute has a null value, the inline data
+// has no meaning. So it is safe to copy it or not. We use this fact to merge
+// runs together aggressively, i.e., a ContiguousAttrs group may include a
+// nullable attribute. However, we also create a NullableAttr in that case in
+// order to check the null bitmap.
+//
+// A gap is a run of destination (output) attributes that don't come from a
+// particular source. This occurs during bulkInsertPartialTuples. They must be
+// skipped during the insert (not copied over). They are indicated by a
+// kInvalidCatalogId in the attribute map. For efficiency, the gap size
+// is merged into the bytes_to_advance_ of previous ContiguousAttrs copy group.
+// For gaps at the start of the attribute map, we just create a ContiguousAttrs
+// copy group with 0 bytes to copy and dummy (0) source attribute id.
+//
+// eg. For 4B integer attrs, from a row store source,
+// if the input attribute_map is {-1,0,5,6,7,-1,2,4,9,10,-1}
+// with input/output attributes 4 and 7 being nullable,
+// we will create the following ContiguousAttrs copy groups
+//
+//  ----------------------------------------------------
+//  |src_id_      |bytes_to_advance_| bytes_to_copy_   |
+//  |-------------|-----------------|------------------|
+//  |            0|                4|                 4|
+//  |            5|                4|                12|
+//  |            2|               16|                 4|
+//  |            4|                4|                 4|
+//  |            9|                4|                 8|
+//  ----------------------------------------------------
+// and two NullableAttrs with src_attr_id_ set to 4 and 7.
+//
+// In this example, we do 6 memcpy calls and 6 address calculations
+// as well as 2 bitvector lookups for each tuple. A naive copy algorithm
+// would do 11 memcpy calls and address calculations, along with the
+// bitvector lookups, not to mention the schema lookups,
+// all interspersed in a complex loop with lots of branches.
+//
+// If the source was a column store, then we can't merge contiguous
+// attributes (or gaps). So we would have 11 ContiguousAttrs copy groups with
+// three of them having bytes_to_copy = 0 (corresponding to the gaps) and
+// the rest having bytes_to_copy_ = 4.
+//
+// SplitRowStore supports variable length attributes. Since the layout of the
+// tuple is like: [null bitmap][fixed length attributes][variable length offsets]
+// we do all the variable length copies after the fixed length copies.
+//
+struct CopyGroup {
+  attribute_id src_attr_id_;  // The attr_id of the starting input attribute for the run.
+
+  explicit CopyGroup(const attribute_id source_attr_id)
+    : src_attr_id_(source_attr_id) {}
+};
+
+struct ContiguousAttrs : public CopyGroup {
+  std::size_t bytes_to_advance_;  // Number of bytes to advance destination ptr
+                                  // to get to the location where we copy THIS attribute.
+  std::size_t bytes_to_copy_;     // Number of bytes to copy from source.
+
+  ContiguousAttrs(
+    const attribute_id source_attr_id,
+    const std::size_t bytes_to_copy,
+    const std::size_t bytes_to_advance)
+    : CopyGroup(source_attr_id),
+      bytes_to_advance_(bytes_to_advance),
+      bytes_to_copy_(bytes_to_copy) { }
+};
+
+struct VarLenAttr : public CopyGroup {
+  std::size_t bytes_to_advance_;
+  attribute_id dst_attr_id_;
+  VarLenAttr(const attribute_id source_attr_id,
+             const attribute_id dst_attr_id,
+             const std::size_t bytes_to_advance)
+    : CopyGroup(source_attr_id),
+      bytes_to_advance_(bytes_to_advance),
+      dst_attr_id_(dst_attr_id) {}
+};
+
+struct NullableAttr : public CopyGroup {
+  int nullable_attr_idx_;  // index into null bitmap
+
+  NullableAttr(attribute_id source_attr_id_,
+               int nullable_attr_idx)
+    : CopyGroup(source_attr_id_),
+      nullable_attr_idx_(nullable_attr_idx) {}
+};
+
+struct CopyGroupList {
+  CopyGroupList()
+    : contiguous_attrs_(),
+      nullable_attrs_(),
+      varlen_attrs_() {}
+
+  /**
+   * @brief Attributes which are exactly sequential are merged into a single copy.
+   */
+  void merge_contiguous() {
+    if (contiguous_attrs_.size() < 2) {
+      return;
+    }
+
+    int add_to_advance = 0;
+    for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
+      ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
+      ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
+      if (add_to_advance > 0) {
+        current_attr->bytes_to_advance_ += add_to_advance;
+        add_to_advance = 0;
+      }
+      // The merge step:
+      if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
+            previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
+        previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
+        add_to_advance += current_attr->bytes_to_advance_;
+        contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
+        idx--;
+      }
+    }
+
+    if (varlen_attrs_.size() > 0) {
+      varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
+    }
+  }
+
+  std::vector<ContiguousAttrs> contiguous_attrs_;
+  std::vector<NullableAttr> nullable_attrs_;
+  std::vector<VarLenAttr> varlen_attrs_;
+};
+
+}  // namespace splitrow_internal
+
 /** \addtogroup Storage
  *  @{
  */
@@ -60,6 +204,8 @@ QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
  *       storage can be reclaimed by calling rebuild().
  **/
 class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
+  static const std::size_t kVarLenSlotSize;
+
  public:
   SplitRowStoreTupleStorageSubBlock(const CatalogRelationSchema &relation,
                                     const TupleStorageSubBlockDescription &description,
@@ -155,6 +301,13 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;
 
+  tuple_id bulkInsertPartialTuples(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor,
+    const tuple_id max_num_tuples_to_insert);
+
+  void bulkInsertPartialTuplesFinalize(const tuple_id num_tuples_inserted);
+
   const void* getAttributeValue(const tuple_id tuple,
                                 const attribute_id attr) const override;
 
@@ -213,6 +366,33 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   template <bool nullable_attrs, bool variable_length_attrs>
   InsertResult insertTupleImpl(const Tuple &tuple);
 
+  template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
+  tuple_id bulkInsertPartialTuplesImpl(
+    const splitrow_internal::CopyGroupList &copy_groups,
+    ValueAccessor *accessor,
+    std::size_t max_num_tuples_to_insert);
+
+  tuple_id bulkInsertDispatcher(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor,
+    tuple_id max_num_tuples_to_insert,
+    bool finalize);
+
+  void getCopyGroupsForAttributeMap(
+    const std::vector<attribute_id> &attribute_map,
+    splitrow_internal::CopyGroupList *copy_groups);
+
+  std::size_t getInsertLowerBound() const;
+
+  // When varlen attributes are bulk inserted, the difference between the maximum
+  // possible size and the actual size of the tuples will cause an underestimate of
+  // the number of tuples we can insert. This threshold puts a limit on the number
+  // of tuples to attempt to insert. A smaller number will give more rounds of insertion
+  // and a more-packed block, but at the cost of insertion speed.
+  std::size_t getInsertLowerBoundThreshold() const {
+    return 10;
+  }
+
   Header *header_;
 
   std::unique_ptr<BitVector<false>> occupancy_bitmap_;
@@ -221,12 +401,18 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   void *tuple_storage_;
   std::size_t tuple_storage_bytes_;
   std::size_t tuple_slot_bytes_;
+  std::vector<std::size_t> fixed_len_attr_sizes_;
+
+  std::size_t num_null_attrs_;
+  std::size_t num_fixed_attrs_;
+  std::size_t num_var_attrs_;
 
   std::size_t per_tuple_null_bitmap_bytes_;
 
   friend class SplitRowStoreTupleStorageSubBlockTest;
   friend class SplitRowStoreValueAccessor;
   FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, InitializeTest);
+  FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest);
 
   DISALLOW_COPY_AND_ASSIGN(SplitRowStoreTupleStorageSubBlock);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
index 2943343..b953854 100644
--- a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
+++ b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
@@ -22,6 +22,7 @@
 #include <cstdio>
 #include <cstring>
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -61,6 +62,11 @@ using std::snprintf;
 
 namespace quickstep {
 
+using splitrow_internal::CopyGroupList;
+using splitrow_internal::ContiguousAttrs;
+using splitrow_internal::NullableAttr;
+using splitrow_internal::VarLenAttr;
+
 namespace {
 
 // Used to set up a value-parameterized test with certain features for
@@ -76,9 +82,11 @@ enum class AttributeTypeFeatures {
 
 class SplitRowStoreTupleStorageSubBlockTest
     : public ::testing::TestWithParam<AttributeTypeFeatures> {
- protected:
+ public:
   static const std::size_t kSubBlockSize = 0x100000;  // 1 MB
+  static const std::size_t kVarLenSize = 26;
 
+ protected:
   virtual void SetUp() {
     // Create a sample relation with a variety of attribute types.
     relation_.reset(new CatalogRelation(nullptr, "TestRelation"));
@@ -102,7 +110,7 @@ class SplitRowStoreTupleStorageSubBlockTest
         relation_.get(),
         "string_attr",
         TypeFactory::GetType(testVariableLength() ? kVarChar : kChar,
-                             26,
+                             kVarLenSize,
                              testNullable()));
     ASSERT_EQ(2, relation_->addAttribute(current_attr));
 
@@ -147,6 +155,14 @@ class SplitRowStoreTupleStorageSubBlockTest
     return tuple_store_->tuple_storage_bytes_;
   }
 
+  std::size_t getTupleInsertLowerBound() const {
+    return tuple_store_->getInsertLowerBound();
+  }
+
+  std::size_t getInsertLowerBoundThreshold() const {
+    return tuple_store_->getInsertLowerBoundThreshold();
+  }
+
   Tuple createSampleTuple(const int base_value) const {
     std::vector<TypedValue> attribute_values;
 
@@ -174,10 +190,10 @@ class SplitRowStoreTupleStorageSubBlockTest
       char string_buffer[13];
       int written = snprintf(string_buffer, sizeof(string_buffer), "%d", base_value);
       if (testVariableLength()) {
-        attribute_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(string_buffer,
+        attribute_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
                                                                                       written + 1)));
       } else {
-        attribute_values.emplace_back((CharType::InstanceNonNullable(26).makeValue(string_buffer,
+        attribute_values.emplace_back((CharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
                                                                                    written + 1)));
       }
       attribute_values.back().ensureNotReference();
@@ -197,6 +213,11 @@ class SplitRowStoreTupleStorageSubBlockTest
     tuple_store_->rebuild();
   }
 
+  void getCopyGroupsForAttributeMap(const std::vector<attribute_id> &attribute_map,
+                                    CopyGroupList *copy_groups) {
+    tuple_store_->getCopyGroupsForAttributeMap(attribute_map, copy_groups);
+  }
+
   void checkTupleValuesUntyped(const tuple_id tid,
                                const int base_value) {
     ASSERT_TRUE(tuple_store_->hasTupleWithID(tid));
@@ -269,6 +290,135 @@ class SplitRowStoreTupleStorageSubBlockTest
 };
 typedef SplitRowStoreTupleStorageSubBlockTest SplitRowStoreTupleStorageSubBlockDeathTest;
 
+class SplitRowWrapper {
+ public:
+  enum AttrType {
+    kInt = 0,
+    kDouble,
+    kString,
+    kNumAttrTypes
+  };
+
+  /**
+   * Builds a catalog relation given a list of attributes.
+   *
+   * @param attribute_ordering The ordering of the attributes in the represented relation. A value of 0 denotes
+   *                an integer attribute, 1 a double, and 2 a string.
+   * @param contains_nullable If the relation contains nullable attributes.
+   * @param contains_varlen If the relation contains variable length attributes.
+   * @return A caller-owned catalog relation.
+   */
+  static CatalogRelation *
+  GetRelationFromAttributeList(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable,
+                               bool contains_varlen) {
+    // Create a unique name.
+    std::string rel_name("TempRelation");
+    for (auto attr_itr = attribute_ordering.begin();
+         attr_itr != attribute_ordering.end();
+         ++attr_itr) {
+      rel_name += "_" + std::to_string(*attr_itr);
+    }
+    CatalogRelation *relation = new CatalogRelation(nullptr, rel_name.c_str());
+
+    std::vector<int> attr_counts(AttrType::kNumAttrTypes);
+    std::string attr_name;
+    for (auto attr_itr = attribute_ordering.begin();
+         attr_itr != attribute_ordering.end();
+         ++attr_itr) {
+      switch (*attr_itr) {
+        case AttrType::kInt:
+          // An integer.
+          attr_name = "int_attr_" + std::to_string(attr_counts[AttrType::kInt]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(TypeID::kInt, contains_nullable)));
+          attr_counts[AttrType::kInt]++;
+          break;
+        case AttrType::kDouble:
+          // A double.
+          attr_name = "double_attr_" + std::to_string(attr_counts[AttrType::kDouble]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(TypeID::kDouble, contains_nullable)));
+          attr_counts[AttrType::kDouble]++;
+          break;
+        case AttrType::kString:
+          // A (possibly variable-length) string.
+          attr_name = "string_attr_" + std::to_string(attr_counts[AttrType::kString]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(contains_varlen ? TypeID::kVarChar : TypeID::kChar,
+                                 SplitRowStoreTupleStorageSubBlockTest::kVarLenSize,
+                                 contains_nullable)));
+          attr_counts[AttrType::kString]++;
+          break;
+        default:
+          LOG(FATAL) << "Unknown type was specified in SplitRowWrapper.";
+          break;
+      }
+    }
+    return relation;
+  }
+
+  /**
+   * A wrapper for an empty SplitRowStore.
+   *
+   * @param attribute_ordering The ordering of the attributes in the represented relation. A value of 0 denotes
+   *                an integer attribute, 1 a double, and 2 a string.
+   * @param contains_nullable If the relation contains nullable attributes.
+   * @param contains_varlen If the relation contains variable length attributes.
+   */
+  SplitRowWrapper(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable, bool contains_varlen)
+    : contains_nullable_(contains_nullable),
+      contains_varlen_(contains_varlen) {
+    initialize(attribute_ordering);
+  }
+
+  SplitRowWrapper(bool contains_nullable, bool contains_varlen)
+    : contains_nullable_(contains_nullable),
+      contains_varlen_(contains_varlen) {
+    // Make a clone of the Test Block type using the 3 basic attributes.
+    std::vector<attribute_id> attrs;
+    for (attribute_id attr = 0; attr < 3; ++attr) {
+      attrs.push_back(attr);
+    }
+    initialize(attrs);
+  }
+
+  SplitRowStoreTupleStorageSubBlock *operator->() {
+    return tuple_store_.get();
+  }
+
+  const bool contains_nullable_;
+  const bool contains_varlen_;
+
+  std::unique_ptr<CatalogRelation> relation_;
+  std::unique_ptr<TupleStorageSubBlockDescription> tuple_store_description_;
+  ScopedBuffer tuple_store_memory_;
+  std::unique_ptr<SplitRowStoreTupleStorageSubBlock> tuple_store_;
+
+ private:
+  void initialize(const std::vector<attribute_id> &attribute_ordering) {
+    // Create a sample relation with a variety of attribute types.
+    relation_.reset(GetRelationFromAttributeList(attribute_ordering, contains_nullable_, contains_varlen_));
+
+    tuple_store_description_.reset(new TupleStorageSubBlockDescription());
+    tuple_store_description_->set_sub_block_type(TupleStorageSubBlockDescription::SPLIT_ROW_STORE);
+
+    // Initialize the actual block.
+    tuple_store_memory_.reset(SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
+    std::memset(tuple_store_memory_.get(), 0x0, SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
+    tuple_store_.reset(new SplitRowStoreTupleStorageSubBlock(*relation_,
+                                                             *tuple_store_description_,
+                                                             true,
+                                                             tuple_store_memory_.get(),
+                                                             SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize));
+  }
+};
+
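// A minimal usage sketch for the wrapper above (illustration only, not part of
// the committed diff). It relies only on members shown in this file: the
// attribute-ordering constructor, operator->, and the sub-block's numTuples().
inline void ExampleSplitRowWrapperUsage() {
  const std::vector<attribute_id> ordering = {SplitRowWrapper::AttrType::kInt,
                                              SplitRowWrapper::AttrType::kString};
  SplitRowWrapper store(ordering, /* contains_nullable = */ true,
                        /* contains_varlen = */ true);
  // operator-> forwards to the wrapped SplitRowStoreTupleStorageSubBlock.
  EXPECT_EQ(0, store->numTuples());  // A freshly built block holds no tuples.
}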
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DescriptionIsValidTest) {
   // The descriptions we use for the other tests (which includes nullable and
   // variable-length attributes) should be valid.
@@ -458,37 +608,37 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
   const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
 
   NativeColumnVector *int_vector = new NativeColumnVector(
-      relation_->getAttributeById(0)->getType(),
-      max_tuple_capacity);
+    relation_->getAttributeById(0)->getType(),
+    max_tuple_capacity);
   NativeColumnVector *double_vector = new NativeColumnVector(
-      relation_->getAttributeById(1)->getType(),
-      max_tuple_capacity);
+    relation_->getAttributeById(1)->getType(),
+    max_tuple_capacity);
   ColumnVector *string_vector = testVariableLength() ?
-      static_cast<ColumnVector*>(new IndirectColumnVector(
-          relation_->getAttributeById(2)->getType(),
-          max_tuple_capacity))
-      : static_cast<ColumnVector*>(new NativeColumnVector(
-          relation_->getAttributeById(2)->getType(),
-          max_tuple_capacity));
+                                static_cast<ColumnVector*>(new IndirectColumnVector(
+                                  relation_->getAttributeById(2)->getType(),
+                                  max_tuple_capacity))
+                                                     : static_cast<ColumnVector*>(new NativeColumnVector(
+      relation_->getAttributeById(2)->getType(),
+      max_tuple_capacity));
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
     const std::size_t current_tuple_storage_bytes
-        = getTupleSlotSize()
-          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
-                                     0 : current_tuple.getAttributeValue(2).getDataSize())
-                                  : 0);
+      = getTupleSlotSize()
+        + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
+                                   0 : current_tuple.getAttributeValue(2).getDataSize())
+                                : 0);
     if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       }
 
       storage_used += current_tuple_storage_bytes;
@@ -505,16 +655,21 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->bulkInsertTuples(&accessor));
-  EXPECT_TRUE(accessor.iterationFinished());
-
-  // Shouldn't be able to insert any more tuples.
-  accessor.beginIteration();
-  EXPECT_EQ(0, tuple_store_->bulkInsertTuples(&accessor));
+  tuple_id num_inserted = tuple_store_->bulkInsertTuples(&accessor);
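+  // With variable-length attributes, the reworked bulk insert reserves space
+  // conservatively, so it may stop a few tuples short of the exact-fit count
+  // computed above; allow a bounded shortfall.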
+  if (testVariableLength()) {
+    EXPECT_LE(current_tuple_idx - num_inserted, getInsertLowerBoundThreshold());
+  } else {
+    EXPECT_EQ(current_tuple_idx, num_inserted);
+    ASSERT_TRUE(accessor.iterationFinished());
+    // Shouldn't be able to insert any more tuples.
+    accessor.beginIteration();
+    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuples(&accessor);
+    ASSERT_EQ(0, num_inserted_second_round);
+  }
 
   tuple_store_->rebuild();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
-  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
+  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
+  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
 
   // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -525,6 +680,146 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
   }
 }
 
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, PartialBulkInsertTest) {
+  // Build up ColumnVectors to bulk-insert from in two partial passes. We
+  // reserve enough space for the maximum possible number of tuples in the
+  // block, even though this test only inserts max_tuples_insert of them.
+  const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
+
+  NativeColumnVector *int_vector = new NativeColumnVector(
+    relation_->getAttributeById(0)->getType(),
+    max_tuple_capacity);
+  NativeColumnVector *double_vector = new NativeColumnVector(
+    relation_->getAttributeById(1)->getType(),
+    max_tuple_capacity);
+  ColumnVector *string_vector = testVariableLength() ?
+      static_cast<ColumnVector *>(new IndirectColumnVector(
+          relation_->getAttributeById(2)->getType(),
+          max_tuple_capacity))
+      : static_cast<ColumnVector *>(new NativeColumnVector(
+          relation_->getAttributeById(2)->getType(),
+          max_tuple_capacity));
+
+  const int max_tuples_insert = 1000;
+  for (int tuple_idx = 0; tuple_idx < max_tuples_insert; ++tuple_idx) {
+    Tuple current_tuple(createSampleTuple(tuple_idx));
+    int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
+    double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
+    if (testVariableLength()) {
+      static_cast<IndirectColumnVector *>(string_vector)
+        ->appendTypedValue(current_tuple.getAttributeValue(2));
+    } else {
+      static_cast<NativeColumnVector *>(string_vector)
+        ->appendTypedValue(current_tuple.getAttributeValue(2));
+    }
+  }
+
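+  // Each attribute map has one entry per attribute of the destination
+  // relation; an entry gives the column index within the accessor that
+  // supplies that attribute, or kInvalidCatalogId if this pass does not fill
+  // it. Pass 1 supplies only the double (attribute 1); pass 2 supplies the
+  // int (attribute 0) and the string (attribute 2).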
+  std::vector<attribute_id> attr_map_pt1 = {kInvalidCatalogId, 0, kInvalidCatalogId};
+  std::vector<attribute_id> attr_map_pt2 = {0, kInvalidCatalogId, 1};
+
+  ColumnVectorsValueAccessor accessor_pt1;
+  accessor_pt1.addColumn(double_vector);
+
+  ColumnVectorsValueAccessor accessor_pt2;
+  accessor_pt2.addColumn(int_vector);
+  accessor_pt2.addColumn(string_vector);
+
+
+  // Actually do the bulk-insert.
+  accessor_pt1.beginIteration();
+  const tuple_id num_inserted_pt1 = tuple_store_->bulkInsertPartialTuples(attr_map_pt1, &accessor_pt1, kCatalogMaxID);
+  ASSERT_GT(num_inserted_pt1, 0);
+  const tuple_id num_inserted_pt2 = tuple_store_->bulkInsertPartialTuples(attr_map_pt2, &accessor_pt2,
+                                                                          num_inserted_pt1);
+  ASSERT_EQ(num_inserted_pt1, num_inserted_pt2);
+
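+  // Finalizing after both passes should make the partially inserted tuples
+  // fully visible to the store's tuple count and ID accounting, as checked below.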
+  tuple_store_->bulkInsertPartialTuplesFinalize(num_inserted_pt1);
+  ASSERT_EQ(max_tuples_insert, tuple_store_->getMaxTupleID() + 1);
+  ASSERT_EQ(num_inserted_pt1, tuple_store_->getMaxTupleID() + 1);
+  EXPECT_TRUE(accessor_pt2.iterationFinished());
+
+  tuple_store_->rebuild();
+
+  // Should be the same order as if we inserted them serially.
+  ASSERT_TRUE(tuple_store_->isPacked());
+  for (tuple_id tid = 0;
+       tid <= tuple_store_->getMaxTupleID();
+       ++tid) {
+    checkTupleValuesUntyped(tid, tid);
+  }
+}
+
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest) {
+  const bool nullable_attrs = testNullable();
+  std::vector<attribute_id> relation_attrs = {
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString};
+  SplitRowWrapper dst_store(relation_attrs, nullable_attrs, testVariableLength());
+  std::vector<attribute_id> attr_map = { kInvalidCatalogId, 0, 1, kInvalidCatalogId, 2, 1 };
+  CopyGroupList copy_groups;
+  dst_store->getCopyGroupsForAttributeMap(attr_map, &copy_groups);
+
+  std::vector<ContiguousAttrs>& contiguous_attrs = copy_groups.contiguous_attrs_;
+  std::vector<VarLenAttr>& varlen_attrs = copy_groups.varlen_attrs_;
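+  // For each copy group, bytes_to_advance_ is the distance from where the
+  // previous group's copy began (or from the start of the tuple's
+  // fixed-length region, for the first group) to where this group's copy
+  // begins, and bytes_to_copy_ is the width of a fixed-length copy.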
+
+  const std::size_t size_of_string = dst_store->getRelation().getAttributeById(3)->getType().maximumByteLength();
+
+  // Fixed length attributes.
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy_);
+
+  EXPECT_EQ(1, contiguous_attrs[1].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance_);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy_);
+
+  if (testVariableLength()) {
+    ASSERT_EQ(2, contiguous_attrs.size());
+    ASSERT_EQ(2, varlen_attrs.size());
+
+    EXPECT_EQ(2, varlen_attrs[0].src_attr_id_);
+    EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[0].bytes_to_advance_);
+
+    EXPECT_EQ(1, varlen_attrs[1].src_attr_id_);
+    EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance_);
+
+  } else {
+    ASSERT_EQ(4, copy_groups.contiguous_attrs_.size());
+    ASSERT_EQ(0, copy_groups.varlen_attrs_.size());
+
+    EXPECT_EQ(2, contiguous_attrs[2].src_attr_id_);
+    EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance_);
+    EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy_);
+  }
+
+  const int null_count = copy_groups.nullable_attrs_.size();
+  if (testNullable()) {
+    // The relation contains 6 nullable attributes, but only the 4 mapped in
+    // attr_map are filled here.
+    EXPECT_EQ(4, null_count);
+  } else {
+    EXPECT_EQ(0, null_count);
+  }
+
+  // Test that merging works: fixed-length copies that are contiguous in both
+  // the source and the destination should collapse into a single, wider copy.
+  copy_groups.merge_contiguous();
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+
+  if (testVariableLength()) {
+    EXPECT_EQ(1, contiguous_attrs.size());
+    EXPECT_EQ(sizeof(int) * 2 + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
+              varlen_attrs[0].bytes_to_advance_);
+  } else {
+    EXPECT_EQ(3, contiguous_attrs.size());
+    EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy_);
+    EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance_);
+  }
+}
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTest) {
   // This is similar to the above test, but we will reverse the order of the
   // ColumnVectors in the ColumnVectorsValueAccessor and remap them back to the
@@ -551,25 +846,26 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
+  std::size_t tuple_max_size = relation_->getMaximumByteLength();
+  std::size_t tuple_slot_size = getTupleSlotSize();
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
-    const std::size_t current_tuple_storage_bytes
-        = getTupleSlotSize()
-          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
-                                     0 : current_tuple.getAttributeValue(2).getDataSize())
-                                  : 0);
-    if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
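+    // Only build tuples while a worst-case (maximum-size) tuple is still
+    // guaranteed to fit; the actual space consumed is accounted below.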
+    if ((getTupleStorageSize() - storage_used) / tuple_max_size > 0) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
+      }
+
+      storage_used += tuple_slot_size;
+      if (testVariableLength() && !current_tuple.getAttributeValue(2).isNull()) {
+        storage_used += current_tuple.getAttributeValue(2).getDataSize();
       }
 
-      storage_used += current_tuple_storage_bytes;
       ++current_tuple_idx;
     } else {
       break;
@@ -588,18 +884,21 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  EXPECT_EQ(current_tuple_idx,
-            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
-  EXPECT_TRUE(accessor.iterationFinished());
-
-  // Shouldn't be able to insert any more tuples.
-  accessor.beginIteration();
-  EXPECT_EQ(0,
-            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
+  tuple_id num_inserted = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
+  if (testVariableLength()) {
+    EXPECT_LE(current_tuple_idx - num_inserted, getInsertLowerBoundThreshold());
+  } else {
+    EXPECT_EQ(current_tuple_idx, num_inserted);
+    ASSERT_TRUE(accessor.iterationFinished());
+    // Shouldn't be able to insert any more tuples.
+    accessor.beginIteration();
+    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
+    ASSERT_EQ(0, num_inserted_second_round);
+  }
 
   tuple_store_->rebuild();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
-  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
+  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
+  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
 
   // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -632,6 +931,53 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetAttributeValueTypedTest) {
   }
 }
 
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, SplitRowToSplitRowTest) {
+  // Test insertion of data from a SplitRow to a SplitRow with no reordering.
+  fillBlockWithSampleData();
+  std::vector<attribute_id> relation_attrs = {
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kDouble,
+    SplitRowWrapper::AttrType::kDouble,
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString};
+  SplitRowWrapper dst_store(relation_attrs, testNullable(), testVariableLength());
+
+  std::vector<attribute_id> attribute_map = {0, kInvalidCatalogId, 1, 0, 2, kInvalidCatalogId, 2};
+
+  std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor());
+  ASSERT_EQ(ValueAccessor::Implementation::kSplitRowStore,
+            accessor->getImplementationType());
+  ASSERT_FALSE(accessor->isTupleIdSequenceAdapter());
+
+  SplitRowStoreValueAccessor &cast_accessor = static_cast<SplitRowStoreValueAccessor &>(*accessor);
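+  // Copy the source into the destination in two partial passes over the same
+  // accessor and then finalize: the first pass fills the destination
+  // attributes mapped above, the second pass (with the remapped attribute_map)
+  // fills the remaining two.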
+  std::size_t num_inserted = dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, kCatalogMaxID);
+  attribute_map = {kInvalidCatalogId, 1, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId, 2, kInvalidCatalogId};
+  cast_accessor.beginIteration();
+  dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, num_inserted);
+  dst_store->bulkInsertPartialTuplesFinalize(num_inserted);
+
+  EXPECT_EQ(num_inserted - 1, dst_store->getMaxTupleID());
+  // The destination store should hold roughly 1/3 of the source's tuples. The
+  // more varlen attributes there are, the fewer tuples it accepts, because of
+  // how insertion estimates space for variable-length values.
+  EXPECT_LT(0.15 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
+  EXPECT_GT(0.5 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
+
+  attribute_map = {0, 1, 4};
+  for (tuple_id tid = 0; tid <= dst_store->getMaxTupleID(); ++tid) {
+    for (attribute_id aid = 0; aid <= tuple_store_->getRelation().getMaxAttributeId(); ++aid) {
+      const TypedValue &dst_value = dst_store->getAttributeValueTyped(tid, attribute_map[aid]);
+      const TypedValue &src_value = tuple_store_->getAttributeValueTyped(tid, aid);
+      if (src_value.isNull() || dst_value.isNull()) {
+        EXPECT_TRUE(src_value.isNull() && dst_value.isNull());
+      } else {
+        EXPECT_TRUE(src_value.fastEqualCheck(dst_value));
+      }
+    }
+  }
+}
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, ValueAccessorTest) {
   fillBlockWithSampleData();
 
@@ -721,7 +1067,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     // It's also OK to replace a variable-length value with a shorter value, or
     // with null.
     std::unordered_map<attribute_id, TypedValue> variable_new_values;
-    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(26).makeValue("x", 2));
+    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(kVarLenSize).makeValue("x", 2));
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(33, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(33, 2, variable_new_values[2]);
     EXPECT_STREQ("x", static_cast<const char*>(tuple_store_->getAttributeValue(33, 2)));
@@ -747,13 +1093,14 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     EXPECT_TRUE(tuple_store_->insertTupleInBatch(createSampleTuple(0)));
     tuple_store_->rebuild();
 
-    variable_new_values[2] = VarCharType::InstanceNonNullable(26).makeValue("hello world", 12);
+    variable_new_values[2] = VarCharType::InstanceNonNullable(kVarLenSize).makeValue("hello world", 12);
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(0, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(0, 2, variable_new_values[2]);
     EXPECT_STREQ("hello world", static_cast<const char*>(tuple_store_->getAttributeValue(0, 2)));
   }
 }
 
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
   fillBlockWithSampleData();
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -806,7 +1153,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
       reinsert_attr_values.emplace_back(testVariableLength() ? kVarChar : kChar);
     } else {
       reinsert_attr_values.emplace_back(
-          CharType::InstanceNonNullable(26).makeValue("foo", 4));
+          CharType::InstanceNonNullable(kVarLenSize).makeValue("foo", 4));
       reinsert_attr_values.back().ensureNotReference();
     }
     Tuple reinsert_tuple(std::move(reinsert_attr_values));
@@ -831,7 +1178,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
     std::vector<TypedValue> extra_variable_attr_values;
     extra_variable_attr_values.emplace_back(-123);
     extra_variable_attr_values.emplace_back(static_cast<double>(-100.5));
-    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(
+    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(
         kExtraVarCharValue,
         27)));
     extra_variable_tuple = Tuple(std::move(extra_variable_attr_values));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index bb76315..2b56b8c 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -183,6 +183,20 @@ class BitVector {
   }
 
   /**
+   * @brief Repoint this BitVector at a different block of backing memory.
+   *
+   * @warning The caller is responsible for ensuring that this BitVector does
+   *          not own its memory and that the pointed-to memory is the correct
+   *          size for this BitVector.
+   *
+   * @param ptr Pointer to data representing a BitVector with the same parameters
+   *            as this BitVector.
+   **/
+  inline void setMemory(void *ptr) {
+    DCHECK(!owned_);
+    this->data_array_ = static_cast<std::size_t*>(ptr);
+  }
+
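// A minimal sketch of the intended use of setMemory() (illustration only, not
// part of the committed diff): a non-owning BitVector is constructed once and
// then repointed at other, identically sized buffers without copying or
// reallocating. It assumes, without checking the rest of this header, the
// non-owning constructor BitVector(void *memory, std::size_t num_bits) and the
// usual setBit()/getBit() accessors.
inline bool ExampleSetMemoryUsage() {
  std::size_t buffer_a[2] = {0, 0};  // Backing words for two small bitmaps.
  std::size_t buffer_b[2] = {0, 0};
  BitVector<false> bits(buffer_a, 64);  // Non-owning view over buffer_a.
  bits.setBit(3, true);
  bits.setMemory(buffer_b);             // Repoint at buffer_b; no data copied.
  bits.setBit(5, true);
  return bits.getBit(5) && !bits.getBit(3);  // Bit 3 lives only in buffer_a.
}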
+  /**
    * @brief Similar to assignFrom(), but the other BitVector to assign from is
    *        allowed to be longer than this one.
    * @warning Only available when enable_short_version is false.