You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/06 02:08:06 UTC

[2/3] incubator-quickstep git commit: Improve partial bulk insert.

Improve partial bulk insert.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/31c80934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/31c80934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/31c80934

Branch: refs/heads/output-attr-order
Commit: 31c809343259bc3097c2e86a7860091cea7e6050
Parents: 9fcb0ac
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Dec 22 13:10:07 2016 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Dec 22 13:11:06 2016 -0600

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp       | 150 +---
 storage/InsertDestination.cpp                   |  84 ---
 storage/InsertDestination.hpp                   |  16 -
 storage/InsertDestinationInterface.hpp          |  22 -
 storage/SplitRowStoreTupleStorageSubBlock.cpp   | 692 ++++++++++---------
 storage/SplitRowStoreTupleStorageSubBlock.hpp   | 186 -----
 storage/StorageBlock.cpp                        |  24 -
 storage/StorageBlock.hpp                        |  44 --
 storage/TupleStorageSubBlock.hpp                |  50 --
 ...litRowStoreTupleStorageSubBlock_unittest.cpp | 445 ++----------
 types/containers/ColumnVectorsValueAccessor.hpp |   4 -
 utility/BitVector.hpp                           |  14 -
 12 files changed, 457 insertions(+), 1274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 2028046..4a91f86 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -65,11 +65,10 @@ namespace {
 
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
 // tuples from the inner relation. It stores matching tuple ID pairs
-// in an unordered_map keyed by inner block ID and a vector of
-// pairs of (build-tupleID, probe-tuple-ID).
-class VectorsOfPairsJoinedTuplesCollector {
+// in an unordered_map keyed by inner block ID.
+class MapBasedJoinedTupleCollector {
  public:
-  VectorsOfPairsJoinedTuplesCollector() {
+  MapBasedJoinedTupleCollector() {
   }
 
   template <typename ValueAccessorT>
@@ -96,34 +95,6 @@ class VectorsOfPairsJoinedTuplesCollector {
   std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
 };
 
-// Another collector using an unordered_map keyed on inner block just like above,
-// except that it uses a pair of (build-tupleIDs-vector, probe-tuple-IDs-vector).
-class PairsOfVectorsJoinedTuplesCollector {
- public:
-  PairsOfVectorsJoinedTuplesCollector() {
-  }
-
-  template <typename ValueAccessorT>
-  inline void operator()(const ValueAccessorT &accessor,
-                         const TupleReference &tref) {
-    joined_tuples_[tref.block].first.push_back(tref.tuple);
-    joined_tuples_[tref.block].second.push_back(accessor.getCurrentPosition());
-  }
-
-  // Get a mutable pointer to the collected map of joined tuple ID pairs. The
-  // key is inner block_id, value is a pair consisting of
-  // inner block tuple IDs (first) and outer block tuple IDs (second).
-  inline std::unordered_map< block_id, std::pair<std::vector<tuple_id>, std::vector<tuple_id>>>*
-      getJoinedTuples() {
-    return &joined_tuples_;
-  }
-
- private:
-  std::unordered_map<
-    block_id,
-    std::pair<std::vector<tuple_id>, std::vector<tuple_id>>> joined_tuples_;
-};
-
 class SemiAntiJoinTupleCollector {
  public:
   explicit SemiAntiJoinTupleCollector(TupleIdSequence *filter)
@@ -461,7 +432,7 @@ void HashInnerJoinWorkOrder::execute() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  PairsOfVectorsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -479,14 +450,12 @@ void HashInnerJoinWorkOrder::execute() {
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
-  for (std::pair<const block_id, std::pair<std::vector<tuple_id>, std::vector<tuple_id>>>
+  for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
     BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
     const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
     std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
-    const std::vector<tuple_id> &build_tids = build_block_entry.second.first;
-    const std::vector<tuple_id> &probe_tids = build_block_entry.second.second;
 
     // Evaluate '*residual_predicate_', if any.
     //
@@ -499,16 +468,17 @@ void HashInnerJoinWorkOrder::execute() {
     // hash join is below a reasonable threshold so that we don't blow up
     // temporary memory requirements to an unreasonable degree.
     if (residual_predicate_ != nullptr) {
-      std::pair<std::vector<tuple_id>, std::vector<tuple_id>> filtered_matches;
-      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+      std::vector<std::pair<tuple_id, tuple_id>> filtered_matches;
+
+      for (const std::pair<tuple_id, tuple_id> &hash_match
+           : build_block_entry.second) {
         if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
                                                         build_relation_id,
-                                                        build_tids[i],
+                                                        hash_match.first,
                                                         *probe_accessor,
                                                         probe_relation_id,
-                                                        probe_tids[i])) {
-          filtered_matches.first.push_back(build_tids[i]);
-          filtered_matches.second.push_back(probe_tids[i]);
+                                                        hash_match.second)) {
+          filtered_matches.emplace_back(hash_match);
         }
       }
 
@@ -531,96 +501,22 @@ void HashInnerJoinWorkOrder::execute() {
     // benefit (probably only a real performance win when there are very few
     // matching tuples in each individual inner block but very many inner
     // blocks with at least one match).
-
-    // We now create ordered value accessors for both build and probe side,
-    // using the joined tuple TIDs. Note that we have to use this Lambda-based
-    // invocation method here because the accessors don't have a virtual
-    // function that creates such an OrderedTupleIdSequenceAdapterValueAccessor.
-    std::unique_ptr<ValueAccessor> ordered_build_accessor, ordered_probe_accessor;
-    InvokeOnValueAccessorNotAdapter(
-        build_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_build_accessor.reset(
-              accessor->createSharedOrderedTupleIdSequenceAdapter(build_tids));
-        });
-
-    if (probe_accessor->isTupleIdSequenceAdapter()) {
-      InvokeOnTupleIdSequenceAdapterValueAccessor(
-        probe_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_probe_accessor.reset(
-            accessor->createSharedOrderedTupleIdSequenceAdapter(probe_tids));
-        });
-    } else {
-      InvokeOnValueAccessorNotAdapter(
-        probe_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_probe_accessor.reset(
-            accessor->createSharedOrderedTupleIdSequenceAdapter(probe_tids));
-        });
-    }
-
-
-    // We also need a temp value accessor to store results of any scalar expressions.
     ColumnVectorsValueAccessor temp_result;
-
-    // Create a map of ValueAccessors and what attributes we want to pick from them
-    std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> accessor_attribute_map;
-    const std::vector<ValueAccessor *> accessors{
-        ordered_build_accessor.get(), ordered_probe_accessor.get(), &temp_result};
-    const unsigned int build_index = 0, probe_index = 1, temp_index = 2;
-    for (auto &accessor : accessors) {
-      accessor_attribute_map.push_back(std::make_pair(
-          accessor,
-          std::vector<attribute_id>(selection_.size(), kInvalidCatalogId)));
-    }
-
-    attribute_id dest_attr = 0;
-    std::vector<std::pair<tuple_id, tuple_id>> zipped_joined_tuple_ids;
-
-    for (auto &selection_cit : selection_) {
-      // If the Scalar (column) is not an attribute in build/probe blocks, then
-      // insert it into a ColumnVectorsValueAccessor.
-      if (selection_cit->getDataSource() != Scalar::ScalarDataSource::kAttribute) {
-        // Current destination attribute maps to the column we'll create now.
-        accessor_attribute_map[temp_index].second[dest_attr] = temp_result.getNumColumns();
-
-        if (temp_result.getNumColumns() == 0) {
-          // The getAllValuesForJoin function below needs joined tuple IDs as
-          // a vector of pair of (build-tuple-ID, probe-tuple-ID), and we have
-          // a pair of (build-tuple-IDs-vector, probe-tuple-IDs-vector). So
-          // we'll have to zip our two vectors together. We do this inside
-          // the loop because most queries don't exercise this code since
-          // they don't have scalar expressions with attributes from both
-          // build and probe relations (other expressions would have been
-          // pushed down to before the join).
-          zipped_joined_tuple_ids.reserve(build_tids.size());
-          for (std::size_t i = 0; i < build_tids.size(); ++i) {
-            zipped_joined_tuple_ids.push_back(std::make_pair(build_tids[i], probe_tids[i]));
-          }
-        }
-        temp_result.addColumn(
-            selection_cit
-                ->getAllValuesForJoin(build_relation_id, build_accessor.get(),
-                                      probe_relation_id, probe_accessor.get(),
-                                      zipped_joined_tuple_ids));
-      } else {
-        auto scalar_attr = static_cast<const ScalarAttribute *>(selection_cit.get());
-        const attribute_id attr_id = scalar_attr->getAttribute().getID();
-        if (scalar_attr->getAttribute().getParent().getID() == build_relation_id) {
-          accessor_attribute_map[build_index].second[dest_attr] = attr_id;
-        } else {
-          accessor_attribute_map[probe_index].second[dest_attr] = attr_id;
-        }
-      }
-      ++dest_attr;
+    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
+         selection_cit != selection_.end();
+         ++selection_cit) {
+      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
+                                                                  build_accessor.get(),
+                                                                  probe_relation_id,
+                                                                  probe_accessor.get(),
+                                                                  build_block_entry.second));
     }
 
     // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
     // for each pair of joined blocks incurs some extra overhead that could be
     // avoided by keeping checked-out MutableBlockReferences across iterations
     // of this loop, but that would get messy when combined with partitioning.
-    output_destination_->bulkInsertTuplesFromValueAccessors(accessor_attribute_map);
+    output_destination_->bulkInsertTuples(&temp_result);
   }
 }
 
