Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/06 02:08:07 UTC

[3/3] incubator-quickstep git commit: Copy test

Copy test


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c7fdc360
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c7fdc360
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c7fdc360

Branch: refs/heads/output-attr-order
Commit: c7fdc360e9f3ec9466d7854db8ee7fb85630ae91
Parents: 31c8093
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Jan 5 20:07:44 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Jan 5 20:07:44 2017 -0600

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt             |   1 +
 relational_operators/HashJoinOperator.cpp       | 143 ++++++++--
 storage/CMakeLists.txt                          |  10 +
 storage/InsertContext.hpp                       | 281 +++++++++++++++++++
 storage/InsertDestination.cpp                   |  28 ++
 storage/InsertDestination.hpp                   |   9 +-
 storage/SplitRowStoreTupleStorageSubBlock.cpp   |  22 ++
 storage/SplitRowStoreTupleStorageSubBlock.hpp   |   4 +
 storage/SplitRowStoreValueAccessor.hpp          |  22 ++
 storage/StorageBlock.cpp                        |  16 ++
 storage/StorageBlock.hpp                        |   3 +
 storage/TupleStorageSubBlock.hpp                |   6 +
 storage/ValueAccessor.hpp                       |  72 +++++
 types/containers/ColumnVectorsValueAccessor.hpp |  22 ++
 14 files changed, 609 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 9e4b1b6..b792a7b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -213,6 +213,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_HashTable
+                      quickstep_storage_InsertContext
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 4a91f86..1a34e32 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -20,6 +20,7 @@
 #include "relational_operators/HashJoinOperator.hpp"
 
 #include <algorithm>
+#include <map>
 #include <memory>
 #include <unordered_map>
 #include <utility>
@@ -35,6 +36,7 @@
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/HashTable.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -63,12 +65,16 @@ namespace quickstep {
 
 namespace {
 
+typedef std::vector<std::pair<tuple_id, tuple_id>> VectorOfPairs;
+typedef std::pair<std::vector<tuple_id>, std::vector<tuple_id>> PairOfVectors;
+
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
 // tuples from the inner relation. It stores matching tuple ID pairs
-// in an unordered_map keyed by inner block ID.
-class MapBasedJoinedTupleCollector {
+// in an unordered_map keyed by inner block ID, where each value is a vector
+// of (build-tuple-ID, probe-tuple-ID) pairs.
+class VectorsOfPairsJoinedTuplesCollector {
  public:
-  MapBasedJoinedTupleCollector() {
+  VectorsOfPairsJoinedTuplesCollector() {
   }
 
   template <typename ValueAccessorT>
@@ -81,8 +87,7 @@ class MapBasedJoinedTupleCollector {
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the
   // right.
-  inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>*
-      getJoinedTuples() {
+  inline std::unordered_map<block_id, VectorOfPairs>* getJoinedTuples() {
     return &joined_tuples_;
   }
 
@@ -92,7 +97,35 @@ class MapBasedJoinedTupleCollector {
   // cross-product of all tuples from both blocks, but simply using pairs of
   // tuple-IDs is expected to be more space efficient if the result set is less
   // than 1/64 the cardinality of the cross-product.
-  std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
+  std::unordered_map<block_id, VectorOfPairs> joined_tuples_;
+};
+
+// Another collector that also uses an unordered_map keyed on inner block ID,
+// except that each value is a pair of (build-tuple-IDs vector, probe-tuple-IDs vector).
+class PairsOfVectorsJoinedTuplesCollector {
+ public:
+  PairsOfVectorsJoinedTuplesCollector() {
+  }
+
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor,
+                         const TupleReference &tref) {
+    auto &entry = joined_tuples_[tref.block];
+    entry.first.emplace_back(tref.tuple);
+    entry.second.emplace_back(accessor.getCurrentPosition());
+  }
+
+  // Get a mutable pointer to the collected map of joined tuple IDs. The
+  // key is inner block_id, value is a pair consisting of
+  // inner block tuple IDs (first) and outer block tuple IDs (second).
+  inline std::unordered_map<block_id, PairOfVectors>* getJoinedTuples() {
+    return &joined_tuples_;
+  }
+
+ private:
+  std::unordered_map<block_id, PairOfVectors> joined_tuples_;
 };
 
 class SemiAntiJoinTupleCollector {
@@ -432,7 +465,7 @@ void HashInnerJoinWorkOrder::execute() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  MapBasedJoinedTupleCollector collector;
+  PairsOfVectorsJoinedTuplesCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -450,13 +483,49 @@ void HashInnerJoinWorkOrder::execute() {
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
-  for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+  std::map<attribute_id, attribute_id> build_attribute_map;
+  std::map<attribute_id, attribute_id> probe_attribute_map;
+  std::map<attribute_id, attribute_id> non_trivial_attribute_map;
+  std::vector<const Scalar *> non_trivial_expressions;
+  for (std::size_t i = 0; i < selection_.size(); ++i) {
+    const Scalar *scalar = selection_[i].get();
+    if (scalar->getDataSource() == Scalar::ScalarDataSource::kAttribute) {
+      const ScalarAttribute *scalar_attr =
+          static_cast<const ScalarAttribute *>(scalar);
+      const relation_id scalar_attr_relation_id =
+          scalar_attr->getRelationIdForValueAccessor();
+      const attribute_id scalar_attr_id =
+          scalar_attr->getAttributeIdForValueAccessor();
+
+      if (scalar_attr_relation_id == build_relation_id) {
+        build_attribute_map.emplace(scalar_attr_id, i);
+      } else {
+        DCHECK_EQ(probe_relation_id, scalar_attr->getRelationIdForValueAccessor());
+        probe_attribute_map.emplace(scalar_attr_id, i);
+      }
+    } else {
+      non_trivial_attribute_map.emplace(non_trivial_expressions.size(), i);
+      non_trivial_expressions.emplace_back(scalar);
+    }
+  }
+
+  std::unique_ptr<InsertContext> insert_context(
+      new InsertContext(output_destination_->getRelation()));
+  insert_context->addSource(build_attribute_map);
+  insert_context->addSource(probe_attribute_map);
+  insert_context->addSource(non_trivial_attribute_map);
+
+  MutableBlockReference output_block;
+  for (std::pair<const block_id, PairOfVectors>
            &build_block_entry : *collector.getJoinedTuples()) {
     BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
     const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
     std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
 
+    const std::vector<tuple_id> &build_tids = build_block_entry.second.first;
+    const std::vector<tuple_id> &probe_tids = build_block_entry.second.second;
+
     // Evaluate '*residual_predicate_', if any.
     //
     // TODO(chasseur): We might consider implementing true vectorized
@@ -468,17 +537,19 @@ void HashInnerJoinWorkOrder::execute() {
     // hash join is below a reasonable threshold so that we don't blow up
     // temporary memory requirements to an unreasonable degree.
     if (residual_predicate_ != nullptr) {
-      std::vector<std::pair<tuple_id, tuple_id>> filtered_matches;
+      PairOfVectors filtered_matches;
 
-      for (const std::pair<tuple_id, tuple_id> &hash_match
-           : build_block_entry.second) {
+      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+        const tuple_id build_tid = build_tids[i];
+        const tuple_id probe_tid = probe_tids[i];
         if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
                                                         build_relation_id,
-                                                        hash_match.first,
+                                                        build_tid,
                                                         *probe_accessor,
                                                         probe_relation_id,
-                                                        hash_match.second)) {
-          filtered_matches.emplace_back(hash_match);
+                                                        probe_tid)) {
+          filtered_matches.first.emplace_back(build_tid);
+          filtered_matches.second.emplace_back(probe_tid);
         }
       }
 
@@ -502,22 +573,36 @@ void HashInnerJoinWorkOrder::execute() {
     // matching tuples in each individual inner block but very many inner
     // blocks with at least one match).
     ColumnVectorsValueAccessor temp_result;
-    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
-         selection_cit != selection_.end();
-         ++selection_cit) {
-      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
-                                                                  build_accessor.get(),
-                                                                  probe_relation_id,
-                                                                  probe_accessor.get(),
-                                                                  build_block_entry.second));
+    if (non_trivial_expressions.size() > 0) {
+      VectorOfPairs zipped_joined_tuple_ids;
+      zipped_joined_tuple_ids.reserve(build_tids.size());
+      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+        zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
+      }
+
+      for (auto selection_cit = non_trivial_expressions.begin();
+           selection_cit != non_trivial_expressions.end();
+           ++selection_cit) {
+        temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
+                                                                    build_accessor.get(),
+                                                                    probe_relation_id,
+                                                                    probe_accessor.get(),
+                                                                    zipped_joined_tuple_ids));
+      }
     }
 
-    // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
-    // for each pair of joined blocks incurs some extra overhead that could be
-    // avoided by keeping checked-out MutableBlockReferences across iterations
-    // of this loop, but that would get messy when combined with partitioning.
-    output_destination_->bulkInsertTuples(&temp_result);
+    std::unique_ptr<ValueAccessor> ordered_build_accessor(
+        build_accessor->createSharedOrderedTupleIdSequenceAdapterVirtual(build_tids));
+    std::unique_ptr<ValueAccessor> ordered_probe_accessor(
+        probe_accessor->createSharedOrderedTupleIdSequenceAdapterVirtual(probe_tids));
+
+    output_destination_->bulkInsertTuples(
+        { ordered_build_accessor.get(), ordered_probe_accessor.get(), &temp_result },
+        insert_context.get(),
+        &output_block);
   }
+
+  output_destination_->returnBlock(&output_block);
 }
 
 void HashSemiJoinWorkOrder::execute() {
@@ -550,7 +635,7 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   // We collect all the matching probe relation tuples, as there's a residual
  // predicate that needs to be applied after collecting these matches.
-  MapBasedJoinedTupleCollector collector;
+  VectorsOfPairsJoinedTuplesCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -759,7 +844,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  MapBasedJoinedTupleCollector collector;
+  VectorsOfPairsJoinedTuplesCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
  // tuples, because after this step we still have to evaluate the residual

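As a small worked example of the attribute-map setup added to
HashInnerJoinWorkOrder::execute() above (the relation and attribute names are
hypothetical, chosen only for illustration): suppose selection_ is
{ probe.x, build.y, build.z + 1 }. The new loop then produces

    probe_attribute_map       = { x -> 0 }
    build_attribute_map       = { y -> 1 }
    non_trivial_attribute_map = { 0 -> 2 }   // expression index -> output attribute
    non_trivial_expressions   = { build.z + 1 }

and the three sources are registered with the InsertContext in the same
build / probe / non-trivial order as the accessors later handed to
bulkInsertTuples().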
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 61a8a99..f3869c9 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -219,6 +219,7 @@ add_library(quickstep_storage_HashTableKeyManager ../empty_src.cpp HashTableKeyM
 add_library(quickstep_storage_HashTablePool ../empty_src.cpp HashTablePool.hpp)
 add_library(quickstep_storage_IndexSubBlock ../empty_src.cpp IndexSubBlock.hpp)
 add_library(quickstep_storage_IndexSubBlockDescriptionFactory ../empty_src.cpp IndexSubBlockDescriptionFactory.hpp)
+add_library(quickstep_storage_InsertContext ../empty_src.cpp InsertContext.hpp)
 add_library(quickstep_storage_InsertDestination InsertDestination.cpp InsertDestination.hpp)
 add_library(quickstep_storage_InsertDestinationInterface
             ../empty_src.cpp
@@ -782,6 +783,13 @@ target_link_libraries(quickstep_storage_IndexSubBlock
 target_link_libraries(quickstep_storage_IndexSubBlockDescriptionFactory
                       quickstep_storage_StorageBlockLayout
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_InsertContext
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_InsertDestination
                       glog
                       gtest
@@ -940,6 +948,7 @@ target_link_libraries(quickstep_storage_SimpleScalarSeparateChainingHashTable
 target_link_libraries(quickstep_storage_SplitRowStoreTupleStorageSubBlock
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_expressions_predicate_PredicateCost
+                      quickstep_storage_InsertContext
                       quickstep_storage_SplitRowStoreValueAccessor
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_StorageErrors
@@ -1175,6 +1184,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_HashTablePool
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_IndexSubBlockDescriptionFactory
+                      quickstep_storage_InsertContext
                       quickstep_storage_InsertDestination
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_InsertDestination_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertContext.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertContext.hpp b/storage/InsertContext.hpp
new file mode 100644
index 0000000..b321528
--- /dev/null
+++ b/storage/InsertContext.hpp
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_
+#define QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_
+
+#include <cstddef>
+#include <cstring>
+#include <functional>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
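+/**
+ * @brief A run of consecutive fixed-length attribute values that can be
+ *        copied from a source accessor into an output tuple slot with a
+ *        single memcpy.
+ **/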
+struct CopyGroup {
+  CopyGroup(const attribute_id source_attr_id_in,
+            const std::size_t bytes_to_advance_in,
+            const std::size_t bytes_to_copy_in)
+      : source_attr_id(source_attr_id_in),
+        bytes_to_advance(bytes_to_advance_in),
+        bytes_to_copy(bytes_to_copy_in) {}
+
+  const attribute_id source_attr_id;
+  const std::size_t bytes_to_advance;
+  const std::size_t bytes_to_copy;
+};
+
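+/**
+ * @brief The per-source list of copy operations that materializes one
+ *        source's attributes into fixed-length tuple slots, re-scanning the
+ *        source accessor once per copy group via punctuate()/rewind().
+ **/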
+class CopyList {
+ public:
+  CopyList(const std::vector<CopyGroup> &copy_groups) {
+    for (const auto &copy_group : copy_groups) {
+      stride_copy_functors_.emplace_back(
+          CreateStrideCopyFunctorHelper(copy_group.bytes_to_copy,
+                                        copy_group.bytes_to_advance,
+                                        copy_group.source_attr_id));
+    }
+  }
+
+  inline std::size_t bulkInsertTuples(ValueAccessor *accessor,
+                                      const std::size_t stride_width,
+                                      const std::size_t num_tuples,
+                                      void *storage) const {
+    DCHECK(!stride_copy_functors_.empty());
+
+    accessor->punctuateVirtual();
+    auto func_it = stride_copy_functors_.begin();
+    const std::size_t num_tuples_inserted =
+        (*func_it)(accessor, storage, stride_width, num_tuples);
+
+    for (++func_it; func_it != stride_copy_functors_.end(); ++func_it) {
+      accessor->rewindVirtual();
+      const std::size_t other_num_tuples_inserted =
+          (*func_it)(accessor, storage, stride_width, num_tuples);
+
+      (void)other_num_tuples_inserted;
+      DCHECK_EQ(num_tuples_inserted, other_num_tuples_inserted);
+    }
+
+    return num_tuples_inserted;
+  }
+
+ private:
+  typedef std::function<std::size_t (ValueAccessor *, void *, std::size_t, std::size_t)> StrideCopyFunctor;
+  std::vector<StrideCopyFunctor> stride_copy_functors_;
+
+  template <typename ...ArgTypes>
+  static StrideCopyFunctor CreateStrideCopyFunctorHelper(
+      const std::size_t bytes_to_copy,
+      ArgTypes &&...args) {
+    switch (bytes_to_copy) {
+      case 4:
+        return CreateStrideCopyFunctor<4>(std::forward<ArgTypes>(args)...);
+      case 8:
+        return CreateStrideCopyFunctor<8>(std::forward<ArgTypes>(args)...);
+      case 12:
+        return CreateStrideCopyFunctor<12>(std::forward<ArgTypes>(args)...);
+      case 16:
+        return CreateStrideCopyFunctor<16>(std::forward<ArgTypes>(args)...);
+      case 20:
+        return CreateStrideCopyFunctor<20>(std::forward<ArgTypes>(args)...);
+      case 24:
+        return CreateStrideCopyFunctor<24>(std::forward<ArgTypes>(args)...);
+      case 28:
+        return CreateStrideCopyFunctor<28>(std::forward<ArgTypes>(args)...);
+      case 32:
+        return CreateStrideCopyFunctor<32>(std::forward<ArgTypes>(args)...);
+      default:
+        return CreateStrideCopyFunctor(bytes_to_copy, std::forward<ArgTypes>(args)...);
+    }
+  }
+
+  static StrideCopyFunctor CreateStrideCopyFunctor(
+      const std::size_t bytes_to_copy,
+      const std::size_t bytes_to_advance,
+      const attribute_id source_attr_id) {
+    return [source_attr_id, bytes_to_advance, bytes_to_copy](
+        ValueAccessor *accessor,
+        void *storage,
+        std::size_t stride_width,
+        std::size_t num_tuples) -> std::size_t {
+      return InvokeOnAnyValueAccessor(
+          accessor,
+          [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+        char *dst = static_cast<char *>(storage) + bytes_to_advance;
+        std::size_t pos = 0;
+        while (pos < num_tuples && accessor->next()) {
+          std::memcpy(dst,
+                      accessor->template getUntypedValue<false>(source_attr_id),
+                      bytes_to_copy);
+          dst += stride_width;
+          ++pos;
+        }
+        return pos;
+      });
+    };
+  }
+
+  template <std::size_t bytes_to_copy>
+  static StrideCopyFunctor CreateStrideCopyFunctor(
+      const std::size_t bytes_to_advance,
+      const attribute_id source_attr_id) {
+    return [source_attr_id, bytes_to_advance](
+        ValueAccessor *accessor,
+        void *storage,
+        std::size_t stride_width,
+        std::size_t num_tuples) -> std::size_t {
+      return InvokeOnAnyValueAccessor(
+          accessor,
+          [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+        char *dst = static_cast<char *>(storage) + bytes_to_advance;
+        std::size_t pos = 0;
+        while (pos < num_tuples && accessor->next()) {
+          std::memcpy(dst,
+                      accessor->template getUntypedValue<false>(source_attr_id),
+                      bytes_to_copy);
+          dst += stride_width;
+          ++pos;
+        }
+        return pos;
+      });
+    };
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(CopyList);
+};
+
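+/**
+ * @brief A precomputed copy plan for bulk-inserting tuples assembled from
+ *        several ValueAccessors into blocks of a single output relation.
+ **/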
+class InsertContext {
+ public:
+  explicit InsertContext(const CatalogRelationSchema &output_relation)
+      : output_relation_(output_relation) {}
+
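+  /**
+   * @brief Register one source ValueAccessor's attribute mapping. Runs of
+   *        consecutive (source, destination) attribute pairs are coalesced
+   *        into single copy groups so that each run can be copied with one
+   *        memcpy per tuple.
+   *
+   * @param attribute_map Map from source attribute id to the corresponding
+   *        attribute id in the output relation.
+   **/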
+  void addSource(const std::map<attribute_id, attribute_id> &attribute_map) {
+    std::vector<CopyGroup> copy_groups;
+
+    if (!attribute_map.empty()) {
+      auto attr_map_it = attribute_map.begin();
+      attribute_id init_src_attr_id = attr_map_it->first;
+      attribute_id init_dst_attr_id = attr_map_it->second;
+      std::size_t accum_length =
+          output_relation_.getAttributeById(init_dst_attr_id)->getType().maximumByteLength();
+
+      attribute_id prev_src_attr_id = init_src_attr_id;
+      attribute_id prev_dst_attr_id = init_dst_attr_id;
+      while ((++attr_map_it) != attribute_map.end()) {
+        attribute_id curr_src_attr_id = attr_map_it->first;
+        attribute_id curr_dst_attr_id = attr_map_it->second;
+
+        const std::size_t attr_length =
+            output_relation_.getAttributeById(curr_dst_attr_id)->getType().maximumByteLength();
+
+        if (curr_src_attr_id == prev_src_attr_id + 1 &&
+            curr_dst_attr_id == prev_dst_attr_id + 1) {
+          accum_length += attr_length;
+        } else {
+          // Add a copy group
+          copy_groups.emplace_back(init_src_attr_id,
+                                   output_relation_.getFixedLengthAttributeOffset(init_dst_attr_id),
+                                   accum_length);
+
+          init_src_attr_id = curr_src_attr_id;
+          init_dst_attr_id = curr_dst_attr_id;
+          accum_length = attr_length;
+        }
+
+        prev_src_attr_id = curr_src_attr_id;
+        prev_dst_attr_id = curr_dst_attr_id;
+      }
+
+      // Add the last copy group
+      copy_groups.emplace_back(init_src_attr_id,
+                               output_relation_.getFixedLengthAttributeOffset(init_dst_attr_id),
+                               accum_length);
+
+      for (const auto &cg : copy_groups) {
+        std::cout << cg.source_attr_id << ": " << cg.bytes_to_copy << " @" << cg.bytes_to_advance << "\n";
+      }
+      non_empty_copy_indices_.emplace_back(copy_lists_.size());
+    }
+
+    copy_lists_.emplace_back(std::make_unique<CopyList>(copy_groups));
+  }
+
+  std::size_t bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                               const std::size_t stride_width,
+                               const std::size_t num_tuples,
+                               void *storage) {
+    DCHECK_EQ(copy_lists_.size(), accessors.size());
+    DCHECK(!non_empty_copy_indices_.empty());
+
+    auto idx_it = non_empty_copy_indices_.begin();
+    const std::size_t num_tuples_inserted =
+        copy_lists_[*idx_it]->bulkInsertTuples(
+            accessors[*idx_it], stride_width, num_tuples, storage);
+    iteration_finished_ = accessors[*idx_it]->iterationFinishedVirtual();
+
+    for (++idx_it; idx_it != non_empty_copy_indices_.end(); ++idx_it) {
+      const std::size_t other_num_tuples_inserted =
+          copy_lists_[*idx_it]->bulkInsertTuples(
+              accessors[*idx_it], stride_width, num_tuples, storage);
+
+      (void)other_num_tuples_inserted;
+      DCHECK_EQ(num_tuples_inserted, other_num_tuples_inserted);
+      DCHECK_EQ(iteration_finished_, accessors[*idx_it]->iterationFinishedVirtual());
+    }
+
+    return num_tuples_inserted;
+  }
+
+  void beginIteration() {
+    iteration_finished_ = false;
+  }
+
+  bool iterationFinished() const {
+    return iteration_finished_;
+  }
+
+ private:
+  const CatalogRelationSchema &output_relation_;
+  std::vector<std::unique_ptr<CopyList>> copy_lists_;
+  std::vector<std::size_t> non_empty_copy_indices_;
+
+  bool iteration_finished_;
+
+  DISALLOW_COPY_AND_ASSIGN(InsertContext);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_
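
A sketch of how InsertContext::addSource() coalesces copy groups, under
attribute ids and a layout assumed purely for illustration: if the output
relation's fixed-length attributes 0..3 are each 4-byte ints laid out at
offsets 0, 4, 8 and 12, then addSource() with
attribute_map = { 0 -> 0, 1 -> 1, 3 -> 3 } merges the consecutive pair (0, 1)
and emits

    CopyGroup{ source_attr_id = 0, bytes_to_advance = 0,  bytes_to_copy = 8 }
    CopyGroup{ source_attr_id = 3, bytes_to_advance = 12, bytes_to_copy = 4 }

so that each group becomes a single memcpy per inserted tuple inside the
corresponding CopyList functor.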

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 5e83453..5c7d430 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -33,6 +33,7 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/InsertDestination.pb.h"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -221,6 +222,31 @@ void InsertDestination::bulkInsertTuples(ValueAccessor *accessor, bool always_ma
   });
 }
 
+void InsertDestination::bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                                         InsertContext *insert_context,
+                                         MutableBlockReference *output_block) {
+  DCHECK_GE(accessors.size(), 1u);
+
+  insert_context->beginIteration();
+  while (!insert_context->iterationFinished()) {
+    // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
+    if (!output_block->valid()) {
+      *output_block = this->getBlockForInsertion();
+    }
+    if ((*output_block)->bulkInsertTuples(accessors, insert_context) == 0 ||
+        !insert_context->iterationFinished()) {
+      // output_block is full.
+      this->returnBlock(std::move(*output_block), true);
+    }
+  }
+}
+
+void InsertDestination::returnBlock(MutableBlockReference *output_block) {
+  if (output_block->valid()) {
+    this->returnBlock(std::move(*output_block), false);
+  }
+}
+
 void InsertDestination::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor,
@@ -312,6 +338,7 @@ void AlwaysCreateBlockInsertDestination::returnBlock(MutableBlockReference &&blo
   // Due to the nature of this InsertDestination, a block will always be
   // streamed no matter if it's full or not.
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
@@ -389,6 +416,7 @@ void BlockPoolInsertDestination::returnBlock(MutableBlockReference &&block, cons
   }
   // Note that the block will only be sent if it's full (true).
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInternal() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 408e76b..ca2ed57 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -52,6 +52,7 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class InsertContext;
 class StorageManager;
 class ValueAccessor;
 
@@ -75,7 +76,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @brief Constructor.
    *
    * @param relation The relation to insert tuples into.
-   * @param layout The layout to use for any newly-created blocks. If NULL,
+   * @param layout The layout to use for any newly-created blocks. If NULL,
    *        defaults to relation's default layout.
    * @param storage_manager The StorageManager to use.
    * @param relational_op_index The index of the relational operator in the
@@ -147,6 +148,12 @@ class InsertDestination : public InsertDestinationInterface {
 
   void bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full = false) override;
 
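+  /**
+   * @brief Bulk-insert tuples assembled from several ValueAccessors, reusing
+   *        '*output_block' across calls and checking out a fresh block from
+   *        this destination whenever the current one fills up.
+   **/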
+  void bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                        InsertContext *insert_context,
+                        MutableBlockReference *output_block);
+
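+  /**
+   * @brief Hand a block checked out by the multi-accessor bulkInsertTuples()
+   *        back to this destination, if it is still valid.
+   **/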
+  void returnBlock(MutableBlockReference *block);
+
   void bulkInsertTuplesWithRemappedAttributes(
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index f955c99..43aa40d 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "catalog/CatalogRelationSchema.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/SplitRowStoreValueAccessor.hpp"
 #include "storage/StorageBlockLayout.pb.h"
 #include "storage/StorageErrors.hpp"
@@ -386,6 +387,27 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *acce
   return header_->num_tuples - original_num_tuples;
 }
 
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(
+    const std::vector<ValueAccessor *> &accessors,
+    InsertContext *insert_context) {
+  const std::size_t num_available_slots =
+      tuple_storage_bytes_ / tuple_slot_bytes_ - header_->num_tuples - 1;
+  void *tuple_slot_start =
+      static_cast<char*>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_;
+
+  const std::size_t num_tuples_inserted =
+      insert_context->bulkInsertTuples(accessors,
+                                       tuple_slot_bytes_,
+                                       num_available_slots,
+                                       tuple_slot_start);
+
+  occupancy_bitmap_->setBitRange(header_->num_tuples, num_tuples_inserted, true);
+  header_->num_tuples += num_tuples_inserted;
+  header_->max_tid = header_->num_tuples - 1;
+
+  return num_tuples_inserted;
+}
+
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index a930103..84b036d 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -39,6 +39,7 @@
 namespace quickstep {
 
 class ComparisonPredicate;
+class InsertContext;
 class Tuple;
 class TupleStorageSubBlockDescription;
 class ValueAccessor;
@@ -151,6 +152,9 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
 
   tuple_id bulkInsertTuples(ValueAccessor *accessor) override;
 
+  tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                            InsertContext *insert_context) override;
+
   tuple_id bulkInsertTuplesWithRemappedAttributes(
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreValueAccessor.hpp b/storage/SplitRowStoreValueAccessor.hpp
index 951a20a..c475361 100644
--- a/storage/SplitRowStoreValueAccessor.hpp
+++ b/storage/SplitRowStoreValueAccessor.hpp
@@ -97,6 +97,14 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return num_tuples_;
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -284,6 +292,14 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -318,6 +334,11 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -349,6 +370,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
   const std::size_t per_tuple_null_bitmap_bytes_;
 
   std::size_t current_position_;
+  std::size_t punctuated_position_;
 
   // Initialized from 'occupancy_bitmap_' on-demand.
   mutable std::unique_ptr<TupleIdSequence> tuple_id_sequence_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ea74ee6..9029cd7 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -267,6 +267,22 @@ tuple_id StorageBlock::bulkInsertTuples(ValueAccessor *accessor) {
   return num_inserted;
 }
 
+tuple_id StorageBlock::bulkInsertTuples(
+    const std::vector<ValueAccessor *> &accessors,
+    InsertContext *insert_context) {
+  const tuple_id num_inserted =
+      tuple_store_->bulkInsertTuples(accessors, insert_context);
+  if (num_inserted != 0) {
+    invalidateAllIndexes();
+    dirty_ = true;
+  } else if (tuple_store_->isEmpty()) {
+    if (!accessors.front()->iterationFinishedVirtual()) {
+      throw TupleTooLargeForBlock(0);
+    }
+  }
+  return num_inserted;
+}
+
 tuple_id StorageBlock::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 56b3bdc..9f0acb9 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class ColumnVectorsValueAccessor;
+class InsertContext;
 class InsertDestinationInterface;
 class Predicate;
 class Scalar;
@@ -283,6 +284,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   tuple_id bulkInsertTuples(ValueAccessor *accessor);
 
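+  /**
+   * @brief Insert as many tuples as possible from a group of ValueAccessors
+   *        into this block's TupleStorageSubBlock, using the copy plan in
+   *        'insert_context'.
+   *
+   * @return The number of tuples inserted.
+   **/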
+  tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                            InsertContext *insert_context);
+
   /**
    * @brief Insert as many tuples as possible from a ValueAccessor (all of the
    *        tuples accessible or as many as will fit in this StorageBlock) as a

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/TupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/TupleStorageSubBlock.hpp b/storage/TupleStorageSubBlock.hpp
index aed6eea..65f990a 100644
--- a/storage/TupleStorageSubBlock.hpp
+++ b/storage/TupleStorageSubBlock.hpp
@@ -34,6 +34,7 @@ namespace quickstep {
 
 class CatalogRelationSchema;
 class ComparisonPredicate;
+class InsertContext;
 class Tuple;
 class TupleStorageSubBlockDescription;
 class ValueAccessor;
@@ -245,6 +246,11 @@ class TupleStorageSubBlock {
    **/
   virtual tuple_id bulkInsertTuples(ValueAccessor *accessor) = 0;
 
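+  /**
+   * @brief Insert as many tuples as possible from a group of ValueAccessors,
+   *        copying attribute values into tuple slots according to the copy
+   *        plan held by 'insert_context'. The default implementation is not
+   *        implemented and aborts.
+   *
+   * @return The number of tuples inserted.
+   **/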
+  virtual tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                                    InsertContext *insert_context) {
+    FATAL_ERROR("Not implemented");
+  }
+
   /**
    * @brief Insert as many tuples as possible from a ValueAccessor (all of the
    *        tuples accessible or as many as will fit in this

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/ValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/ValueAccessor.hpp b/storage/ValueAccessor.hpp
index e4a2906..9ce911e 100644
--- a/storage/ValueAccessor.hpp
+++ b/storage/ValueAccessor.hpp
@@ -184,6 +184,10 @@ class ValueAccessor {
    **/
   virtual tuple_id getNumTuplesVirtual() const = 0;
 
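+  /**
+   * @brief Remember the current iteration position so that a later call to
+   *        rewindVirtual() can restore it.
+   **/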
+  virtual void punctuateVirtual() = 0;
+
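+  /**
+   * @brief Restore the iteration position previously saved by
+   *        punctuateVirtual().
+   **/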
+  virtual void rewindVirtual() = 0;
+
   /**
    * @brief Returns whether this accessor has a fast strided ColumnAccessor available
    *        that can be used to optimize memory access in a tight loop iteration
@@ -305,6 +309,8 @@ class ValueAccessor {
   virtual ValueAccessor* createSharedTupleIdSequenceAdapterVirtual(
       const TupleIdSequence &id_sequence) = 0;
 
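+  /**
+   * @brief Create a new ValueAccessor that wraps this one (without taking
+   *        ownership) and iterates over tuples in the explicit order given
+   *        by 'id_sequence'.
+   **/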
+  virtual ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) = 0;
+
   /**
    * @brief Get a TupleIdSequence indicating which positions this ValueAccessor
    *        is iterating over.
@@ -387,6 +393,14 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return id_sequence_.numTuples();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -479,6 +493,14 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -513,6 +535,11 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -522,6 +549,7 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
   std::unique_ptr<InternalValueAccessorType> owned_accessor_;
   const TupleIdSequence &id_sequence_;
   TupleIdSequence::const_iterator current_position_;
+  TupleIdSequence::const_iterator punctuated_position_;
 
   DISALLOW_COPY_AND_ASSIGN(TupleIdSequenceAdapterValueAccessor);
 };
@@ -589,6 +617,14 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return id_sequence_.size();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -685,6 +721,14 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -719,6 +763,11 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -728,6 +777,7 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
   std::unique_ptr<InternalValueAccessorType> owned_accessor_;
   const OrderedTupleIdSequence &id_sequence_;
   OrderedTupleIdSequence::size_type current_position_;
+  OrderedTupleIdSequence::size_type punctuated_position_;
 
   DISALLOW_COPY_AND_ASSIGN(OrderedTupleIdSequenceAdapterValueAccessor);
 };
@@ -785,6 +835,14 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return helper_.numPackedTuples();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_tuple_;
+  }
+
+  inline void rewind() {
+    current_tuple_ = punctuated_position_;
+  }
+
   template <bool check_null = true>
   inline const void* getUntypedValue(const attribute_id attr_id) const {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_tuple_);
@@ -911,6 +969,14 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -945,6 +1011,11 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -962,6 +1033,7 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
   const CatalogRelationSchema &relation_;
   HelperT helper_;
   tuple_id current_tuple_;
+  tuple_id punctuated_position_;
 
   friend TupleStorageSubBlockT;
 
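The punctuate()/rewind() pair added throughout this file exists so that
CopyList can make several passes over the same run of tuples, one pass per
copy group. A minimal sketch of the intended calling pattern (the copy steps
are hypothetical placeholders, not functions from this commit):

    accessor->punctuateVirtual();         // remember where this run of tuples starts
    copyFirstAttributeGroup(accessor);    // advances the accessor by up to num_tuples
    accessor->rewindVirtual();            // jump back to the remembered position
    copySecondAttributeGroup(accessor);   // re-scans exactly the same tuples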

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index fe413a0..f8022a0 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -121,6 +121,14 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return column_length_;
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -252,6 +260,14 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -286,6 +302,11 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -304,6 +325,7 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
   std::vector<bool> column_native_;
   std::size_t column_length_;
   std::size_t current_position_;
+  std::size_t punctuated_position_;
   ScopedDeleter deleter_;
 
   DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);