@@ -654,7 +550,7 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   // We collect all the matching probe relation tuples, as there's a residual
   // predicate that needs to be applied after collecting these matches.
-  VectorsOfPairsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -863,7 +759,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  VectorsOfPairsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
   // tuples, because after this step we still have to evaluate the residual

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 067edf6..5e83453 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -247,90 +247,6 @@ void InsertDestination::bulkInsertTuplesWithRemappedAttributes(
   });
 }
 
-// A common case that we can optimize away is when the attribute_map
-// for an accessor only contains gaps. e.g. This happens for a join when
-// there are no attributes selected from one side.
-void removeGapOnlyAccessors(
-  const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>>* accessor_attribute_map,
-  std::vector<std::pair<ValueAccessor *, const std::vector<attribute_id>>>* reduced_accessor_attribute_map) {
-  for (std::size_t i = 0; i < accessor_attribute_map->size(); ++i) {
-    bool all_gaps = true;
-    for (const auto &attr : (*accessor_attribute_map)[i].second)
-      if (attr != kInvalidCatalogId) {
-        all_gaps = false;
-        break;
-      }
-    if (all_gaps)
-      continue;
-    reduced_accessor_attribute_map->push_back((*accessor_attribute_map)[i]);
-    (*accessor_attribute_map)[i].first->beginIterationVirtual();
-  }
-}
-
-void InsertDestination::bulkInsertTuplesFromValueAccessors(
-    const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-    bool always_mark_full) {
-  // Handle pathological corner case where there are no accessors
-  if (accessor_attribute_map.size() == 0)
-    return;
-
-  std::vector<std::pair<ValueAccessor *, const std::vector<attribute_id>>> reduced_accessor_attribute_map;
-  removeGapOnlyAccessors(&accessor_attribute_map, &reduced_accessor_attribute_map);
-
-  // We assume that all input accessors have the same number of tuples, so
-  // the iterations finish together. Therefore, we can just check the first one.
-  auto first_accessor = reduced_accessor_attribute_map[0].first;
-  while (!first_accessor->iterationFinishedVirtual()) {
-    tuple_id num_tuples_to_insert = kCatalogMaxID;
-    tuple_id num_tuples_inserted = 0;
-    MutableBlockReference output_block = this->getBlockForInsertion();
-
-    // Now iterate through all the accessors and do one round of bulk-insertion
-    // of partial tuples into the selected output_block.
-    // While inserting from the first ValueAccessor, space is reserved for
-    // all the columns including those coming from other ValueAccessors.
-    // Thereafter, in a given round, we only insert the remaining columns of the
-    // same tuples from the other ValueAccessors.
-    for (auto &p : reduced_accessor_attribute_map) {
-      ValueAccessor *accessor = p.first;
-      std::vector<attribute_id> attribute_map = p.second;
-
-
-      InvokeOnAnyValueAccessor(
-          accessor,
-          [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-            num_tuples_inserted = output_block->bulkInsertPartialTuples(
-                attribute_map, accessor, num_tuples_to_insert);
-      });
-
-      if (accessor == first_accessor) {
-        // Now we know how many full tuples can be inserted into this
-        // output_block (viz. number of tuples inserted from first ValueAccessor).
-        // We should only insert that many tuples from the remaining
-        // ValueAccessors as well.
-        num_tuples_to_insert = num_tuples_inserted;
-      } else {
-        // Since the bulk insertion of the first ValueAccessor should already
-        // have reserved the space for all the other ValueAccessors' columns,
-        // we must have been able to insert all the tuples we asked to insert.
-        DCHECK(num_tuples_inserted == num_tuples_to_insert);
-      }
-    }
-
-    // After one round of insertions, we have successfully inserted as many
-    // tuples as possible into the output_block. Strictly speaking, it's
-    // possible that there is more space for insertions because the size
-    // estimation of variable length columns is conservative. But we will ignore
-    // that case and proceed assuming that this output_block is full.
-
-    // Update the header for output_block and then return it.
-    output_block->bulkInsertPartialTuplesFinalize(num_tuples_inserted);
-    const bool mark_full = always_mark_full
-                           || !first_accessor->iterationFinishedVirtual();
-    this->returnBlock(std::move(output_block), mark_full);
-  }
-}
-
 void InsertDestination::insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                                                std::vector<Tuple>::const_iterator end) {
   if (begin == end) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3487638..408e76b 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -152,10 +152,6 @@ class InsertDestination : public InsertDestinationInterface {
       ValueAccessor *accessor,
       bool always_mark_full = false) override;
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override;
-
   void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                               std::vector<Tuple>::const_iterator end) override;
 
@@ -317,12 +313,6 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
   ~AlwaysCreateBlockInsertDestination() override {
   }
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override  {
-    LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for AlwaysCreateBlockInsertDestination";
-  }
-
  protected:
   MutableBlockReference getBlockForInsertion() override;
 
@@ -527,12 +517,6 @@ class PartitionAwareInsertDestination : public InsertDestination {
       ValueAccessor *accessor,
       bool always_mark_full = false) override;
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override  {
-    LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for PartitionAwareInsertDestination";
-  }
-
   void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                               std::vector<Tuple>::const_iterator end) override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestinationInterface.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestinationInterface.hpp b/storage/InsertDestinationInterface.hpp
index b62d3e5..423dff1 100644
--- a/storage/InsertDestinationInterface.hpp
+++ b/storage/InsertDestinationInterface.hpp
@@ -20,7 +20,6 @@
 #ifndef QUICKSTEP_STORAGE_INSERT_DESTINATION_INTERFACE_HPP_
 #define QUICKSTEP_STORAGE_INSERT_DESTINATION_INTERFACE_HPP_
 
-#include <utility>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -123,27 +122,6 @@ class InsertDestinationInterface {
       bool always_mark_full = false) = 0;
 
   /**
-   * @brief Bulk-insert tuples from one or more ValueAccessors
-   *        into blocks managed by this InsertDestination.
-   *
-   * @warning It is implicitly assumed that all the input ValueAccessors have
-   *          the same number of tuples in them.
-   *
-   * @param accessor_attribute_map A vector of pairs of ValueAccessor and
-   *        corresponding attribute map
-   *        The i-th attribute ID in the attr map for a value accessor is "n" 
-   *        if the attribute_id "i" in the output relation
-   *        is the attribute_id "n" in corresponding input value accessor.
-   *        Set the i-th element to kInvalidCatalogId if it doesn't come from
-   *        the corresponding value accessor.
-   * @param always_mark_full If \c true, always mark the blocks full after
-   *        insertion from ValueAccessor even when partially full.
-   **/
-  virtual void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) = 0;
-
-  /**
    * @brief Insert tuples from a range of Tuples in a vector.
    * @warning Unlike bulkInsertTuples(), this is not well-optimized and not
    *          intended for general use. It should only be used by

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index 1e6f7ff..f955c99 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -41,61 +41,54 @@ namespace quickstep {
 
 QUICKSTEP_REGISTER_TUPLE_STORE(SplitRowStoreTupleStorageSubBlock, SPLIT_ROW_STORE);
 
-using splitrow_internal::CopyGroupList;
-using splitrow_internal::ContiguousAttrs;
-using splitrow_internal::NullableAttr;
-using splitrow_internal::VarLenAttr;
-
-const std::size_t SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize = sizeof(std::uint32_t) * 2;
-
 namespace {
 
-  template<typename ValueAccessorT, bool nullable_attrs>
-  inline std::size_t CalculateVariableSize(
+template <typename ValueAccessorT, bool nullable_attrs>
+inline std::size_t CalculateVariableSize(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor) {
-    std::size_t total_size = 0;
-    attribute_id accessor_attr_id = 0;
-    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-         attr_it != relation.end();
-         ++attr_it, ++accessor_attr_id) {
-      if (!attr_it->getType().isVariableLength()) {
-        continue;
-      }
+  std::size_t total_size = 0;
+  attribute_id accessor_attr_id = 0;
+  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+       attr_it != relation.end();
+       ++attr_it, ++accessor_attr_id) {
+    if (!attr_it->getType().isVariableLength()) {
+      continue;
+    }
 
-      TypedValue value(accessor.getTypedValue(accessor_attr_id));
-      if (nullable_attrs && value.isNull()) {
-        continue;
-      }
-      total_size += value.getDataSize();
+    TypedValue value(accessor.getTypedValue(accessor_attr_id));
+    if (nullable_attrs && value.isNull()) {
+      continue;
     }
-    return total_size;
+    total_size += value.getDataSize();
   }
+  return total_size;
+}
 
-  template<typename ValueAccessorT, bool nullable_attrs>
-  inline std::size_t CalculateVariableSizeWithRemappedAttributes(
+template <typename ValueAccessorT, bool nullable_attrs>
+inline std::size_t CalculateVariableSizeWithRemappedAttributes(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor,
     const std::vector<attribute_id> &attribute_map) {
-    std::size_t total_size = 0;
-    std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-         attr_it != relation.end();
-         ++attr_it, ++attr_map_it) {
-      if (!attr_it->getType().isVariableLength()) {
-        continue;
-      }
+  std::size_t total_size = 0;
+  std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+       attr_it != relation.end();
+       ++attr_it, ++attr_map_it) {
+    if (!attr_it->getType().isVariableLength()) {
+      continue;
+    }
 
-      TypedValue value(accessor.getTypedValue(*attr_map_it));
-      if (nullable_attrs && value.isNull()) {
-        continue;
-      }
-      total_size += value.getDataSize();
+    TypedValue value(accessor.getTypedValue(*attr_map_it));
+    if (nullable_attrs && value.isNull()) {
+      continue;
     }
-    return total_size;
+    total_size += value.getDataSize();
   }
+  return total_size;
+}
 
-}  // anonymous namespace
+}  // namespace
 
 SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
     const CatalogRelationSchema &relation,
@@ -108,10 +101,7 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                            new_block,
                            sub_block_memory,
                            sub_block_memory_size),
-      header_(static_cast<Header*>(sub_block_memory)),
-      num_null_attrs_(0),
-      num_fixed_attrs_(0),
-      num_var_attrs_(0) {
+      header_(static_cast<Header*>(sub_block_memory)) {
   if (!DescriptionIsValid(relation_, description_)) {
     FATAL_ERROR("Attempted to construct a SplitRowStoreTupleStorageSubBlock from an invalid description.");
   }
@@ -153,21 +143,6 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                    + sizeof(Header) + occupancy_bitmap_bytes_;
   tuple_storage_bytes_ = sub_block_memory_size_ - (sizeof(Header) + occupancy_bitmap_bytes_);
 
-  // Some accounting information for bulk inserts.
-  for (attribute_id attr_id = 0;
-       attr_id < static_cast<attribute_id>(relation.size());
-       ++attr_id) {
-    const Type& attr_type = relation.getAttributeById(attr_id)->getType();
-    if (attr_type.isVariableLength()) {
-      fixed_len_attr_sizes_.push_back(kInvalidAttributeID);
-      num_var_attrs_++;
-    } else {
-      fixed_len_attr_sizes_.push_back(attr_type.maximumByteLength());
-      num_fixed_attrs_++;
-    }
-    num_null_attrs_ += attr_type.isNullable();
-  }
-
   if (new_block) {
     // Only need to initialize these fields, the rest of the block will be
     // zeroed-out by the StorageManager.
@@ -219,218 +194,380 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
 }
 
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *accessor) {
-  std::vector<attribute_id> simple_remap;
-  for (attribute_id attr_id = 0;
-      attr_id < static_cast<attribute_id>(relation_.size());
-      ++attr_id) {
-    simple_remap.push_back(attr_id);
-  }
-  return bulkInsertDispatcher(simple_remap, accessor, kCatalogMaxID, true);
-}
+  const tuple_id original_num_tuples = header_->num_tuples;
+  tuple_id pos = 0;
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuples(
-  const std::vector<attribute_id> &attribute_map,
-  ValueAccessor *accessor,
-  const tuple_id max_num_tuples_to_insert) {
-  return bulkInsertDispatcher(attribute_map, accessor, max_num_tuples_to_insert, false);
-}
-
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
-  const std::vector<attribute_id> &attribute_map,
-  ValueAccessor *accessor,
-  tuple_id max_num_tuples_to_insert,
-  bool finalize) {
-  const bool fill_to_capacity = max_num_tuples_to_insert == kCatalogMaxID;
-
-  CopyGroupList copy_groups;
-  getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
-  auto impl = accessor->getImplementationType();
-  const bool is_rowstore_source =
-    (impl == ValueAccessor::Implementation::kPackedRowStore ||
-     impl == ValueAccessor::Implementation::kSplitRowStore);
-  if (is_rowstore_source) {
-    copy_groups.merge_contiguous();
-  }
-
-  const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
-  const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
-
-  if (fill_to_capacity) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     if (relation_.hasNullableAttributes()) {
-      // TODO(marc) This is an annoying gotcha: the insertion loop assumes the null
-      // bitmaps are zero'd for a fresh insert. We could clear the bit map on each tuple
-      // iteration, but that'd be costlier.
-      std::int64_t remaining_bytes = tuple_storage_bytes_ -
-                                     (header_->variable_length_bytes_allocated +
-                                      (header_->num_tuples * tuple_slot_bytes_));
-      memset(static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_, 0x0, remaining_bytes);
-    }
-  }
-
-  tuple_id num_inserted = 0;
-  if (max_num_tuples_to_insert == kCatalogMaxID) {
-    max_num_tuples_to_insert = getInsertLowerBound();
-  }
-  if (copy_varlen) {
-    if (copy_nulls) {
-      if (fill_to_capacity) {
-        num_inserted = bulkInsertPartialTuplesImpl<true, true, true>(copy_groups, accessor,
-                                                                     max_num_tuples_to_insert);
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          // If packed, insert at the end of the slot array, otherwise find the
+          // first hole.
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSize<decltype(*accessor), true>(relation_, *accessor);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          // Allocate variable-length storage.
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          // Find the slot and locate its sub-structures.
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          // Start writing variable-length data at the beginning of the newly
+          // allocated range.
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
+            if ((nullable_idx != -1) && (attr_value.isNull())) {
+              // Set null bit and move on.
+              tuple_null_bitmap.setBit(nullable_idx, true);
+              continue;
+            }
+            if (variable_idx != -1) {
+              // Write offset and size into the slot, then copy the actual
+              // value into the variable-length storage region.
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              // Copy fixed-length value directly into the slot.
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          // Update occupancy bitmap and header.
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       } else {
-        num_inserted = bulkInsertPartialTuplesImpl<true, true, false>(copy_groups, accessor,
-                                                                      max_num_tuples_to_insert);
+        // Same as above, but skip variable-length checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            if (nullable_idx != -1) {
+              const void *attr_value = accessor->template getUntypedValue<true>(accessor_attr_id);
+              if (attr_value == nullptr) {
+                tuple_null_bitmap.setBit(nullable_idx, true);
+              } else {
+                std::memcpy(fixed_length_attr_storage
+                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                            attr_value,
+                            attr_it->getType().maximumByteLength());
+              }
+            } else {
+              const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
+              std::memcpy(fixed_length_attr_storage
+                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                          attr_value,
+                          attr_it->getType().maximumByteLength());
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       }
     } else {
-      if (fill_to_capacity) {
-        num_inserted = bulkInsertPartialTuplesImpl<false, true, true>(copy_groups, accessor,
-                                                                      max_num_tuples_to_insert);
+      if (relation_.isVariableLength()) {
+        // Same as most general case above, but skip null checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSize<decltype(*accessor), false>(relation_, *accessor);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
+            if (variable_idx != -1) {
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       } else {
-        num_inserted = bulkInsertPartialTuplesImpl<false, true, false>(copy_groups, accessor,
-                                                                       max_num_tuples_to_insert);
+        // Simplest case: skip both null and variable-length checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
+            std::memcpy(fixed_length_attr_storage
+                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                        attr_value,
+                        attr_it->getType().maximumByteLength());
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       }
     }
-  } else {
-    if (copy_nulls) {
-      num_inserted = bulkInsertPartialTuplesImpl<true, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
-    } else {
-      num_inserted = bulkInsertPartialTuplesImpl<false, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
-    }
-  }
+  });
 
-  if (finalize) {
-    bulkInsertPartialTuplesFinalize(num_inserted);
-  }
-  return num_inserted;
+  return header_->num_tuples - original_num_tuples;
 }
 
-// copy_nulls is true if the incoming attributes include at least one nullable attribute
-// copy_varlen is true if the incoming attributes include at least one varlen attribute
-template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
-  const CopyGroupList &copy_groups,
-  ValueAccessor *accessor,
-  std::size_t max_num_tuples_to_insert) {
-  std::size_t num_tuples_inserted = 0;
-
-  // We only append to the end of the block to cut down on complexity.
-  char *tuple_slot = static_cast<char *>(tuple_storage_) +  header_->num_tuples * tuple_slot_bytes_;
-
-  std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-  std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
-
-  BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
-  char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
-
-
-
-  std::size_t storage_available = tuple_storage_bytes_ -
-                                    (header_->variable_length_bytes_allocated +
-                                     header_->num_tuples * tuple_slot_bytes_);
-
-  // The number of bytes that must be reserved per tuple inserted due to gaps.
-  std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
-  if (fill_to_capacity) {
-    for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
-      varlen_reserve -= relation_.getAttributeById(
-        copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
-    }
-    DCHECK_GE(relation_.getMaximumVariableByteLength(), varlen_reserve);
-  }
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor) {
+  DEBUG_ASSERT(attribute_map.size() == relation_.size());
+  const tuple_id original_num_tuples = header_->num_tuples;
+  tuple_id pos = 0;
 
   InvokeOnAnyValueAccessor(
-    accessor,
-    [&](auto *accessor) -> void {  // NOLINT(build/c++11
-      do {
-        const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
-        const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
-        const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
-
-        const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
-
-        while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
-          for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
-            const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
-            fixed_len_cursor += cattr.bytes_to_advance_;
-            const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
-            std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    if (relation_.hasNullableAttributes()) {
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), true>(
+                  relation_, *accessor, attribute_map);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
           }
-
-          if (copy_nulls) {
-            tuple_null_bitmap.setMemory(tuple_slot);
-            for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
-              const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
-              const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
+            if ((nullable_idx != -1) && (attr_value.isNull())) {
+              tuple_null_bitmap.setBit(nullable_idx, true);
+              continue;
+            }
+            if (variable_idx != -1) {
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
+      } else {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            if (nullable_idx != -1) {
+              const void *attr_value = accessor->template getUntypedValue<true>(*attr_map_it);
               if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
+                tuple_null_bitmap.setBit(nullable_idx, true);
+              } else {
+                std::memcpy(fixed_length_attr_storage
+                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                            attr_value,
+                            attr_it->getType().maximumByteLength());
               }
+            } else {
+              const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
+              std::memcpy(fixed_length_attr_storage
+                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                          attr_value,
+                          attr_it->getType().maximumByteLength());
             }
           }
-
-          if (copy_varlen) {
-            for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
-              const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
-              fixed_len_cursor += vattr.bytes_to_advance_;
-              // Typed value is necessary as we need the length.
-              const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
-              if (attr_value.isNull()) {
-                continue;
-              }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
+      }
+    } else {
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), false>(
+                  relation_, *accessor, attribute_map);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
+            if (variable_idx != -1) {
               const std::size_t attr_size = attr_value.getDataSize();
-              varlen_heap_offset -= attr_size;
-              std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
-                          attr_size);
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
             }
           }
-          tuple_slot += tuple_slot_bytes_;
-          fixed_len_cursor = tuple_slot + nullmap_size;
-          num_tuples_inserted++;
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
         }
-        if (fill_to_capacity) {
-          std::int64_t remaining_storage_after_inserts = storage_available -
-                                                         (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
-                                                          (varlen_heap_offset_orig - varlen_heap_offset));
-          DCHECK_LE(0, remaining_storage_after_inserts);
-          std::size_t additional_tuples_insert =
-            remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
-          // We want to avoid a situation where we have several short insert iterations
-          // near the end of an insertion cycle.
-          if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
-            max_num_tuples_to_insert += additional_tuples_insert;
+      } else {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
+            std::memcpy(fixed_length_attr_storage
+                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                        attr_value,
+                        attr_it->getType().maximumByteLength());
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
           }
         }
-      } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
-               num_tuples_inserted < max_num_tuples_to_insert);
-    });
-
-  if (copy_varlen) {
-    header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
-  }
-
-  return num_tuples_inserted;
-}
-
-void SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesFinalize(
-    const tuple_id num_tuples_inserted) {
-  occupancy_bitmap_->setBitRange(header_->max_tid + 1, num_tuples_inserted, true);
-  header_->num_tuples += num_tuples_inserted;
-  header_->max_tid += num_tuples_inserted;
-}
-
-std::size_t SplitRowStoreTupleStorageSubBlock::getInsertLowerBound() const {
-  const std::size_t remaining_storage_bytes = tuple_storage_bytes_ -
-                                              (header_->variable_length_bytes_allocated +
-                                               ((header_->max_tid + 1) * tuple_slot_bytes_));
-  const std::size_t tuple_max_size = tuple_slot_bytes_ + relation_.getMaximumVariableByteLength();
-  return remaining_storage_bytes / tuple_max_size;
-}
+      }
+    }
+  });
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor) {
-  DCHECK_EQ(relation_.size(), attribute_map.size());
-  return bulkInsertDispatcher(attribute_map, accessor, kCatalogMaxID, true);
+  return header_->num_tuples - original_num_tuples;
 }
 
 const void* SplitRowStoreTupleStorageSubBlock::getAttributeValue(
@@ -865,67 +1002,4 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
   return InsertResult(pos, false);
 }
 
-// Copy groups are used by insert algorithms to efficiently copy attributes from a
-// variety of source schemas with some matching attributes in the destination (this) store.
-// SplitRow has 3 distinct zones which define a physical tuple:
-//    [null_bitmap] [fixed_length_zone] [var_len_pairs]
-// When we do our insert algorithm, we first copy over fixed length attributes. Since there
-// can be gaps, and reorderings in the source schema, we need to know:
-//    * Where to copy the src attr into (ie offset from start of fixed_len_zone)
-//    * How many bytes to copy
-//    * Which src attr we are copying
-// When copying fixed length attributes, we calculate the offset into our tuple, do a memcpy for
-// the length of the data with the src attribute.
-//
-// Copying variable length attributes pairs is similar. Note that there is a heap at the end of
-// the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
-// copy varlen into the heap is the main difference from copying fixed length.
-void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
-  const std::vector<attribute_id> &attribute_map,
-  CopyGroupList *copy_groups) {
-  DCHECK_EQ(attribute_map.size(), relation_.size());
-
-  attribute_id num_attrs = attribute_map.size();
-
-  std::size_t contig_adv = 0;
-  std::size_t varlen_adv = 0;
-  for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
-    attribute_id src_attr = attribute_map[attr_id];
-
-    // Attribute doesn't exist in src.
-    if (src_attr == kInvalidCatalogId) {
-      // create a placeholder for now
-      if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
-        // fixed len
-        contig_adv += fixed_len_attr_sizes_[attr_id];
-      } else {
-        // var len
-        varlen_adv += kVarLenSlotSize;
-      }
-      continue;
-    }
-
-    // Attribute exists in src.
-    if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
-      // fixed len
-      copy_groups->contiguous_attrs_.push_back(
-        ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
-      contig_adv = fixed_len_attr_sizes_[attr_id];
-    } else {
-      // var len
-      copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
-      varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
-    }
-
-    if (relation_.getNullableAttributeIndex(attr_id) != -1) {
-      copy_groups->nullable_attrs_.push_back(
-        NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
-    }
-  }
-  // This will point us to the beginning of the varlen zone.
-  if (copy_groups->varlen_attrs_.size() > 0) {
-    copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
-  }
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index 89c756d..a930103 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -45,150 +45,6 @@ class ValueAccessor;
 
 QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
 
-namespace splitrow_internal {
-// A CopyGroup contains information about ane run of attributes in the source
-// ValueAccessor that can be copied into the output block. The
-// getCopyGroupsForAttributeMap function below takes an attribute map for a source
-// and converts it into a sequence of runs. The goal is to minimize the number
-// of memcpy calls and address calculations that occur during bulk insertion.
-// Contiguous attributes from a rowstore source can be merged into a single copy group.
-//
-// A single ContiguousAttrs CopyGroup consists of contiguous attributes, nullable
-// or not. "Contiguous" here means that their attribute IDs are successive in both
-// the source and destination relations.
-//
-// A NullAttr refers to exactly one nullable attribute. Nullable columns are
-// represented using fixed length inline data as well as a null bitmap.
-// In a particular tuple, if the attribute has a null value, the inline data
-// has no meaning. So it is safe to copy it or not. We use this fact to merge
-// runs together aggressively, i.e., a ContiguousAttrs group may include a
-// nullable attribute. However, we also create a NullableAttr in that case in
-// order to check the null bitmap.
-//
-// A gap is a run of destination (output) attributes that don't come from a
-// particular source. This occurs during bulkInsertPartialTuples. They must be
-// skipped during the insert (not copied over). They are indicated by a
-// kInvalidCatalogId in the attribute map. For efficiency, the gap size
-// is merged into the bytes_to_advance_ of previous ContiguousAttrs copy group.
-// For gaps at the start of the attribute map, we just create a ContiguousAttrs
-// copy group with 0 bytes to copy and dummy (0) source attribute id.
-//
-// eg. For 4B integer attrs, from a row store source,
-// if the input attribute_map is {-1,0,5,6,7,-1,2,4,9,10,-1}
-// with input/output attributes 4 and 7 being nullable,
-// we will create the following ContiguousAttrs copy groups
-//
-//  ----------------------------------------------------
-//  |src_id_      |bytes_to_advance_| bytes_to_copy_   |
-//  |-------------|-----------------|------------------|
-//  |            0|                4|                 4|
-//  |            5|                4|                12|
-//  |            2|               16|                 4|
-//  |            4|                4|                 4|
-//  |            9|                4|                 8|
-//  ----------------------------------------------------
-// and two NullableAttrs with src_attr_id_ set to 4 and 7.
-//
-// In this example, we do 6 memcpy calls and 6 address calculations
-// as well as 2 bitvector lookups for each tuple. A naive copy algorithm
-// would do 11 memcpy calls and address calculations, along with the
-// bitvector lookups, not to mention the schema lookups,
-// all interspersed in a complex loop with lots of branches.
-//
-// If the source was a column store, then we can't merge contiguous
-// attributes (or gaps). So we would have 11 ContigousAttrs copy groups with
-// three of them having bytes_to_copy = 0 (corresponding to the gaps) and
-// the rest having bytes_to_copy_ = 4.
-//
-// SplitRowStore supports variable length attributes. Since the layout of the
-// tuple is like: [null bitmap][fixed length attributes][variable length offsets]
-// we do all the variable length copies after the fixed length copies.
-//
-struct CopyGroup {
-  attribute_id src_attr_id_;  // The attr_id of starting input attribute for run.
-
-  explicit CopyGroup(const attribute_id source_attr_id)
-    : src_attr_id_(source_attr_id) {}
-};
-
-struct ContiguousAttrs : public CopyGroup {
-  std::size_t bytes_to_advance_;  // Number of bytes to advance destination ptr
-                                  // to get to the location where we copy THIS attribute.
-  std::size_t bytes_to_copy_;     // Number of bytes to copy from source.
-
-  ContiguousAttrs(
-    const attribute_id source_attr_id,
-    const std::size_t bytes_to_copy,
-    const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      bytes_to_copy_(bytes_to_copy) { }
-};
-
-struct VarLenAttr : public CopyGroup {
-  std::size_t bytes_to_advance_;
-  attribute_id dst_attr_id_;
-  VarLenAttr(const attribute_id source_attr_id,
-             const attribute_id dst_attr_id,
-             const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      dst_attr_id_(dst_attr_id) {}
-};
-
-struct NullableAttr : public CopyGroup {
-  int nullable_attr_idx_;  // index into null bitmap
-
-  NullableAttr(attribute_id source_attr_id_,
-               int nullable_attr_idx)
-    : CopyGroup(source_attr_id_),
-      nullable_attr_idx_(nullable_attr_idx) {}
-};
-
-struct CopyGroupList {
-  CopyGroupList()
-    : contiguous_attrs_(),
-      nullable_attrs_(),
-      varlen_attrs_() {}
-
-  /**
-   * @brief Attributes which are exactly sequential are merged to a single copy.
-   */
-  void merge_contiguous() {
-    if (contiguous_attrs_.size() < 2) {
-      return;
-    }
-
-    int add_to_advance = 0;
-    for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
-      ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
-      ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
-      if (add_to_advance > 0) {
-        current_attr->bytes_to_advance_ += add_to_advance;
-        add_to_advance = 0;
-      }
-      // The merge step:
-      if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
-            previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
-        previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
-        add_to_advance += current_attr->bytes_to_advance_;
-        contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
-        idx--;
-      }
-    }
-
-    if (varlen_attrs_.size() > 0) {
-      varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
-    }
-  }
-
-  std::vector<ContiguousAttrs> contiguous_attrs_;
-  std::vector<NullableAttr> nullable_attrs_;
-  std::vector<VarLenAttr> varlen_attrs_;
-};
-
-}  // namespace splitrow_internal
-
 /** \addtogroup Storage
  *  @{
  */
@@ -204,8 +60,6 @@ struct CopyGroupList {
  *       storage can be reclaimed by calling rebuild().
  **/
 class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
-  static const std::size_t kVarLenSlotSize;
-
  public:
   SplitRowStoreTupleStorageSubBlock(const CatalogRelationSchema &relation,
                                     const TupleStorageSubBlockDescription &description,
@@ -301,13 +155,6 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;
 
-  tuple_id bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert) override;
-
-  void bulkInsertPartialTuplesFinalize(const tuple_id num_tuples_inserted) override;
-
   const void* getAttributeValue(const tuple_id tuple,
                                 const attribute_id attr) const override;
 
@@ -366,33 +213,6 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   template <bool nullable_attrs, bool variable_length_attrs>
   InsertResult insertTupleImpl(const Tuple &tuple);
 
-  template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
-  tuple_id bulkInsertPartialTuplesImpl(
-    const splitrow_internal::CopyGroupList &copy_groups,
-    ValueAccessor *accessor,
-    std::size_t max_num_tuples_to_insert);
-
-  tuple_id bulkInsertDispatcher(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    tuple_id max_num_tuples_to_insert,
-    bool finalize);
-
-  void getCopyGroupsForAttributeMap(
-    const std::vector<attribute_id> &attribute_map,
-    splitrow_internal::CopyGroupList *copy_groups);
-
-  std::size_t getInsertLowerBound() const;
-
-  // When varlen attributes are bulk inserted, the difference between the maximum
-  // possible size and the actual size of the tuples will cause an underestimate of
-  // the number of tuples we can insert. This threshold puts a limit on the number
-  // of tuples to attempt to insert. A smaller number will give more rounds of insertion
-  // and a more-packed block, but at the cost of insertion speed.
-  std::size_t getInsertLowerBoundThreshold() const {
-    return 10;
-  }
-
   Header *header_;
 
   std::unique_ptr<BitVector<false>> occupancy_bitmap_;
@@ -401,18 +221,12 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   void *tuple_storage_;
   std::size_t tuple_storage_bytes_;
   std::size_t tuple_slot_bytes_;
-  std::vector<std::size_t> fixed_len_attr_sizes_;
-
-  std::size_t num_null_attrs_;
-  std::size_t num_fixed_attrs_;
-  std::size_t num_var_attrs_;
 
   std::size_t per_tuple_null_bitmap_bytes_;
 
   friend class SplitRowStoreTupleStorageSubBlockTest;
   friend class SplitRowStoreValueAccessor;
   FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, InitializeTest);
-  FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest);
 
   DISALLOW_COPY_AND_ASSIGN(SplitRowStoreTupleStorageSubBlock);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 6267d6b..ea74ee6 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -284,30 +284,6 @@ tuple_id StorageBlock::bulkInsertTuplesWithRemappedAttributes(
   return num_inserted;
 }
 
-tuple_id StorageBlock::bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert) {
-  const tuple_id num_inserted
-      = tuple_store_->bulkInsertPartialTuples(attribute_map,
-                                              accessor,
-                                              max_num_tuples_to_insert);
-  if (num_inserted != 0) {
-    invalidateAllIndexes();
-    dirty_ = true;
-  } else if (tuple_store_->isEmpty()) {
-    if (!accessor->iterationFinishedVirtual()) {
-      throw TupleTooLargeForBlock(0);
-    }
-  }
-  return num_inserted;
-}
-
-void StorageBlock::bulkInsertPartialTuplesFinalize(
-    const tuple_id num_tuples_inserted) {
-  tuple_store_->bulkInsertPartialTuplesFinalize(num_tuples_inserted);
-}
-
 void StorageBlock::sample(const bool is_block_sample,
                           const int percentage,
                           InsertDestinationInterface *destination) const {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index ed252c5..56b3bdc 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -307,7 +307,6 @@ class StorageBlock : public StorageBlockBase {
    *        iteration will be advanced to the first non-inserted tuple or, if
    *        all accessible tuples were inserted in this block, to the end
    *        position.
-   * @param max_tuples_to_insert Insert at most these many tuples
    * @return The number of tuples inserted from accessor.
    **/
   tuple_id bulkInsertTuplesWithRemappedAttributes(
@@ -315,49 +314,6 @@ class StorageBlock : public StorageBlockBase {
       ValueAccessor *accessor);
 
   /**
-   * @brief Insert up to max_num_tuples_to_insert tuples from a ValueAccessor
-   *        as a single batch, using the attribute_map to project and reorder
-   *        columns from the input ValueAccessor. Does not update header.
-   *
-   * @note Typical usage is where you want to bulk-insert columns from two
-   *       or more value accessors. Instead of writing out the columns into
-   *       one or more column vector value accessors, you can simply use this
-   *       function with the appropriate attribute_map for each value
-   *       accessor (InsertDestination::bulkInsertTuplesFromValueAccessors
-   *       handles all the details) to insert tuples without an extra temp copy.
-   * 
-   * @warning Must call bulkInsertPartialTuplesFinalize() to update the header,
-   *          until which point, the insertion is not visible to others.
-   * @warning The inserted tuples may be placed in sub-optimal locations in this
-   *          TupleStorageSubBlock.
-   *
-   * @param attribute_map A vector which maps the attributes of this
-   *        TupleStorageSubBlock's relation (gaps indicated with kInvalidCatalogId)
-   *         to the corresponding attributes which should be read from accessor.
-   * @param accessor A ValueAccessor to insert tuples from. The accessor's
-   *        iteration will be advanced to the first non-inserted tuple or, if
-   *        all accessible tuples were inserted in this sub-block, to the end
-   *        position.
-   * @return The number of tuples inserted from accessor.
-   **/
-  tuple_id bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert);
-
-  /**
-   * @brief Update header after a bulkInsertPartialTuples.
-   *
-   * @warning Only call this after a bulkInsertPartialTuples, passing in the
-   *          number of tuples that were inserted (return value of that function).
-   *
-   * @param num_tuples_inserted Number of tuples inserted (i.e., how much to
-   *        advance the header.num_tuples by). Should be equal to the return
-   *        value of bulkInsertPartialTuples.
-   **/
-  void bulkInsertPartialTuplesFinalize(tuple_id num_tuples_inserted);
-
-  /**
    * @brief Get the IDs of tuples in this StorageBlock which match a given Predicate.
    *
    * @param predicate The predicate to match.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/TupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/TupleStorageSubBlock.hpp b/storage/TupleStorageSubBlock.hpp
index 26e8027..aed6eea 100644
--- a/storage/TupleStorageSubBlock.hpp
+++ b/storage/TupleStorageSubBlock.hpp
@@ -272,56 +272,6 @@ class TupleStorageSubBlock {
       ValueAccessor *accessor) = 0;
 
   /**
-   * @brief Insert up to max_num_tuples_to_insert tuples from a ValueAccessor
-   *        as a single batch, using the attribute_map to project and reorder
-   *        columns from the input ValueAccessor. Does not update header.
-   *
-   * @note Typical usage is where you want to bulk-insert columns from two
-   *       or more value accessors. Instead of writing out the columns into
-   *       one or more column vector value accessors, you can simply use this
-   *       function with the appropriate attribute_map for each value
-   *       accessor (InsertDestination::bulkInsertTuplesFromValueAccessors
-   *       handles all the details) to insert tuples without an extra temp copy.
-   * 
-   * @warning Must call bulkInsertPartialTuplesFinalize() to update the header,
-   *          until which point, the insertion is not visible to others.
-   * @warning The inserted tuples may be placed in a suboptimal position in the
-   *          block.
-   *
-   * @param attribute_map A vector which maps the attributes of this
-   *        TupleStorageSubBlock's relation (gaps indicated with kInvalidCatalogId)
-   *         to the corresponding attributes which should be read from accessor.
-   * @param accessor A ValueAccessor to insert tuples from. The accessor's
-   *        iteration will be advanced to the first non-inserted tuple or, if
-   *        all accessible tuples were inserted in this sub-block, to the end
-   *        position.
-   * @return The number of tuples inserted from accessor.
-   **/
-  virtual tuple_id bulkInsertPartialTuples(
-      const std::vector<attribute_id> &attribute_map,
-      ValueAccessor *accessor,
-      const tuple_id max_num_tuples_to_insert) {
-    LOG(FATAL) << "Partial bulk insert is not supported for this TupleStorageBlock type ("
-               << getTupleStorageSubBlockType() << ").";
-  }
-
-  /**
-   * @brief Update header after a bulkInsertPartialTuples.
-   *
-   * @warning Only call this after a bulkInsertPartialTuples, passing in the
-   *          number of tuples that were inserted (return value of that function).
-   *
-   * @param num_tuples_inserted Number of tuples inserted (i.e., how much to
-   *        advance the header.num_tuples by). Should be equal to the return
-   *        value of bulkInsertPartialTuples.
-   **/
-  virtual void bulkInsertPartialTuplesFinalize(
-      const tuple_id num_tuples_inserted) {
-    LOG(FATAL) << "Partial bulk insert is not supported for this TupleStorageBlock type ("
-               << getTupleStorageSubBlockType() << ").";
-  }
-
-  /**
    * @brief Get the (untyped) value of an attribute in a tuple in this buffer.
    * @warning This method may not be supported for all implementations of
    *          TupleStorageSubBlock. supportsUntypedGetAttributeValue() MUST be