Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/06 02:08:05 UTC

[1/3] incubator-quickstep git commit: Improve partial bulk insert.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/output-attr-order [created] c7fdc360e


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
index 9270d93..2943343 100644
--- a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
+++ b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
@@ -22,7 +22,6 @@
 #include <cstdio>
 #include <cstring>
 #include <memory>
-#include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -62,11 +61,6 @@ using std::snprintf;
 
 namespace quickstep {
 
-using splitrow_internal::CopyGroupList;
-using splitrow_internal::ContiguousAttrs;
-using splitrow_internal::NullableAttr;
-using splitrow_internal::VarLenAttr;
-
 namespace {
 
 // Used to set up a value-parameterized test with certain features for
@@ -82,11 +76,9 @@ enum class AttributeTypeFeatures {
 
 class SplitRowStoreTupleStorageSubBlockTest
     : public ::testing::TestWithParam<AttributeTypeFeatures> {
- public:
+ protected:
   static const std::size_t kSubBlockSize = 0x100000;  // 1 MB
-  static const std::size_t kVarLenSize = 26;
 
- protected:
   virtual void SetUp() {
     // Create a sample relation with a variety of attribute types.
     relation_.reset(new CatalogRelation(nullptr, "TestRelation"));
@@ -110,7 +102,7 @@ class SplitRowStoreTupleStorageSubBlockTest
         relation_.get(),
         "string_attr",
         TypeFactory::GetType(testVariableLength() ? kVarChar : kChar,
-                             kVarLenSize,
+                             26,
                              testNullable()));
     ASSERT_EQ(2, relation_->addAttribute(current_attr));
 
@@ -155,14 +147,6 @@ class SplitRowStoreTupleStorageSubBlockTest
     return tuple_store_->tuple_storage_bytes_;
   }
 
-  std::size_t getTupleInsertLowerBound() const {
-    return tuple_store_->getInsertLowerBound();
-  }
-
-  std::size_t getInsertLowerBoundThreshold() const {
-    return tuple_store_->getInsertLowerBoundThreshold();
-  }
-
   Tuple createSampleTuple(const int base_value) const {
     std::vector<TypedValue> attribute_values;
 
@@ -190,10 +174,10 @@ class SplitRowStoreTupleStorageSubBlockTest
       char string_buffer[13];
       int written = snprintf(string_buffer, sizeof(string_buffer), "%d", base_value);
       if (testVariableLength()) {
-        attribute_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
+        attribute_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(string_buffer,
                                                                                       written + 1)));
       } else {
-        attribute_values.emplace_back((CharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
+        attribute_values.emplace_back((CharType::InstanceNonNullable(26).makeValue(string_buffer,
                                                                                    written + 1)));
       }
       attribute_values.back().ensureNotReference();
@@ -213,11 +197,6 @@ class SplitRowStoreTupleStorageSubBlockTest
     tuple_store_->rebuild();
   }
 
-  void getCopyGroupsForAttributeMap(const std::vector<attribute_id> &attribute_map,
-                                    CopyGroupList *copy_groups) {
-    tuple_store_->getCopyGroupsForAttributeMap(attribute_map, copy_groups);
-  }
-
   void checkTupleValuesUntyped(const tuple_id tid,
                                const int base_value) {
     ASSERT_TRUE(tuple_store_->hasTupleWithID(tid));
@@ -290,135 +269,6 @@ class SplitRowStoreTupleStorageSubBlockTest
 };
 typedef SplitRowStoreTupleStorageSubBlockTest SplitRowStoreTupleStorageSubBlockDeathTest;
 
-class SplitRowWrapper {
- public:
-  enum AttrType {
-    kInt = 0,
-    kDouble,
-    kString,
-    kNumAttrTypes
-  };
-
-  /**
-   * Builds a catalog relation given a list of attributes.
-   *
-   * @param attribute_ordering The ordering of the attributes in the represented relation. Each element is an
-   *                AttrType code: 0 denotes an integer attribute, 1 a double, and 2 a string.
-   * @param contains_nullable If the relation contains nullable attributes.
-   * @param contains_varlen If the relation contains variable length attributes.
-   * @return A caller-owned catalog relation.
-   */
-  static CatalogRelation *
-  GetRelationFromAttributeList(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable,
-                               bool contains_varlen) {
-    // Create a unique name.
-    std::string rel_name("TempRelation");
-    for (auto attr_itr = attribute_ordering.begin();
-         attr_itr != attribute_ordering.end();
-         ++attr_itr) {
-      rel_name += "_" + std::to_string(*attr_itr);
-    }
-    CatalogRelation *relation = new CatalogRelation(nullptr, rel_name.c_str());
-
-    std::vector<int> attr_counts(AttrType::kNumAttrTypes);
-    std::string attr_name;
-    for (auto attr_itr = attribute_ordering.begin();
-         attr_itr != attribute_ordering.end();
-         ++attr_itr) {
-      switch (*attr_itr) {
-        case AttrType::kInt:
-          // An integer.
-          attr_name = "int_attr_" + std::to_string(attr_counts[AttrType::kInt]);
-          relation->addAttribute(new CatalogAttribute(
-            relation,
-            attr_name.c_str(),
-            TypeFactory::GetType(TypeID::kInt, contains_nullable)));
-          attr_counts[AttrType::kInt]++;
-          break;
-        case AttrType::kDouble:
-          // A double.
-          attr_name = "double_attr_" + std::to_string(attr_counts[AttrType::kDouble]);
-          relation->addAttribute(new CatalogAttribute(
-            relation,
-            attr_name.c_str(),
-            TypeFactory::GetType(TypeID::kDouble, contains_nullable)));
-          attr_counts[AttrType::kDouble]++;
-          break;
-        case AttrType::kString:
-          // A (possibly variable-length) string.
-          attr_name = "string_attr_" + std::to_string(attr_counts[AttrType::kString]);
-          relation->addAttribute(new CatalogAttribute(
-            relation,
-            attr_name.c_str(),
-            TypeFactory::GetType(contains_varlen ? TypeID::kVarChar : TypeID::kChar,
-                                 SplitRowStoreTupleStorageSubBlockTest::kVarLenSize,
-                                 contains_nullable)));
-          attr_counts[AttrType::kString]++;
-          break;
-        default:
-          LOG(FATAL) << "Unknown type was specified in SplitRowWrapper.";
-          break;
-      }
-    }
-    return relation;
-  }
-
-  /**
-   * A wrapper for an empty SplitRowStore.
-   *
-   * @param attribute_ordering The ordering of the attributes in the represented relation. Each element is an
-   *                AttrType code: 0 denotes an integer attribute, 1 a double, and 2 a string.
-   * @param contains_nullable If the relation contains nullable attributes.
-   * @param contains_varlen If the relation contains variable length attributes.
-   */
-  SplitRowWrapper(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable, bool contains_varlen)
-    : contains_nullable_(contains_nullable),
-      contains_varlen_(contains_varlen) {
-    initialize(attribute_ordering);
-  }
-
-  SplitRowWrapper(bool contains_nullable, bool contains_varlen)
-    : contains_nullable_(contains_nullable),
-      contains_varlen_(contains_varlen) {
-    // Make a clone of the Test Block type using the 3 basic attributes.
-    std::vector<attribute_id> attrs;
-    for (attribute_id attr = 0; attr < 3; ++attr) {
-      attrs.push_back(attr);
-    }
-    initialize(attrs);
-  }
-
-  SplitRowStoreTupleStorageSubBlock *operator->() {
-    return tuple_store_.get();
-  }
-
-  const bool contains_nullable_;
-  const bool contains_varlen_;
-
-  std::unique_ptr<CatalogRelation> relation_;
-  std::unique_ptr<TupleStorageSubBlockDescription> tuple_store_description_;
-  ScopedBuffer tuple_store_memory_;
-  std::unique_ptr<SplitRowStoreTupleStorageSubBlock> tuple_store_;
-
- private:
-  void initialize(const std::vector<attribute_id> &attribute_ordering) {
-    // Create a sample relation with a variety of attribute types.
-    relation_.reset(GetRelationFromAttributeList(attribute_ordering, contains_nullable_, contains_varlen_));
-
-    tuple_store_description_.reset(new TupleStorageSubBlockDescription());
-    tuple_store_description_->set_sub_block_type(TupleStorageSubBlockDescription::SPLIT_ROW_STORE);
-
-    // Initialize the actual block.
-    tuple_store_memory_.reset(SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
-    std::memset(tuple_store_memory_.get(), 0x0, SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
-    tuple_store_.reset(new SplitRowStoreTupleStorageSubBlock(*relation_,
-                                                             *tuple_store_description_,
-                                                             true,
-                                                             tuple_store_memory_.get(),
-                                                             SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize));
-  }
-};
-
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DescriptionIsValidTest) {
   // The descriptions we use for the other tests (which includes nullable and
   // variable-length attributes) should be valid.
@@ -608,37 +458,37 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
   const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
 
   NativeColumnVector *int_vector = new NativeColumnVector(
-    relation_->getAttributeById(0)->getType(),
-    max_tuple_capacity);
+      relation_->getAttributeById(0)->getType(),
+      max_tuple_capacity);
   NativeColumnVector *double_vector = new NativeColumnVector(
-    relation_->getAttributeById(1)->getType(),
-    max_tuple_capacity);
+      relation_->getAttributeById(1)->getType(),
+      max_tuple_capacity);
   ColumnVector *string_vector = testVariableLength() ?
-                                static_cast<ColumnVector*>(new IndirectColumnVector(
-                                  relation_->getAttributeById(2)->getType(),
-                                  max_tuple_capacity))
-                                                     : static_cast<ColumnVector*>(new NativeColumnVector(
-      relation_->getAttributeById(2)->getType(),
-      max_tuple_capacity));
+      static_cast<ColumnVector*>(new IndirectColumnVector(
+          relation_->getAttributeById(2)->getType(),
+          max_tuple_capacity))
+      : static_cast<ColumnVector*>(new NativeColumnVector(
+          relation_->getAttributeById(2)->getType(),
+          max_tuple_capacity));
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
     const std::size_t current_tuple_storage_bytes
-      = getTupleSlotSize()
-        + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
-                                   0 : current_tuple.getAttributeValue(2).getDataSize())
-                                : 0);
+        = getTupleSlotSize()
+          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
+                                     0 : current_tuple.getAttributeValue(2).getDataSize())
+                                  : 0);
     if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-          ->appendTypedValue(current_tuple.getAttributeValue(2));
+            ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-          ->appendTypedValue(current_tuple.getAttributeValue(2));
+            ->appendTypedValue(current_tuple.getAttributeValue(2));
       }
 
       storage_used += current_tuple_storage_bytes;
@@ -655,90 +505,18 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  tuple_id num_inserted = tuple_store_->bulkInsertTuples(&accessor);
-  if (!testVariableLength()) {
-    EXPECT_EQ(current_tuple_idx, num_inserted);
-    ASSERT_TRUE(accessor.iterationFinished());
-    // Shouldn't be able to insert any more tuples.
-    accessor.beginIteration();
-    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuples(&accessor);
-    ASSERT_EQ(0, num_inserted_second_round);
-  }
-
-  tuple_store_->rebuild();
-  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
-  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
-
-  // Check the inserted values.
-  ASSERT_TRUE(tuple_store_->isPacked());
-  for (tuple_id tid = 0;
-       tid <= tuple_store_->getMaxTupleID();
-       ++tid) {
-    checkTupleValuesUntyped(tid, tid);
-  }
-}
+  EXPECT_EQ(current_tuple_idx, tuple_store_->bulkInsertTuples(&accessor));
+  EXPECT_TRUE(accessor.iterationFinished());
 
-TEST_P(SplitRowStoreTupleStorageSubBlockTest, PartialBulkInsertTest) {
-  // Build up a ColumnVectorsValueAccessor to bulk-insert from. We'll reserve
-  // enough space for the maximum possible number of tuples in the block, even
-  // though we won't use all of it if testVariableLength() is true.
-  const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
-
-  NativeColumnVector *int_vector = new NativeColumnVector(
-    relation_->getAttributeById(0)->getType(),
-    max_tuple_capacity);
-  NativeColumnVector *double_vector = new NativeColumnVector(
-    relation_->getAttributeById(1)->getType(),
-    max_tuple_capacity);
-  ColumnVector *string_vector = testVariableLength() ?
-                                static_cast<ColumnVector *>(new IndirectColumnVector(
-                                  relation_->getAttributeById(2)->getType(),
-                                  max_tuple_capacity))
-                                                     : static_cast<ColumnVector *>(new NativeColumnVector(
-      relation_->getAttributeById(2)->getType(),
-      max_tuple_capacity));
-
-  const int max_tuples_insert = 1000;
-  for (int tuple_idx = 0; tuple_idx < max_tuples_insert; ++tuple_idx) {
-    Tuple current_tuple(createSampleTuple(tuple_idx));
-    int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
-    double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
-    if (testVariableLength()) {
-      static_cast<IndirectColumnVector *>(string_vector)
-        ->appendTypedValue(current_tuple.getAttributeValue(2));
-    } else {
-      static_cast<NativeColumnVector *>(string_vector)
-        ->appendTypedValue(current_tuple.getAttributeValue(2));
-    }
-  }
-
-  std::vector<attribute_id> attr_map_pt1 = {kInvalidCatalogId, 0, kInvalidCatalogId};
-  std::vector<attribute_id> attr_map_pt2 = {0, kInvalidCatalogId, 1};
-
-  ColumnVectorsValueAccessor accessor_pt1;
-  accessor_pt1.addColumn(double_vector);
-
-  ColumnVectorsValueAccessor accessor_pt2;
-  accessor_pt2.addColumn(int_vector);
-  accessor_pt2.addColumn(string_vector);
-
-
-  // Actually do the bulk-insert.
-  accessor_pt1.beginIteration();
-  const tuple_id num_inserted_pt1 = tuple_store_->bulkInsertPartialTuples(attr_map_pt1, &accessor_pt1, kCatalogMaxID);
-  ASSERT_GT(num_inserted_pt1, 0);
-  const tuple_id num_inserted_pt2 = tuple_store_->bulkInsertPartialTuples(attr_map_pt2, &accessor_pt2,
-                                                                          num_inserted_pt1);
-  ASSERT_EQ(num_inserted_pt1, num_inserted_pt2);
-
-  tuple_store_->bulkInsertPartialTuplesFinalize(num_inserted_pt1);
-  ASSERT_EQ(max_tuples_insert, tuple_store_->getMaxTupleID() + 1);
-  ASSERT_EQ(num_inserted_pt1, tuple_store_->getMaxTupleID() + 1);
-  EXPECT_TRUE(accessor_pt2.iterationFinished());
+  // Shouldn't be able to insert any more tuples.
+  accessor.beginIteration();
+  EXPECT_EQ(0, tuple_store_->bulkInsertTuples(&accessor));
 
   tuple_store_->rebuild();
+  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
+  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
 
-  // Should be the same order as if we inserted them serially.
+  // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
   for (tuple_id tid = 0;
        tid <= tuple_store_->getMaxTupleID();
@@ -747,77 +525,6 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, PartialBulkInsertTest) {
   }
 }
 
-TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest) {
-  const bool nullable_attrs = testNullable();
-  std::vector<attribute_id> relation_attrs = {
-    SplitRowWrapper::AttrType::kInt,
-    SplitRowWrapper::AttrType::kInt,
-    SplitRowWrapper::AttrType::kInt,
-    SplitRowWrapper::AttrType::kString,
-    SplitRowWrapper::AttrType::kString,
-    SplitRowWrapper::AttrType::kString};
-  SplitRowWrapper dst_store(relation_attrs, nullable_attrs, testVariableLength());
-  std::vector<attribute_id> attr_map = { kInvalidCatalogId, 0, 1, kInvalidCatalogId, 2, 1 };
-  CopyGroupList copy_groups;
-  dst_store->getCopyGroupsForAttributeMap(attr_map, &copy_groups);
-
-  std::vector<ContiguousAttrs>& contiguous_attrs = copy_groups.contiguous_attrs_;
-  std::vector<VarLenAttr>& varlen_attrs = copy_groups.varlen_attrs_;
-
-  const std::size_t size_of_string = dst_store->getRelation().getAttributeById(3)->getType().maximumByteLength();
-
-  // Fixed length attributes.
-  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy_);
-
-  EXPECT_EQ(1, contiguous_attrs[1].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance_);
-  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy_);
-
-  if (testVariableLength()) {
-    ASSERT_EQ(2, contiguous_attrs.size());
-    ASSERT_EQ(2, varlen_attrs.size());
-
-    EXPECT_EQ(2, varlen_attrs[0].src_attr_id_);
-    EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[0].bytes_to_advance_);
-
-    EXPECT_EQ(1, varlen_attrs[1].src_attr_id_);
-    EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance_);
-
-  } else {
-    ASSERT_EQ(4, copy_groups.contiguous_attrs_.size());
-    ASSERT_EQ(0, copy_groups.varlen_attrs_.size());
-
-    EXPECT_EQ(2, contiguous_attrs[2].src_attr_id_);
-    EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance_);
-    EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy_);
-  }
-
-  int null_count =  copy_groups.nullable_attrs_.size();
-  if (testNullable()) {
-    // The relation contains 6 nullable attributes, but only 3 are inserted.
-    EXPECT_EQ(4, null_count);
-  } else {
-    EXPECT_EQ(0, null_count);
-  }
-
-  // test that merging works.
-  copy_groups.merge_contiguous();
-  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
-  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
-
-  if (testVariableLength()) {
-    EXPECT_EQ(1, contiguous_attrs.size());
-    EXPECT_EQ(sizeof(int) * 2 + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
-              varlen_attrs[0].bytes_to_advance_);
-  } else {
-    EXPECT_EQ(3, contiguous_attrs.size());
-    EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy_);
-    EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance_);
-  }
-}
-
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTest) {
   // This is similar to the above test, but we will reverse the order of the
   // ColumnVectors in the ColumnVectorsValueAccessor and remap them back to the
@@ -844,26 +551,25 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
-  std::size_t tuple_max_size = relation_->getMaximumByteLength();
-  std::size_t tuple_slot_size = getTupleSlotSize();
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
-    if ((getTupleStorageSize() - storage_used) / tuple_max_size > 0) {
+    const std::size_t current_tuple_storage_bytes
+        = getTupleSlotSize()
+          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
+                                     0 : current_tuple.getAttributeValue(2).getDataSize())
+                                  : 0);
+    if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-          ->appendTypedValue(current_tuple.getAttributeValue(2));
+            ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-          ->appendTypedValue(current_tuple.getAttributeValue(2));
-      }
-
-      storage_used += tuple_slot_size;
-      if (testVariableLength() && !current_tuple.getAttributeValue(2).isNull()) {
-        storage_used += current_tuple.getAttributeValue(2).getDataSize();
+            ->appendTypedValue(current_tuple.getAttributeValue(2));
       }
 
+      storage_used += current_tuple_storage_bytes;
       ++current_tuple_idx;
     } else {
       break;
@@ -882,19 +588,18 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  tuple_id num_inserted = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
-  if (!testVariableLength()) {
-    EXPECT_EQ(current_tuple_idx, num_inserted);
-    ASSERT_TRUE(accessor.iterationFinished());
-    // Shouldn't be able to insert any more tuples.
-    accessor.beginIteration();
-    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
-    ASSERT_EQ(0, num_inserted_second_round);
-  }
+  EXPECT_EQ(current_tuple_idx,
+            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
+  EXPECT_TRUE(accessor.iterationFinished());
+
+  // Shouldn't be able to insert any more tuples.
+  accessor.beginIteration();
+  EXPECT_EQ(0,
+            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
 
   tuple_store_->rebuild();
-  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
-  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
+  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
+  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
 
   // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -927,53 +632,6 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetAttributeValueTypedTest) {
   }
 }
 
-TEST_P(SplitRowStoreTupleStorageSubBlockTest, SplitRowToSplitRowTest) {
-  // Test insertion of data from a SplitRow to a SplitRow with no reordering.
-  fillBlockWithSampleData();
-  std::vector<attribute_id> relation_attrs = {
-    SplitRowWrapper::AttrType::kInt,
-    SplitRowWrapper::AttrType::kDouble,
-    SplitRowWrapper::AttrType::kDouble,
-    SplitRowWrapper::AttrType::kInt,
-    SplitRowWrapper::AttrType::kString,
-    SplitRowWrapper::AttrType::kString,
-    SplitRowWrapper::AttrType::kString};
-  SplitRowWrapper dst_store(relation_attrs, testNullable(), testVariableLength());
-
-  std::vector<attribute_id> attribute_map = {0, kInvalidCatalogId, 1, 0, 2, kInvalidCatalogId, 2};
-
-  std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor());
-  ASSERT_EQ(ValueAccessor::Implementation::kSplitRowStore,
-            accessor->getImplementationType());
-  ASSERT_FALSE(accessor->isTupleIdSequenceAdapter());
-
-  SplitRowStoreValueAccessor &cast_accessor = static_cast<SplitRowStoreValueAccessor &>(*accessor);
-  std::size_t num_inserted = dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, kCatalogMaxID);
-  attribute_map = {kInvalidCatalogId, 1, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId, 2, kInvalidCatalogId};
-  cast_accessor.beginIteration();
-  dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, num_inserted);
-  dst_store->bulkInsertPartialTuplesFinalize(num_inserted);
-
-  EXPECT_EQ(num_inserted - 1, dst_store->getMaxTupleID());
-  // The inserted relation should hold roughly 1/3 the tuples of the src. The more varlen
-  // attributes, the fewer the relation will accept due to how it estimates.
-  EXPECT_LT(0.15 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
-  EXPECT_GT(0.5 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
-
-  attribute_map = {0, 1, 4};
-  for (tuple_id tid = 0; tid < dst_store->getMaxTupleID(); ++tid) {
-    for (attribute_id aid = 0; aid < tuple_store_->getRelation().getMaxAttributeId(); ++aid) {
-      const TypedValue &dst_value = dst_store->getAttributeValueTyped(tid, attribute_map[aid]);
-      const TypedValue &src_value = tuple_store_->getAttributeValueTyped(tid, aid);
-      if (src_value.isNull() || dst_value.isNull()) {
-        EXPECT_TRUE(src_value.isNull() && dst_value.isNull());
-      } else {
-        EXPECT_TRUE(src_value.fastEqualCheck(dst_value));
-      }
-    }
-  }
-}
-
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, ValueAccessorTest) {
   fillBlockWithSampleData();
 
@@ -1063,7 +721,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     // It's also OK to replace a variable-length value with a shorter value, or
     // with null.
     std::unordered_map<attribute_id, TypedValue> variable_new_values;
-    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(kVarLenSize).makeValue("x", 2));
+    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(26).makeValue("x", 2));
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(33, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(33, 2, variable_new_values[2]);
     EXPECT_STREQ("x", static_cast<const char*>(tuple_store_->getAttributeValue(33, 2)));
@@ -1089,14 +747,13 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     EXPECT_TRUE(tuple_store_->insertTupleInBatch(createSampleTuple(0)));
     tuple_store_->rebuild();
 
-    variable_new_values[2] = VarCharType::InstanceNonNullable(kVarLenSize).makeValue("hello world", 12);
+    variable_new_values[2] = VarCharType::InstanceNonNullable(26).makeValue("hello world", 12);
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(0, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(0, 2, variable_new_values[2]);
     EXPECT_STREQ("hello world", static_cast<const char*>(tuple_store_->getAttributeValue(0, 2)));
   }
 }
 
-
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
   fillBlockWithSampleData();
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -1149,7 +806,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
       reinsert_attr_values.emplace_back(testVariableLength() ? kVarChar : kChar);
     } else {
       reinsert_attr_values.emplace_back(
-          CharType::InstanceNonNullable(kVarLenSize).makeValue("foo", 4));
+          CharType::InstanceNonNullable(26).makeValue("foo", 4));
       reinsert_attr_values.back().ensureNotReference();
     }
     Tuple reinsert_tuple(std::move(reinsert_attr_values));
@@ -1174,7 +831,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
     std::vector<TypedValue> extra_variable_attr_values;
     extra_variable_attr_values.emplace_back(-123);
     extra_variable_attr_values.emplace_back(static_cast<double>(-100.5));
-    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(
+    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(
         kExtraVarCharValue,
         27)));
     extra_variable_tuple = Tuple(std::move(extra_variable_attr_values));
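
The per-tuple accounting in the BulkInsertTest loop above charges one fixed-size
slot per tuple, plus the out-of-line payload when the third attribute is a
non-null VarChar. A minimal standalone sketch of that arithmetic, with made-up
sizes (the real slot size comes from getTupleSlotSize() and depends on the
schema):

    #include <cstddef>
    #include <iostream>

    int main() {
      // Hypothetical sizes, for illustration only.
      const std::size_t tuple_slot_bytes = 40;
      const std::size_t varlen_payload_bytes = 4;  // e.g. "123" plus the NUL
      const bool test_variable_length = true;
      const bool value_is_null = false;

      // Mirrors current_tuple_storage_bytes in the test loop: a VarChar
      // stores its payload outside the slot, a fixed-size Char does not.
      const std::size_t storage_bytes =
          tuple_slot_bytes +
          ((test_variable_length && !value_is_null) ? varlen_payload_bytes : 0);
      std::cout << storage_bytes << '\n';  // prints 44
      return 0;
    }

The loop stops as soon as the next tuple would push storage_used past
getTupleStorageSize(), which is why the test expects bulkInsertTuples() to
insert exactly current_tuple_idx tuples.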

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index fbbdc1b..fe413a0 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -139,10 +139,6 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return nullptr;
   }
 
-  inline std::size_t getNumColumns() const {
-    return columns_.size();
-  }
-
   template <bool check_null = true>
   inline const void* getUntypedValue(const attribute_id attr_id) const {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_position_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index c404b7e..c1e6f70 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -183,20 +183,6 @@ class BitVector {
   }
 
   /**
-   * @brief Assign this BitVector's contents to the pointed-to memory.
-   *
-   * @warning caller is responsible for ensuring the Bitvector has the correct
-   *          ownership and size.
-   *
-   * @param ptr Pointer to data representing a BitVector with the same parameters
-   *            as this BitVector.
-   **/
-  inline void setMemory(void *ptr) {
-    DCHECK(!owned_);
-    this->data_array_ = static_cast<std::size_t*>(ptr);
-  }
-
-  /**
    * @brief Similar to assignFrom(), but the other BitVector to assign from is
    *        allowed to be longer than this one.
    * @warning Only available when enable_short_version is false.
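
The setMemory() method removed above documented a rebind-in-place pattern: a
BitVector that does not own its backing storage may be pointed at a new buffer
with the same layout. A minimal, hypothetical sketch of the pattern (not
Quickstep's actual BitVector, whose template parameter and DCHECK are elided):

    #include <cassert>
    #include <cstddef>

    // Hypothetical stand-in for a non-owning bit-vector view.
    class BitView {
     public:
      BitView(std::size_t *data, const bool owned)
          : data_(data), owned_(owned) {}

      // Rebind to new backing memory; only legal for non-owning views,
      // since rebinding an owning view would leak its buffer.
      void setMemory(void *ptr) {
        assert(!owned_);
        data_ = static_cast<std::size_t*>(ptr);
      }

      std::size_t* data() const { return data_; }

     private:
      std::size_t *data_;
      bool owned_;
    };

    int main() {
      std::size_t buffer_a[2] = {};
      std::size_t buffer_b[2] = {};
      BitView view(buffer_a, /* owned = */ false);
      view.setMemory(buffer_b);  // OK: the view never owned buffer_a
      assert(view.data() == buffer_b);
      return 0;
    }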


[2/3] incubator-quickstep git commit: Improve partial bulk insert.

Posted by ji...@apache.org.
Improve partial bulk insert.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/31c80934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/31c80934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/31c80934

Branch: refs/heads/output-attr-order
Commit: 31c809343259bc3097c2e86a7860091cea7e6050
Parents: 9fcb0ac
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Dec 22 13:10:07 2016 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Dec 22 13:11:06 2016 -0600

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp       | 150 +---
 storage/InsertDestination.cpp                   |  84 ---
 storage/InsertDestination.hpp                   |  16 -
 storage/InsertDestinationInterface.hpp          |  22 -
 storage/SplitRowStoreTupleStorageSubBlock.cpp   | 692 ++++++++++---------
 storage/SplitRowStoreTupleStorageSubBlock.hpp   | 186 -----
 storage/StorageBlock.cpp                        |  24 -
 storage/StorageBlock.hpp                        |  44 --
 storage/TupleStorageSubBlock.hpp                |  50 --
 ...litRowStoreTupleStorageSubBlock_unittest.cpp | 445 ++----------
 types/containers/ColumnVectorsValueAccessor.hpp |   4 -
 utility/BitVector.hpp                           |  14 -
 12 files changed, 457 insertions(+), 1274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 2028046..4a91f86 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -65,11 +65,10 @@ namespace {
 
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
 // tuples from the inner relation. It stores matching tuple ID pairs
-// in an unordered_map keyed by inner block ID and a vector of
-// pairs of (build-tupleID, probe-tuple-ID).
-class VectorsOfPairsJoinedTuplesCollector {
+// in an unordered_map keyed by inner block ID.
+class MapBasedJoinedTupleCollector {
  public:
-  VectorsOfPairsJoinedTuplesCollector() {
+  MapBasedJoinedTupleCollector() {
   }
 
   template <typename ValueAccessorT>
@@ -96,34 +95,6 @@ class VectorsOfPairsJoinedTuplesCollector {
   std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
 };
 
-// Another collector using an unordered_map keyed on inner block just like above,
-// except that it uses a pair of (build-tuple-IDs-vector, probe-tuple-IDs-vector).
-class PairsOfVectorsJoinedTuplesCollector {
- public:
-  PairsOfVectorsJoinedTuplesCollector() {
-  }
-
-  template <typename ValueAccessorT>
-  inline void operator()(const ValueAccessorT &accessor,
-                         const TupleReference &tref) {
-    joined_tuples_[tref.block].first.push_back(tref.tuple);
-    joined_tuples_[tref.block].second.push_back(accessor.getCurrentPosition());
-  }
-
-  // Get a mutable pointer to the collected map of joined tuple ID pairs. The
-  // key is inner block_id, value is a pair consisting of
-  // inner block tuple IDs (first) and outer block tuple IDs (second).
-  inline std::unordered_map< block_id, std::pair<std::vector<tuple_id>, std::vector<tuple_id>>>*
-      getJoinedTuples() {
-    return &joined_tuples_;
-  }
-
- private:
-  std::unordered_map<
-    block_id,
-    std::pair<std::vector<tuple_id>, std::vector<tuple_id>>> joined_tuples_;
-};
-
 class SemiAntiJoinTupleCollector {
  public:
   explicit SemiAntiJoinTupleCollector(TupleIdSequence *filter)
@@ -461,7 +432,7 @@ void HashInnerJoinWorkOrder::execute() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  PairsOfVectorsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -479,14 +450,12 @@ void HashInnerJoinWorkOrder::execute() {
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
-  for (std::pair<const block_id, std::pair<std::vector<tuple_id>, std::vector<tuple_id>>>
+  for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
     BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
     const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
     std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
-    const std::vector<tuple_id> &build_tids = build_block_entry.second.first;
-    const std::vector<tuple_id> &probe_tids = build_block_entry.second.second;
 
     // Evaluate '*residual_predicate_', if any.
     //
@@ -499,16 +468,17 @@ void HashInnerJoinWorkOrder::execute() {
     // hash join is below a reasonable threshold so that we don't blow up
     // temporary memory requirements to an unreasonable degree.
     if (residual_predicate_ != nullptr) {
-      std::pair<std::vector<tuple_id>, std::vector<tuple_id>> filtered_matches;
-      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+      std::vector<std::pair<tuple_id, tuple_id>> filtered_matches;
+
+      for (const std::pair<tuple_id, tuple_id> &hash_match
+           : build_block_entry.second) {
         if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
                                                         build_relation_id,
-                                                        build_tids[i],
+                                                        hash_match.first,
                                                         *probe_accessor,
                                                         probe_relation_id,
-                                                        probe_tids[i])) {
-          filtered_matches.first.push_back(build_tids[i]);
-          filtered_matches.second.push_back(probe_tids[i]);
+                                                        hash_match.second)) {
+          filtered_matches.emplace_back(hash_match);
         }
       }
 
@@ -531,96 +501,22 @@ void HashInnerJoinWorkOrder::execute() {
     // benefit (probably only a real performance win when there are very few
     // matching tuples in each individual inner block but very many inner
     // blocks with at least one match).
-
-    // We now create ordered value accessors for both build and probe side,
-    // using the joined tuple TIDs. Note that we have to use this Lambda-based
-    // invocation method here because the accessors don't have a virtual
-    // function that creates such an OrderedTupleIdSequenceAdapterValueAccessor.
-    std::unique_ptr<ValueAccessor> ordered_build_accessor, ordered_probe_accessor;
-    InvokeOnValueAccessorNotAdapter(
-        build_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_build_accessor.reset(
-              accessor->createSharedOrderedTupleIdSequenceAdapter(build_tids));
-        });
-
-    if (probe_accessor->isTupleIdSequenceAdapter()) {
-      InvokeOnTupleIdSequenceAdapterValueAccessor(
-        probe_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_probe_accessor.reset(
-            accessor->createSharedOrderedTupleIdSequenceAdapter(probe_tids));
-        });
-    } else {
-      InvokeOnValueAccessorNotAdapter(
-        probe_accessor.get(),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-          ordered_probe_accessor.reset(
-            accessor->createSharedOrderedTupleIdSequenceAdapter(probe_tids));
-        });
-    }
-
-
-    // We also need a temp value accessor to store results of any scalar expressions.
     ColumnVectorsValueAccessor temp_result;
-
-    // Create a map of ValueAccessors and what attributes we want to pick from them
-    std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> accessor_attribute_map;
-    const std::vector<ValueAccessor *> accessors{
-        ordered_build_accessor.get(), ordered_probe_accessor.get(), &temp_result};
-    const unsigned int build_index = 0, probe_index = 1, temp_index = 2;
-    for (auto &accessor : accessors) {
-      accessor_attribute_map.push_back(std::make_pair(
-          accessor,
-          std::vector<attribute_id>(selection_.size(), kInvalidCatalogId)));
-    }
-
-    attribute_id dest_attr = 0;
-    std::vector<std::pair<tuple_id, tuple_id>> zipped_joined_tuple_ids;
-
-    for (auto &selection_cit : selection_) {
-      // If the Scalar (column) is not an attribute in build/probe blocks, then
-      // insert it into a ColumnVectorsValueAccessor.
-      if (selection_cit->getDataSource() != Scalar::ScalarDataSource::kAttribute) {
-        // Current destination attribute maps to the column we'll create now.
-        accessor_attribute_map[temp_index].second[dest_attr] = temp_result.getNumColumns();
-
-        if (temp_result.getNumColumns() == 0) {
-          // The getAllValuesForJoin function below needs joined tuple IDs as
-          // a vector of pair of (build-tuple-ID, probe-tuple-ID), and we have
-          // a pair of (build-tuple-IDs-vector, probe-tuple-IDs-vector). So
-          // we'll have to zip our two vectors together. We do this inside
-          // the loop because most queries don't exercise this code since
-          // they don't have scalar expressions with attributes from both
-          // build and probe relations (other expressions would have been
-          // pushed down to before the join).
-          zipped_joined_tuple_ids.reserve(build_tids.size());
-          for (std::size_t i = 0; i < build_tids.size(); ++i) {
-            zipped_joined_tuple_ids.push_back(std::make_pair(build_tids[i], probe_tids[i]));
-          }
-        }
-        temp_result.addColumn(
-            selection_cit
-                ->getAllValuesForJoin(build_relation_id, build_accessor.get(),
-                                      probe_relation_id, probe_accessor.get(),
-                                      zipped_joined_tuple_ids));
-      } else {
-        auto scalar_attr = static_cast<const ScalarAttribute *>(selection_cit.get());
-        const attribute_id attr_id = scalar_attr->getAttribute().getID();
-        if (scalar_attr->getAttribute().getParent().getID() == build_relation_id) {
-          accessor_attribute_map[build_index].second[dest_attr] = attr_id;
-        } else {
-          accessor_attribute_map[probe_index].second[dest_attr] = attr_id;
-        }
-      }
-      ++dest_attr;
+    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
+         selection_cit != selection_.end();
+         ++selection_cit) {
+      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
+                                                                  build_accessor.get(),
+                                                                  probe_relation_id,
+                                                                  probe_accessor.get(),
+                                                                  build_block_entry.second));
     }
 
     // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
     // for each pair of joined blocks incurs some extra overhead that could be
     // avoided by keeping checked-out MutableBlockReferences across iterations
     // of this loop, but that would get messy when combined with partitioning.
-    output_destination_->bulkInsertTuplesFromValueAccessors(accessor_attribute_map);
+    output_destination_->bulkInsertTuples(&temp_result);
   }
 }
 
@@ -654,7 +550,7 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   // We collect all the matching probe relation tuples, as there's a residual
  // predicate that needs to be applied after collecting these matches.
-  VectorsOfPairsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -863,7 +759,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  VectorsOfPairsJoinedTuplesCollector collector;
+  MapBasedJoinedTupleCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
  // tuples, because after this step we still have to evaluate the residual
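
The switch from PairsOfVectorsJoinedTuplesCollector back to
MapBasedJoinedTupleCollector in this diff is purely a data-layout change: both
group matches by inner block ID, but one keeps two parallel tuple-ID vectors
per block while the other keeps a single vector of ID pairs. A self-contained
sketch of the two shapes (the block_id and tuple_id stand-ins are assumptions
here, not Quickstep's real typedefs):

    #include <cstdint>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    using block_id = std::uint64_t;  // stand-in typedef
    using tuple_id = int;            // likewise

    int main() {
      // Shape used by MapBasedJoinedTupleCollector: per inner block, one
      // vector of (build-tuple-ID, probe-tuple-ID) pairs.
      std::unordered_map<block_id,
                         std::vector<std::pair<tuple_id, tuple_id>>>
          vector_of_pairs;
      vector_of_pairs[42].emplace_back(7, 3);

      // Shape used by the removed PairsOfVectorsJoinedTuplesCollector:
      // per inner block, two parallel vectors, one per join side.
      std::unordered_map<block_id,
                         std::pair<std::vector<tuple_id>,
                                   std::vector<tuple_id>>>
          pair_of_vectors;
      pair_of_vectors[42].first.push_back(7);   // build side
      pair_of_vectors[42].second.push_back(3);  // probe side
      return 0;
    }

The pair-of-vectors shape suited the removed ordered-tuple-ID-sequence adapter
path, which consumed each side's IDs as a separate vector; with that path gone,
the vector-of-pairs shape feeds matchesForJoinedTuples() and
getAllValuesForJoin() directly, with no zipping step.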

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 067edf6..5e83453 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -247,90 +247,6 @@ void InsertDestination::bulkInsertTuplesWithRemappedAttributes(
   });
 }
 
-// A common case that we can optimize away is when the attribute_map
-// for an accessor only contains gaps. This happens, e.g., for a join when
-// there are no attributes selected from one side.
-void removeGapOnlyAccessors(
-  const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>>* accessor_attribute_map,
-  std::vector<std::pair<ValueAccessor *, const std::vector<attribute_id>>>* reduced_accessor_attribute_map) {
-  for (std::size_t i = 0; i < accessor_attribute_map->size(); ++i) {
-    bool all_gaps = true;
-    for (const auto &attr : (*accessor_attribute_map)[i].second)
-      if (attr != kInvalidCatalogId) {
-        all_gaps = false;
-        break;
-      }
-    if (all_gaps)
-      continue;
-    reduced_accessor_attribute_map->push_back((*accessor_attribute_map)[i]);
-    (*accessor_attribute_map)[i].first->beginIterationVirtual();
-  }
-}
-
-void InsertDestination::bulkInsertTuplesFromValueAccessors(
-    const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-    bool always_mark_full) {
-  // Handle pathological corner case where there are no accessors
-  if (accessor_attribute_map.size() == 0)
-    return;
-
-  std::vector<std::pair<ValueAccessor *, const std::vector<attribute_id>>> reduced_accessor_attribute_map;
-  removeGapOnlyAccessors(&accessor_attribute_map, &reduced_accessor_attribute_map);
-
-  // We assume that all input accessors have the same number of tuples, so
-  // the iterations finish together. Therefore, we can just check the first one.
-  auto first_accessor = reduced_accessor_attribute_map[0].first;
-  while (!first_accessor->iterationFinishedVirtual()) {
-    tuple_id num_tuples_to_insert = kCatalogMaxID;
-    tuple_id num_tuples_inserted = 0;
-    MutableBlockReference output_block = this->getBlockForInsertion();
-
-    // Now iterate through all the accessors and do one round of bulk-insertion
-    // of partial tuples into the selected output_block.
-    // While inserting from the first ValueAccessor, space is reserved for
-    // all the columns including those coming from other ValueAccessors.
-    // Thereafter, in a given round, we only insert the remaining columns of the
-    // same tuples from the other ValueAccessors.
-    for (auto &p : reduced_accessor_attribute_map) {
-      ValueAccessor *accessor = p.first;
-      std::vector<attribute_id> attribute_map = p.second;
-
-
-      InvokeOnAnyValueAccessor(
-          accessor,
-          [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-            num_tuples_inserted = output_block->bulkInsertPartialTuples(
-                attribute_map, accessor, num_tuples_to_insert);
-      });
-
-      if (accessor == first_accessor) {
-        // Now we know how many full tuples can be inserted into this
-        // output_block (viz. number of tuples inserted from first ValueAccessor).
-        // We should only insert that many tuples from the remaining
-        // ValueAccessors as well.
-        num_tuples_to_insert = num_tuples_inserted;
-      } else {
-        // Since the bulk insertion of the first ValueAccessor should already
-        // have reserved the space for all the other ValueAccessors' columns,
-        // we must have been able to insert all the tuples we asked to insert.
-        DCHECK(num_tuples_inserted == num_tuples_to_insert);
-      }
-    }
-
-    // After one round of insertions, we have successfully inserted as many
-    // tuples as possible into the output_block. Strictly speaking, it's
-    // possible that there is more space for insertions because the size
-    // estimation of variable length columns is conservative. But we will ignore
-    // that case and proceed assuming that this output_block is full.
-
-    // Update the header for output_block and then return it.
-    output_block->bulkInsertPartialTuplesFinalize(num_tuples_inserted);
-    const bool mark_full = always_mark_full
-                           || !first_accessor->iterationFinishedVirtual();
-    this->returnBlock(std::move(output_block), mark_full);
-  }
-}
-
 void InsertDestination::insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                                                std::vector<Tuple>::const_iterator end) {
   if (begin == end) {
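
The removed removeGapOnlyAccessors() helper above skips any accessor whose
attribute map contains only gaps, since such an accessor contributes no output
columns. The check itself reduces to the following standalone sketch (the value
of kInvalidCatalogId is an assumption here, standing in for Quickstep's catalog
sentinel):

    #include <iostream>
    #include <vector>

    using attribute_id = int;
    constexpr attribute_id kInvalidCatalogId = -1;  // assumed sentinel

    // True if every entry is a gap, meaning the accessor supplies no
    // output attributes and its bulk-insert round can be skipped.
    bool IsGapOnly(const std::vector<attribute_id> &attr_map) {
      for (const attribute_id attr : attr_map) {
        if (attr != kInvalidCatalogId) {
          return false;
        }
      }
      return true;
    }

    int main() {
      // A join that selects no attributes from one side produces such a
      // map for that side's accessor:
      std::cout << IsGapOnly({kInvalidCatalogId, kInvalidCatalogId}) << '\n';  // 1
      std::cout << IsGapOnly({kInvalidCatalogId, 0, 2}) << '\n';               // 0
      return 0;
    }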

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3487638..408e76b 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -152,10 +152,6 @@ class InsertDestination : public InsertDestinationInterface {
       ValueAccessor *accessor,
       bool always_mark_full = false) override;
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override;
-
   void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                               std::vector<Tuple>::const_iterator end) override;
 
@@ -317,12 +313,6 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
   ~AlwaysCreateBlockInsertDestination() override {
   }
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override  {
-    LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for AlwaysCreateBlockInsertDestination";
-  }
-
  protected:
   MutableBlockReference getBlockForInsertion() override;
 
@@ -527,12 +517,6 @@ class PartitionAwareInsertDestination : public InsertDestination {
       ValueAccessor *accessor,
       bool always_mark_full = false) override;
 
-  void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) override  {
-    LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for PartitionAwareInsertDestination";
-  }
-
   void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                               std::vector<Tuple>::const_iterator end) override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/InsertDestinationInterface.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestinationInterface.hpp b/storage/InsertDestinationInterface.hpp
index b62d3e5..423dff1 100644
--- a/storage/InsertDestinationInterface.hpp
+++ b/storage/InsertDestinationInterface.hpp
@@ -20,7 +20,6 @@
 #ifndef QUICKSTEP_STORAGE_INSERT_DESTINATION_INTERFACE_HPP_
 #define QUICKSTEP_STORAGE_INSERT_DESTINATION_INTERFACE_HPP_
 
-#include <utility>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -123,27 +122,6 @@ class InsertDestinationInterface {
       bool always_mark_full = false) = 0;
 
   /**
-   * @brief Bulk-insert tuples from one or more ValueAccessors
-   *        into blocks managed by this InsertDestination.
-   *
-   * @warning It is implicitly assumed that all the input ValueAccessors have
-   *          the same number of tuples in them.
-   *
-   * @param accessor_attribute_map A vector of pairs of ValueAccessor and
-   *        corresponding attribute map
-   *        The i-th attribute ID in the attribute map for a value accessor is "n"
-   *        if the attribute_id "i" in the output relation
-   *        is the attribute_id "n" in the corresponding input value accessor.
-   *        Set the i-th element to kInvalidCatalogId if it doesn't come from
-   *        the corresponding value accessor.
-   * @param always_mark_full If \c true, always mark the blocks full after
-   *        insertion from ValueAccessor even when partially full.
-   **/
-  virtual void bulkInsertTuplesFromValueAccessors(
-      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map,
-      bool always_mark_full = false) = 0;
-
-  /**
    * @brief Insert tuples from a range of Tuples in a vector.
    * @warning Unlike bulkInsertTuples(), this is not well-optimized and not
    *          intended for general use. It should only be used by
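
The indexing convention in the removed doc comment is easiest to see with a
concrete example; every number below is made up for illustration:

    #include <vector>

    using attribute_id = int;
    constexpr attribute_id kInvalidCatalogId = -1;  // assumed sentinel

    int main() {
      // Suppose the output relation has four attributes, filled from two
      // accessors: accessor A supplies output attributes 0 and 2 (from
      // its own columns 1 and 0), accessor B supplies output attributes
      // 1 and 3 (from its columns 0 and 1). Entry i of each map names
      // the source column in that accessor, or kInvalidCatalogId if the
      // attribute comes from elsewhere.
      const std::vector<attribute_id> map_for_a =
          {1, kInvalidCatalogId, 0, kInvalidCatalogId};
      const std::vector<attribute_id> map_for_b =
          {kInvalidCatalogId, 0, kInvalidCatalogId, 1};
      (void)map_for_a;  // these maps would be passed, paired with their
      (void)map_for_b;  // accessors, to bulkInsertTuplesFromValueAccessors()
      return 0;
    }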

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index 1e6f7ff..f955c99 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -41,61 +41,54 @@ namespace quickstep {
 
 QUICKSTEP_REGISTER_TUPLE_STORE(SplitRowStoreTupleStorageSubBlock, SPLIT_ROW_STORE);
 
-using splitrow_internal::CopyGroupList;
-using splitrow_internal::ContiguousAttrs;
-using splitrow_internal::NullableAttr;
-using splitrow_internal::VarLenAttr;
-
-const std::size_t SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize = sizeof(std::uint32_t) * 2;
-
 namespace {
 
-  template<typename ValueAccessorT, bool nullable_attrs>
-  inline std::size_t CalculateVariableSize(
+template <typename ValueAccessorT, bool nullable_attrs>
+inline std::size_t CalculateVariableSize(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor) {
-    std::size_t total_size = 0;
-    attribute_id accessor_attr_id = 0;
-    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-         attr_it != relation.end();
-         ++attr_it, ++accessor_attr_id) {
-      if (!attr_it->getType().isVariableLength()) {
-        continue;
-      }
+  std::size_t total_size = 0;
+  attribute_id accessor_attr_id = 0;
+  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+       attr_it != relation.end();
+       ++attr_it, ++accessor_attr_id) {
+    if (!attr_it->getType().isVariableLength()) {
+      continue;
+    }
 
-      TypedValue value(accessor.getTypedValue(accessor_attr_id));
-      if (nullable_attrs && value.isNull()) {
-        continue;
-      }
-      total_size += value.getDataSize();
+    TypedValue value(accessor.getTypedValue(accessor_attr_id));
+    if (nullable_attrs && value.isNull()) {
+      continue;
     }
-    return total_size;
+    total_size += value.getDataSize();
   }
+  return total_size;
+}
 
-  template<typename ValueAccessorT, bool nullable_attrs>
-  inline std::size_t CalculateVariableSizeWithRemappedAttributes(
+template <typename ValueAccessorT, bool nullable_attrs>
+inline std::size_t CalculateVariableSizeWithRemappedAttributes(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor,
     const std::vector<attribute_id> &attribute_map) {
-    std::size_t total_size = 0;
-    std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-         attr_it != relation.end();
-         ++attr_it, ++attr_map_it) {
-      if (!attr_it->getType().isVariableLength()) {
-        continue;
-      }
+  std::size_t total_size = 0;
+  std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+       attr_it != relation.end();
+       ++attr_it, ++attr_map_it) {
+    if (!attr_it->getType().isVariableLength()) {
+      continue;
+    }
 
-      TypedValue value(accessor.getTypedValue(*attr_map_it));
-      if (nullable_attrs && value.isNull()) {
-        continue;
-      }
-      total_size += value.getDataSize();
+    TypedValue value(accessor.getTypedValue(*attr_map_it));
+    if (nullable_attrs && value.isNull()) {
+      continue;
     }
-    return total_size;
+    total_size += value.getDataSize();
   }
+  return total_size;
+}
 
-}  // anonymous namespace
+}  // namespace
 
 SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
     const CatalogRelationSchema &relation,
@@ -108,10 +101,7 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                            new_block,
                            sub_block_memory,
                            sub_block_memory_size),
-      header_(static_cast<Header*>(sub_block_memory)),
-      num_null_attrs_(0),
-      num_fixed_attrs_(0),
-      num_var_attrs_(0) {
+      header_(static_cast<Header*>(sub_block_memory)) {
   if (!DescriptionIsValid(relation_, description_)) {
     FATAL_ERROR("Attempted to construct a SplitRowStoreTupleStorageSubBlock from an invalid description.");
   }
@@ -153,21 +143,6 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                    + sizeof(Header) + occupancy_bitmap_bytes_;
   tuple_storage_bytes_ = sub_block_memory_size_ - (sizeof(Header) + occupancy_bitmap_bytes_);
 
-  // Some accounting information for bulk inserts.
-  for (attribute_id attr_id = 0;
-       attr_id < static_cast<attribute_id>(relation.size());
-       ++attr_id) {
-    const Type& attr_type = relation.getAttributeById(attr_id)->getType();
-    if (attr_type.isVariableLength()) {
-      fixed_len_attr_sizes_.push_back(kInvalidAttributeID);
-      num_var_attrs_++;
-    } else {
-      fixed_len_attr_sizes_.push_back(attr_type.maximumByteLength());
-      num_fixed_attrs_++;
-    }
-    num_null_attrs_ += attr_type.isNullable();
-  }
-
   if (new_block) {
     // Only need to initialize these fields, the rest of the block will be
     // zeroed-out by the StorageManager.
@@ -219,218 +194,380 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
 }
 
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *accessor) {
-  std::vector<attribute_id> simple_remap;
-  for (attribute_id attr_id = 0;
-      attr_id < static_cast<attribute_id>(relation_.size());
-      ++attr_id) {
-    simple_remap.push_back(attr_id);
-  }
-  return bulkInsertDispatcher(simple_remap, accessor, kCatalogMaxID, true);
-}
+  const tuple_id original_num_tuples = header_->num_tuples;
+  tuple_id pos = 0;
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuples(
-  const std::vector<attribute_id> &attribute_map,
-  ValueAccessor *accessor,
-  const tuple_id max_num_tuples_to_insert) {
-  return bulkInsertDispatcher(attribute_map, accessor, max_num_tuples_to_insert, false);
-}
-
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
-  const std::vector<attribute_id> &attribute_map,
-  ValueAccessor *accessor,
-  tuple_id max_num_tuples_to_insert,
-  bool finalize) {
-  const bool fill_to_capacity = max_num_tuples_to_insert == kCatalogMaxID;
-
-  CopyGroupList copy_groups;
-  getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
-  auto impl = accessor->getImplementationType();
-  const bool is_rowstore_source =
-    (impl == ValueAccessor::Implementation::kPackedRowStore ||
-     impl == ValueAccessor::Implementation::kSplitRowStore);
-  if (is_rowstore_source) {
-    copy_groups.merge_contiguous();
-  }
-
-  const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
-  const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
-
-  if (fill_to_capacity) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     if (relation_.hasNullableAttributes()) {
-      // TODO(marc) This is an annoying gotcha: the insertion loop assumes the null
-      // bitmaps are zero'd for a fresh insert. We could clear the bit map on each tuple
-      // iteration, but that'd be costlier.
-      std::int64_t remaining_bytes = tuple_storage_bytes_ -
-                                     (header_->variable_length_bytes_allocated +
-                                      (header_->num_tuples * tuple_slot_bytes_));
-      memset(static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_, 0x0, remaining_bytes);
-    }
-  }
-
-  tuple_id num_inserted = 0;
-  if (max_num_tuples_to_insert == kCatalogMaxID) {
-    max_num_tuples_to_insert = getInsertLowerBound();
-  }
-  if (copy_varlen) {
-    if (copy_nulls) {
-      if (fill_to_capacity) {
-        num_inserted = bulkInsertPartialTuplesImpl<true, true, true>(copy_groups, accessor,
-                                                                     max_num_tuples_to_insert);
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          // If packed, insert at the end of the slot array, otherwise find the
+          // first hole.
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSize<decltype(*accessor), true>(relation_, *accessor);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          // Allocate variable-length storage.
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          // Find the slot and locate its sub-structures.
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          // Start writing variable-length data at the beginning of the newly
+          // allocated range.
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
+            if ((nullable_idx != -1) && (attr_value.isNull())) {
+              // Set null bit and move on.
+              tuple_null_bitmap.setBit(nullable_idx, true);
+              continue;
+            }
+            if (variable_idx != -1) {
+              // Write offset and size into the slot, then copy the actual
+              // value into the variable-length storage region.
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              // Copy fixed-length value directly into the slot.
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          // Update occupancy bitmap and header.
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       } else {
-        num_inserted = bulkInsertPartialTuplesImpl<true, true, false>(copy_groups, accessor,
-                                                                      max_num_tuples_to_insert);
+        // Same as above, but skip variable-length checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            if (nullable_idx != -1) {
+              const void *attr_value = accessor->template getUntypedValue<true>(accessor_attr_id);
+              if (attr_value == nullptr) {
+                tuple_null_bitmap.setBit(nullable_idx, true);
+              } else {
+                std::memcpy(fixed_length_attr_storage
+                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                            attr_value,
+                            attr_it->getType().maximumByteLength());
+              }
+            } else {
+              const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
+              std::memcpy(fixed_length_attr_storage
+                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                          attr_value,
+                          attr_it->getType().maximumByteLength());
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       }
     } else {
-      if (fill_to_capacity) {
-        num_inserted = bulkInsertPartialTuplesImpl<false, true, true>(copy_groups, accessor,
-                                                                      max_num_tuples_to_insert);
+      if (relation_.isVariableLength()) {
+        // Same as most general case above, but skip null checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSize<decltype(*accessor), false>(relation_, *accessor);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
+            if (variable_idx != -1) {
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       } else {
-        num_inserted = bulkInsertPartialTuplesImpl<false, true, false>(copy_groups, accessor,
-                                                                       max_num_tuples_to_insert);
+        // Simplest case: skip both null and variable-length checks.
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          attribute_id accessor_attr_id = 0;
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++accessor_attr_id) {
+            const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
+            std::memcpy(fixed_length_attr_storage
+                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                        attr_value,
+                        attr_it->getType().maximumByteLength());
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
       }
     }
-  } else {
-    if (copy_nulls) {
-      num_inserted = bulkInsertPartialTuplesImpl<true, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
-    } else {
-      num_inserted = bulkInsertPartialTuplesImpl<false, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
-    }
-  }
+  });
 
-  if (finalize) {
-    bulkInsertPartialTuplesFinalize(num_inserted);
-  }
-  return num_inserted;
+  return header_->num_tuples - original_num_tuples;
 }
 
-// copy_nulls is true if the incoming attributes include at least one nullable attribute
-// copy_varlen is true if the incoming attributes include at least one varlen attribute
-template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
-  const CopyGroupList &copy_groups,
-  ValueAccessor *accessor,
-  std::size_t max_num_tuples_to_insert) {
-  std::size_t num_tuples_inserted = 0;
-
-  // We only append to the end of the block to cut down on complexity.
-  char *tuple_slot = static_cast<char *>(tuple_storage_) +  header_->num_tuples * tuple_slot_bytes_;
-
-  std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-  std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
-
-  BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
-  char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
-
-
-
-  std::size_t storage_available = tuple_storage_bytes_ -
-                                    (header_->variable_length_bytes_allocated +
-                                     header_->num_tuples * tuple_slot_bytes_);
-
-  // The number of bytes that must be reserved per tuple inserted due to gaps.
-  std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
-  if (fill_to_capacity) {
-    for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
-      varlen_reserve -= relation_.getAttributeById(
-        copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
-    }
-    DCHECK_GE(relation_.getMaximumVariableByteLength(), varlen_reserve);
-  }
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor) {
+  DEBUG_ASSERT(attribute_map.size() == relation_.size());
+  const tuple_id original_num_tuples = header_->num_tuples;
+  tuple_id pos = 0;
 
   InvokeOnAnyValueAccessor(
-    accessor,
-    [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      do {
-        const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
-        const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
-        const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
-
-        const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
-
-        while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
-          for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
-            const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
-            fixed_len_cursor += cattr.bytes_to_advance_;
-            const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
-            std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    if (relation_.hasNullableAttributes()) {
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), true>(
+                  relation_, *accessor, attribute_map);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
           }
-
-          if (copy_nulls) {
-            tuple_null_bitmap.setMemory(tuple_slot);
-            for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
-              const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
-              const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
+            if ((nullable_idx != -1) && (attr_value.isNull())) {
+              tuple_null_bitmap.setBit(nullable_idx, true);
+              continue;
+            }
+            if (variable_idx != -1) {
+              const std::size_t attr_size = attr_value.getDataSize();
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+            }
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
+      } else {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          BitVector<true> tuple_null_bitmap(tuple_slot,
+                                            relation_.numNullableAttributes());
+          tuple_null_bitmap.clear();
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
+            if (nullable_idx != -1) {
+              const void *attr_value = accessor->template getUntypedValue<true>(*attr_map_it);
               if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
+                tuple_null_bitmap.setBit(nullable_idx, true);
+              } else {
+                std::memcpy(fixed_length_attr_storage
+                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                            attr_value,
+                            attr_it->getType().maximumByteLength());
               }
+            } else {
+              const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
+              std::memcpy(fixed_length_attr_storage
+                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                          attr_value,
+                          attr_it->getType().maximumByteLength());
             }
           }
-
-          if (copy_varlen) {
-            for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
-              const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
-              fixed_len_cursor += vattr.bytes_to_advance_;
-              // Typed value is necessary as we need the length.
-              const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
-              if (attr_value.isNull()) {
-                continue;
-              }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
+        }
+      }
+    } else {
+      if (relation_.isVariableLength()) {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          const std::size_t tuple_variable_bytes
+              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), false>(
+                  relation_, *accessor, attribute_map);
+          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
+            accessor->previous();
+            break;
+          }
+          header_->variable_length_bytes_allocated += tuple_variable_bytes;
+
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
+              fixed_length_attr_storage + relation_.getFixedByteLength());
+          std::uint32_t current_variable_position
+              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
+            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
+            if (variable_idx != -1) {
               const std::size_t attr_size = attr_value.getDataSize();
-              varlen_heap_offset -= attr_size;
-              std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
-                          attr_size);
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
-              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
+              variable_length_info_array[variable_idx << 1] = current_variable_position;
+              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
+              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
+              current_variable_position += attr_size;
+            } else {
+              attr_value.copyInto(fixed_length_attr_storage
+                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
             }
           }
-          tuple_slot += tuple_slot_bytes_;
-          fixed_len_cursor = tuple_slot + nullmap_size;
-          num_tuples_inserted++;
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
+          }
         }
-        if (fill_to_capacity) {
-          std::int64_t remaining_storage_after_inserts = storage_available -
-                                                         (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
-                                                          (varlen_heap_offset_orig - varlen_heap_offset));
-          DCHECK_LE(0, remaining_storage_after_inserts);
-          std::size_t additional_tuples_insert =
-            remaining_storage_after_inserts / (tuple_slot_bytes_ + this->relation_.getMaximumByteLength());
-          // We want to avoid a situation where we have several short insert iterations
-          // near the end of an insertion cycle.
-          if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
-            max_num_tuples_to_insert += additional_tuples_insert;
+      } else {
+        while (accessor->next()) {
+          pos = this->isPacked() ? header_->num_tuples
+                                 : occupancy_bitmap_->firstZero(pos);
+          if (!this->spaceToInsert(pos, 0)) {
+            accessor->previous();
+            break;
+          }
+          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
+          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
+
+          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+               attr_it != relation_.end();
+               ++attr_it, ++attr_map_it) {
+            const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
+            std::memcpy(fixed_length_attr_storage
+                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
+                        attr_value,
+                        attr_it->getType().maximumByteLength());
+          }
+          occupancy_bitmap_->setBit(pos, true);
+          ++(header_->num_tuples);
+          if (pos > header_->max_tid) {
+            header_->max_tid = pos;
           }
         }
-      } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
-               num_tuples_inserted < max_num_tuples_to_insert);
-    });
-
-  if (copy_varlen) {
-    header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
-  }
-
-  return num_tuples_inserted;
-}
-
-void SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesFinalize(
-    const tuple_id num_tuples_inserted) {
-  occupancy_bitmap_->setBitRange(header_->max_tid + 1, num_tuples_inserted, true);
-  header_->num_tuples += num_tuples_inserted;
-  header_->max_tid += num_tuples_inserted;
-}
-
-std::size_t SplitRowStoreTupleStorageSubBlock::getInsertLowerBound() const {
-  const std::size_t remaining_storage_bytes = tuple_storage_bytes_ -
-                                              (header_->variable_length_bytes_allocated +
-                                               ((header_->max_tid + 1) * tuple_slot_bytes_));
-  const std::size_t tuple_max_size = tuple_slot_bytes_ + relation_.getMaximumVariableByteLength();
-  return remaining_storage_bytes / tuple_max_size;
-}
+      }
+    }
+  });
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor) {
-  DCHECK_EQ(relation_.size(), attribute_map.size());
-  return bulkInsertDispatcher(attribute_map, accessor, kCatalogMaxID, true);
+  return header_->num_tuples - original_num_tuples;
 }
 
 const void* SplitRowStoreTupleStorageSubBlock::getAttributeValue(
@@ -865,67 +1002,4 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
   return InsertResult(pos, false);
 }
 
-// Copy groups are used by insert algorithms to efficiently copy attributes from a
-// variety of source schemas with some matching attributes in the destination (this) store.
-// SplitRow has 3 distinct zones which define a physical tuple:
-//    [null_bitmap] [fixed_length_zone] [var_len_pairs]
-// When we do our insert algorithm, we first copy over fixed length attributes. Since there
-// can be gaps, and reorderings in the source schema, we need to know:
-//    * Where to copy the src attr into (ie offset from start of fixed_len_zone)
-//    * How many bytes to copy
-//    * Which src attr we are copying
-// When copying fixed length attributes, we advance to the attribute's offset
-// within the tuple and memcpy bytes_to_copy_ bytes from the source attribute.
-//
-// Copying variable length attributes pairs is similar. Note that there is a heap at the end of
-// the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
-// copy varlen into the heap is the main difference from copying fixed length.
-void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
-  const std::vector<attribute_id> &attribute_map,
-  CopyGroupList *copy_groups) {
-  DCHECK_EQ(attribute_map.size(), relation_.size());
-
-  attribute_id num_attrs = attribute_map.size();
-
-  std::size_t contig_adv = 0;
-  std::size_t varlen_adv = 0;
-  for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
-    attribute_id src_attr = attribute_map[attr_id];
-
-    // Attribute doesn't exist in src.
-    if (src_attr == kInvalidCatalogId) {
-      // create a placeholder for now
-      if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
-        // fixed len
-        contig_adv += fixed_len_attr_sizes_[attr_id];
-      } else {
-        // var len
-        varlen_adv += kVarLenSlotSize;
-      }
-      continue;
-    }
-
-    // Attribute exists in src.
-    if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
-      // fixed len
-      copy_groups->contiguous_attrs_.push_back(
-        ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
-      contig_adv = fixed_len_attr_sizes_[attr_id];
-    } else {
-      // var len
-      copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
-      varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
-    }
-
-    if (relation_.getNullableAttributeIndex(attr_id) != -1) {
-      copy_groups->nullable_attrs_.push_back(
-        NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
-    }
-  }
-  // This will point us to the beginning of the varlen zone.
-  if (copy_groups->varlen_attrs_.size() > 0) {
-    copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
-  }
-}
-
 }  // namespace quickstep
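
For reference, each tuple slot written by the insert loops in
SplitRowStoreTupleStorageSubBlock.cpp has three zones:
[null bitmap][fixed-length values][(offset, size) pairs], with the
variable-length data itself kept in a heap that grows downward from the end
of the tuple storage region. A minimal sketch of how a slot's sub-structures
are located, using only member names that appear above (the pointer
arithmetic mirrors the insert loops and is illustrative, not authoritative):

  // Locate the pieces of the slot at position `pos`.
  char *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;

  // Zone 1: per-tuple null bitmap, one bit per nullable attribute.
  BitVector<true> tuple_null_bitmap(tuple_slot, relation_.numNullableAttributes());

  // Zone 2: fixed-length attribute values at fixed offsets within the slot.
  char *fixed_length_attr_storage = tuple_slot + per_tuple_null_bitmap_bytes_;

  // Zone 3: one (offset, size) pair of uint32s per variable-length attribute;
  // the offsets index into the downward-growing heap at the end of
  // tuple_storage_.
  std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
      fixed_length_attr_storage + relation_.getFixedByteLength());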

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index 89c756d..a930103 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -45,150 +45,6 @@ class ValueAccessor;
 
 QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
 
-namespace splitrow_internal {
-// A CopyGroup contains information about one run of attributes in the source
-// ValueAccessor that can be copied into the output block. The
-// getCopyGroupsForAttributeMap function below takes an attribute map for a source
-// and converts it into a sequence of runs. The goal is to minimize the number
-// of memcpy calls and address calculations that occur during bulk insertion.
-// Contiguous attributes from a rowstore source can be merged into a single copy group.
-//
-// A single ContiguousAttrs CopyGroup consists of contiguous attributes, nullable
-// or not. "Contiguous" here means that their attribute IDs are successive in both
-// the source and destination relations.
-//
-// A NullableAttr refers to exactly one nullable attribute. Nullable columns are
-// represented using fixed length inline data as well as a null bitmap.
-// In a particular tuple, if the attribute has a null value, the inline data
-// has no meaning. So it is safe to copy it or not. We use this fact to merge
-// runs together aggressively, i.e., a ContiguousAttrs group may include a
-// nullable attribute. However, we also create a NullableAttr in that case in
-// order to check the null bitmap.
-//
-// A gap is a run of destination (output) attributes that don't come from a
-// particular source. This occurs during bulkInsertPartialTuples. They must be
-// skipped during the insert (not copied over). They are indicated by a
-// kInvalidCatalogId in the attribute map. For efficiency, the gap size
-// is merged into the bytes_to_advance_ of previous ContiguousAttrs copy group.
-// For gaps at the start of the attribute map, we just create a ContiguousAttrs
-// copy group with 0 bytes to copy and dummy (0) source attribute id.
-//
-// eg. For 4B integer attrs, from a row store source,
-// if the input attribute_map is {-1,0,5,6,7,-1,2,4,9,10,-1}
-// with input/output attributes 4 and 7 being nullable,
-// we will create the following ContiguousAttrs copy groups
-//
-//  ----------------------------------------------------
-//  |src_id_      |bytes_to_advance_| bytes_to_copy_   |
-//  |-------------|-----------------|------------------|
-//  |            0|                4|                 4|
-//  |            5|                4|                12|
-//  |            2|               16|                 4|
-//  |            4|                4|                 4|
-//  |            9|                4|                 8|
-//  ----------------------------------------------------
-// and two NullableAttrs with src_attr_id_ set to 4 and 7.
-//
-// In this example, we do 6 memcpy calls and 6 address calculations
-// as well as 2 bitvector lookups for each tuple. A naive copy algorithm
-// would do 11 memcpy calls and address calculations, along with the
-// bitvector lookups, not to mention the schema lookups,
-// all interspersed in a complex loop with lots of branches.
-//
-// If the source was a column store, then we can't merge contiguous
-// attributes (or gaps). So we would have 11 ContiguousAttrs copy groups with
-// three of them having bytes_to_copy = 0 (corresponding to the gaps) and
-// the rest having bytes_to_copy_ = 4.
-//
-// SplitRowStore supports variable length attributes. Since the layout of the
-// tuple is like: [null bitmap][fixed length attributes][variable length offsets]
-// we do all the variable length copies after the fixed length copies.
-//
-struct CopyGroup {
-  attribute_id src_attr_id_;  // The attr_id of starting input attribute for run.
-
-  explicit CopyGroup(const attribute_id source_attr_id)
-    : src_attr_id_(source_attr_id) {}
-};
-
-struct ContiguousAttrs : public CopyGroup {
-  std::size_t bytes_to_advance_;  // Number of bytes to advance destination ptr
-                                  // to get to the location where we copy THIS attribute.
-  std::size_t bytes_to_copy_;     // Number of bytes to copy from source.
-
-  ContiguousAttrs(
-    const attribute_id source_attr_id,
-    const std::size_t bytes_to_copy,
-    const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      bytes_to_copy_(bytes_to_copy) { }
-};
-
-struct VarLenAttr : public CopyGroup {
-  std::size_t bytes_to_advance_;
-  attribute_id dst_attr_id_;
-  VarLenAttr(const attribute_id source_attr_id,
-             const attribute_id dst_attr_id,
-             const std::size_t bytes_to_advance)
-    : CopyGroup(source_attr_id),
-      bytes_to_advance_(bytes_to_advance),
-      dst_attr_id_(dst_attr_id) {}
-};
-
-struct NullableAttr : public CopyGroup {
-  int nullable_attr_idx_;  // index into null bitmap
-
-  NullableAttr(attribute_id source_attr_id_,
-               int nullable_attr_idx)
-    : CopyGroup(source_attr_id_),
-      nullable_attr_idx_(nullable_attr_idx) {}
-};
-
-struct CopyGroupList {
-  CopyGroupList()
-    : contiguous_attrs_(),
-      nullable_attrs_(),
-      varlen_attrs_() {}
-
-  /**
-   * @brief Attributes which are exactly sequential are merged into a single copy.
-   */
-  void merge_contiguous() {
-    if (contiguous_attrs_.size() < 2) {
-      return;
-    }
-
-    int add_to_advance = 0;
-    for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
-      ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
-      ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
-      if (add_to_advance > 0) {
-        current_attr->bytes_to_advance_ += add_to_advance;
-        add_to_advance = 0;
-      }
-      // The merge step:
-      if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
-            previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
-        previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
-        add_to_advance += current_attr->bytes_to_advance_;
-        contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
-        idx--;
-      }
-    }
-
-    if (varlen_attrs_.size() > 0) {
-      varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
-    }
-  }
-
-  std::vector<ContiguousAttrs> contiguous_attrs_;
-  std::vector<NullableAttr> nullable_attrs_;
-  std::vector<VarLenAttr> varlen_attrs_;
-};
-
-}  // namespace splitrow_internal
-
 /** \addtogroup Storage
  *  @{
  */
@@ -204,8 +60,6 @@ struct CopyGroupList {
  *       storage can be reclaimed by calling rebuild().
  **/
 class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
-  static const std::size_t kVarLenSlotSize;
-
  public:
   SplitRowStoreTupleStorageSubBlock(const CatalogRelationSchema &relation,
                                     const TupleStorageSubBlockDescription &description,
@@ -301,13 +155,6 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;
 
-  tuple_id bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert) override;
-
-  void bulkInsertPartialTuplesFinalize(const tuple_id num_tuples_inserted) override;
-
   const void* getAttributeValue(const tuple_id tuple,
                                 const attribute_id attr) const override;
 
@@ -366,33 +213,6 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   template <bool nullable_attrs, bool variable_length_attrs>
   InsertResult insertTupleImpl(const Tuple &tuple);
 
-  template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
-  tuple_id bulkInsertPartialTuplesImpl(
-    const splitrow_internal::CopyGroupList &copy_groups,
-    ValueAccessor *accessor,
-    std::size_t max_num_tuples_to_insert);
-
-  tuple_id bulkInsertDispatcher(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    tuple_id max_num_tuples_to_insert,
-    bool finalize);
-
-  void getCopyGroupsForAttributeMap(
-    const std::vector<attribute_id> &attribute_map,
-    splitrow_internal::CopyGroupList *copy_groups);
-
-  std::size_t getInsertLowerBound() const;
-
-  // When varlen attributes are bulk inserted, the difference between the maximum
-  // possible size and the actual size of the tuples will cause an underestimate of
-  // the number of tuples we can insert. This threshold puts a limit on the number
-  // of tuples to attempt to insert. A smaller number will give more rounds of insertion
-  // and a more-packed block, but at the cost of insertion speed.
-  std::size_t getInsertLowerBoundThreshold() const {
-    return 10;
-  }
-
   Header *header_;
 
   std::unique_ptr<BitVector<false>> occupancy_bitmap_;
@@ -401,18 +221,12 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   void *tuple_storage_;
   std::size_t tuple_storage_bytes_;
   std::size_t tuple_slot_bytes_;
-  std::vector<std::size_t> fixed_len_attr_sizes_;
-
-  std::size_t num_null_attrs_;
-  std::size_t num_fixed_attrs_;
-  std::size_t num_var_attrs_;
 
   std::size_t per_tuple_null_bitmap_bytes_;
 
   friend class SplitRowStoreTupleStorageSubBlockTest;
   friend class SplitRowStoreValueAccessor;
   FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, InitializeTest);
-  FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest);
 
   DISALLOW_COPY_AND_ASSIGN(SplitRowStoreTupleStorageSubBlock);
 };
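
To make the removed copy-group machinery concrete: for a row-store source,
getCopyGroupsForAttributeMap() emits one ContiguousAttrs group per matching
fixed-length attribute, and merge_contiguous() then collapses runs that are
adjacent in both schemas into a single memcpy-able span. A hypothetical
example with two 4-byte integer attributes whose IDs are successive in both
source and destination (the values are chosen for illustration):

  // Constructor order per the struct above:
  // (source_attr_id, bytes_to_copy, bytes_to_advance).
  ContiguousAttrs first(0, 4, 0);   // copy 4 bytes of src attr 0
  ContiguousAttrs second(1, 4, 4);  // copy 4 bytes of src attr 1, 4 bytes later

  // merge_contiguous() merges them because
  //   first.src_attr_id_ + 1 == second.src_attr_id_    (successive IDs), and
  //   first.bytes_to_copy_ == second.bytes_to_advance_ (no gap between them),
  // leaving a single group {src_attr_id_ = 0, bytes_to_copy_ = 8} that copies
  // both attributes with one memcpy.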

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 6267d6b..ea74ee6 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -284,30 +284,6 @@ tuple_id StorageBlock::bulkInsertTuplesWithRemappedAttributes(
   return num_inserted;
 }
 
-tuple_id StorageBlock::bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert) {
-  const tuple_id num_inserted
-      = tuple_store_->bulkInsertPartialTuples(attribute_map,
-                                              accessor,
-                                              max_num_tuples_to_insert);
-  if (num_inserted != 0) {
-    invalidateAllIndexes();
-    dirty_ = true;
-  } else if (tuple_store_->isEmpty()) {
-    if (!accessor->iterationFinishedVirtual()) {
-      throw TupleTooLargeForBlock(0);
-    }
-  }
-  return num_inserted;
-}
-
-void StorageBlock::bulkInsertPartialTuplesFinalize(
-    const tuple_id num_tuples_inserted) {
-  tuple_store_->bulkInsertPartialTuplesFinalize(num_tuples_inserted);
-}
-
 void StorageBlock::sample(const bool is_block_sample,
                           const int percentage,
                           InsertDestinationInterface *destination) const {

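The removed methods formed a two-phase protocol: each bulkInsertPartialTuples()
call copies only the columns named in its attribute map (gaps are left for
other sources), and nothing becomes visible until the single finalize call
advances the header. A hedged caller-side sketch, with `block`, `accessors`,
and `attribute_maps` as hypothetical names and the exact capping policy as an
assumption:

  // The first source determines how many tuples fit in the block.
  tuple_id num_inserted = block->bulkInsertPartialTuples(
      attribute_maps[0], accessors[0], kCatalogMaxID);
  // Remaining sources fill the other columns of the same rows, so they are
  // capped at the count the first call achieved.
  for (std::size_t i = 1; i < accessors.size(); ++i) {
    block->bulkInsertPartialTuples(attribute_maps[i], accessors[i], num_inserted);
  }
  // A single finalize updates header.num_tuples and makes the rows visible.
  block->bulkInsertPartialTuplesFinalize(num_inserted);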
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index ed252c5..56b3bdc 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -307,7 +307,6 @@ class StorageBlock : public StorageBlockBase {
    *        iteration will be advanced to the first non-inserted tuple or, if
    *        all accessible tuples were inserted in this block, to the end
    *        position.
-   * @param max_tuples_to_insert Insert at most this many tuples
    * @return The number of tuples inserted from accessor.
    **/
   tuple_id bulkInsertTuplesWithRemappedAttributes(
@@ -315,49 +314,6 @@ class StorageBlock : public StorageBlockBase {
       ValueAccessor *accessor);
 
   /**
-   * @brief Insert up to max_num_tuples_to_insert tuples from a ValueAccessor
-   *        as a single batch, using the attribute_map to project and reorder
-   *        columns from the input ValueAccessor. Does not update header.
-   *
-   * @note Typical usage is where you want to bulk-insert columns from two
-   *       or more value accessors. Instead of writing out the columns into
-   *       one or more column vector value accessors, you can simply use this
-   *       function with the appropriate attribute_map for each value
-   *       accessor (InsertDestination::bulkInsertTuplesFromValueAccessors
-   *       handles all the details) to insert tuples without an extra temp copy.
-   * 
-   * @warning Must call bulkInsertPartialTuplesFinalize() to update the header,
-   *          until which point, the insertion is not visible to others.
-   * @warning The inserted tuples may be placed in sub-optimal locations in this
-   *          TupleStorageSubBlock.
-   *
-   * @param attribute_map A vector which maps the attributes of this
-   *        TupleStorageSubBlock's relation (gaps indicated with kInvalidCatalogId)
-   *         to the corresponding attributes which should be read from accessor.
-   * @param accessor A ValueAccessor to insert tuples from. The accessor's
-   *        iteration will be advanced to the first non-inserted tuple or, if
-   *        all accessible tuples were inserted in this sub-block, to the end
-   *        position.
-   * @return The number of tuples inserted from accessor.
-   **/
-  tuple_id bulkInsertPartialTuples(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor,
-    const tuple_id max_num_tuples_to_insert);
-
-  /**
-   * @brief Update header after a bulkInsertPartialTuples.
-   *
-   * @warning Only call this after a bulkInsertPartialTuples, passing in the
-   *          number of tuples that were inserted (return value of that function).
-   *
-   * @param num_tuples_inserted Number of tuples inserted (i.e., how much to
-   *        advance the header.num_tuples by). Should be equal to the return
-   *        value of bulkInsertPartialTuples.
-   **/
-  void bulkInsertPartialTuplesFinalize(tuple_id num_tuples_inserted);
-
-  /**
    * @brief Get the IDs of tuples in this StorageBlock which match a given Predicate.
    *
    * @param predicate The predicate to match.

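A concrete (hypothetical) attribute_map for the convention described above:
for a destination relation with attributes {0, 1, 2}, where attribute 0 is
read from accessor column 4, attribute 1 is a gap to be filled by a later
partial insert from another accessor, and attribute 2 is read from column 0:

  const std::vector<attribute_id> attribute_map = {4, kInvalidCatalogId, 0};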
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31c80934/storage/TupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/TupleStorageSubBlock.hpp b/storage/TupleStorageSubBlock.hpp
index 26e8027..aed6eea 100644
--- a/storage/TupleStorageSubBlock.hpp
+++ b/storage/TupleStorageSubBlock.hpp
@@ -272,56 +272,6 @@ class TupleStorageSubBlock {
       ValueAccessor *accessor) = 0;
 
   /**
-   * @brief Insert up to max_num_tuples_to_insert tuples from a ValueAccessor
-   *        as a single batch, using the attribute_map to project and reorder
-   *        columns from the input ValueAccessor. Does not update header.
-   *
-   * @note Typical usage is where you want to bulk-insert columns from two
-   *       or more value accessors. Instead of writing out the columns into
-   *       one or more column vector value accessors, you can simply use this
-   *       function with the appropriate attribute_map for each value
-   *       accessor (InsertDestination::bulkInsertTuplesFromValueAccessors
-   *       handles all the details) to insert tuples without an extra temp copy.
-   * 
-   * @warning Must call bulkInsertPartialTuplesFinalize() to update the header,
-   *          until which point, the insertion is not visible to others.
-   * @warning The inserted tuples may be placed in a suboptimal position in the
-   *          block.
-   *
-   * @param attribute_map A vector which maps the attributes of this
-   *        TupleStorageSubBlock's relation (gaps indicated with kInvalidCatalogId)
-   *         to the corresponding attributes which should be read from accessor.
-   * @param accessor A ValueAccessor to insert tuples from. The accessor's
-   *        iteration will be advanced to the first non-inserted tuple or, if
-   *        all accessible tuples were inserted in this sub-block, to the end
-   *        position.
-   * @return The number of tuples inserted from accessor.
-   **/
-  virtual tuple_id bulkInsertPartialTuples(
-      const std::vector<attribute_id> &attribute_map,
-      ValueAccessor *accessor,
-      const tuple_id max_num_tuples_to_insert) {
-    LOG(FATAL) << "Partial bulk insert is not supported for this TupleStorageBlock type ("
-               << getTupleStorageSubBlockType() << ").";
-  }
-
-  /**
-   * @brief Update header after a bulkInsertPartialTuples.
-   *
-   * @warning Only call this after a bulkInsertPartialTuples, passing in the
-   *          number of tuples that were inserted (return value of that function).
-   *
-   * @param num_tuples_inserted Number of tuples inserted (i.e., how much to
-   *        advance the header.num_tuples by). Should be equal to the return
-   *        value of bulkInsertPartialTuples.
-   **/
-  virtual void bulkInsertPartialTuplesFinalize(
-      const tuple_id num_tuples_inserted) {
-    LOG(FATAL) << "Partial bulk insert is not supported for this TupleStorageBlock type ("
-               << getTupleStorageSubBlockType() << ").";
-  }
-
-  /**
    * @brief Get the (untyped) value of an attribute in a tuple in this buffer.
    * @warning This method may not be supported for all implementations of
    *          TupleStorageSubBlock. supportsUntypedGetAttributeValue() MUST be


[3/3] incubator-quickstep git commit: Copy test

Posted by ji...@apache.org.
Copy test


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c7fdc360
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c7fdc360
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c7fdc360

Branch: refs/heads/output-attr-order
Commit: c7fdc360e9f3ec9466d7854db8ee7fb85630ae91
Parents: 31c8093
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Jan 5 20:07:44 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Jan 5 20:07:44 2017 -0600

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt             |   1 +
 relational_operators/HashJoinOperator.cpp       | 143 ++++++++--
 storage/CMakeLists.txt                          |  10 +
 storage/InsertContext.hpp                       | 281 +++++++++++++++++++
 storage/InsertDestination.cpp                   |  28 ++
 storage/InsertDestination.hpp                   |   9 +-
 storage/SplitRowStoreTupleStorageSubBlock.cpp   |  22 ++
 storage/SplitRowStoreTupleStorageSubBlock.hpp   |   4 +
 storage/SplitRowStoreValueAccessor.hpp          |  22 ++
 storage/StorageBlock.cpp                        |  16 ++
 storage/StorageBlock.hpp                        |   3 +
 storage/TupleStorageSubBlock.hpp                |   6 +
 storage/ValueAccessor.hpp                       |  72 +++++
 types/containers/ColumnVectorsValueAccessor.hpp |  22 ++
 14 files changed, 609 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 9e4b1b6..b792a7b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -213,6 +213,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_HashTable
+                      quickstep_storage_InsertContext
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 4a91f86..1a34e32 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -20,6 +20,7 @@
 #include "relational_operators/HashJoinOperator.hpp"
 
 #include <algorithm>
+#include <map>
 #include <memory>
 #include <unordered_map>
 #include <utility>
@@ -35,6 +36,7 @@
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/HashTable.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -63,12 +65,16 @@ namespace quickstep {
 
 namespace {
 
+typedef std::vector<std::pair<tuple_id, tuple_id>> VectorOfPairs;
+typedef std::pair<std::vector<tuple_id>, std::vector<tuple_id>> PairOfVectors;
+
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
 // tuples from the inner relation. It stores matching tuple ID pairs
-// in an unordered_map keyed by inner block ID.
-class MapBasedJoinedTupleCollector {
+// in an unordered_map keyed by inner block ID, whose values are vectors of
+// (build-tuple-ID, probe-tuple-ID) pairs.
+class VectorsOfPairsJoinedTuplesCollector {
  public:
-  MapBasedJoinedTupleCollector() {
+  VectorsOfPairsJoinedTuplesCollector() {
   }
 
   template <typename ValueAccessorT>
@@ -81,8 +87,7 @@ class MapBasedJoinedTupleCollector {
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the
   // right.
-  inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>*
-      getJoinedTuples() {
+  inline std::unordered_map<block_id, VectorOfPairs>* getJoinedTuples() {
     return &joined_tuples_;
   }
 
@@ -92,7 +97,35 @@ class MapBasedJoinedTupleCollector {
   // cross-product of all tuples from both blocks, but simply using pairs of
   // tuple-IDs is expected to be more space efficient if the result set is less
   // than 1/64 the cardinality of the cross-product.
-  std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
+  std::unordered_map<block_id, VectorOfPairs> joined_tuples_;
+};
+
+// Another collector that, like the one above, uses an unordered_map keyed on
+// inner block ID, except that each value is a pair of vectors (build tuple IDs,
+// probe tuple IDs), letting each side be consumed as a contiguous array.
+class PairsOfVectorsJoinedTuplesCollector {
+ public:
+  PairsOfVectorsJoinedTuplesCollector() {
+  }
+
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor,
+                         const TupleReference &tref) {
+    auto &entry = joined_tuples_[tref.block];
+    entry.first.emplace_back(tref.tuple);
+    entry.second.emplace_back(accessor.getCurrentPosition());
+  }
+
+  // Get a mutable pointer to the collected map of joined tuple ID pairs. The
+  // key is inner block_id, value is a pair consisting of
+  // inner block tuple IDs (first) and outer block tuple IDs (second).
+  inline std::unordered_map<block_id, PairOfVectors>* getJoinedTuples() {
+    return &joined_tuples_;
+  }
+
+ private:
+  std::unordered_map<
+    block_id,
+    std::pair<std::vector<tuple_id>, std::vector<tuple_id>>> joined_tuples_;
 };
 
 class SemiAntiJoinTupleCollector {
@@ -432,7 +465,7 @@ void HashInnerJoinWorkOrder::execute() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  MapBasedJoinedTupleCollector collector;
+  PairsOfVectorsJoinedTuplesCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -450,13 +483,49 @@ void HashInnerJoinWorkOrder::execute() {
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
-  for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+  std::map<attribute_id, attribute_id> build_attribute_map;
+  std::map<attribute_id, attribute_id> probe_attribute_map;
+  std::map<attribute_id, attribute_id> non_trivial_attribute_map;
+  std::vector<const Scalar *> non_trivial_expressions;
+  for (std::size_t i = 0; i < selection_.size(); ++i) {
+    const Scalar *scalar = selection_[i].get();
+    if (scalar->getDataSource() == Scalar::ScalarDataSource::kAttribute) {
+      const ScalarAttribute *scalar_attr =
+          static_cast<const ScalarAttribute *>(scalar);
+      const relation_id scalar_attr_relation_id =
+          scalar_attr->getRelationIdForValueAccessor();
+      const attribute_id scalar_attr_id =
+          scalar_attr->getAttributeIdForValueAccessor();
+
+      if (scalar_attr_relation_id == build_relation_id) {
+        build_attribute_map.emplace(scalar_attr_id, i);
+      } else {
+        DCHECK_EQ(probe_relation_id, scalar_attr->getRelationIdForValueAccessor());
+        probe_attribute_map.emplace(scalar_attr_id, i);
+      }
+    } else {
+      non_trivial_attribute_map.emplace(non_trivial_expressions.size(), i);
+      non_trivial_expressions.emplace_back(scalar);
+    }
+  }
+
+  std::unique_ptr<InsertContext> insert_context(
+      new InsertContext(output_destination_->getRelation()));
+  insert_context->addSource(build_attribute_map);
+  insert_context->addSource(probe_attribute_map);
+  insert_context->addSource(non_trivial_attribute_map);
+
+  MutableBlockReference output_block;
+  for (std::pair<const block_id, PairOfVectors>
            &build_block_entry : *collector.getJoinedTuples()) {
     BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
     const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
     std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
 
+    const std::vector<tuple_id> &build_tids = build_block_entry.second.first;
+    const std::vector<tuple_id> &probe_tids = build_block_entry.second.second;
+
     // Evaluate '*residual_predicate_', if any.
     //
     // TODO(chasseur): We might consider implementing true vectorized
@@ -468,17 +537,19 @@ void HashInnerJoinWorkOrder::execute() {
     // hash join is below a reasonable threshold so that we don't blow up
     // temporary memory requirements to an unreasonable degree.
     if (residual_predicate_ != nullptr) {
-      std::vector<std::pair<tuple_id, tuple_id>> filtered_matches;
+      PairOfVectors filtered_matches;
 
-      for (const std::pair<tuple_id, tuple_id> &hash_match
-           : build_block_entry.second) {
+      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+        const tuple_id build_tid = build_tids[i];
+        const tuple_id probe_tid = probe_tids[i];
         if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
                                                         build_relation_id,
-                                                        hash_match.first,
+                                                        build_tid,
                                                         *probe_accessor,
                                                         probe_relation_id,
-                                                        hash_match.second)) {
-          filtered_matches.emplace_back(hash_match);
+                                                        probe_tid)) {
+          filtered_matches.first.emplace_back(build_tid);
+          filtered_matches.second.emplace_back(probe_tid);
         }
       }
 
@@ -502,22 +573,36 @@ void HashInnerJoinWorkOrder::execute() {
     // matching tuples in each individual inner block but very many inner
     // blocks with at least one match).
     ColumnVectorsValueAccessor temp_result;
-    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
-         selection_cit != selection_.end();
-         ++selection_cit) {
-      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
-                                                                  build_accessor.get(),
-                                                                  probe_relation_id,
-                                                                  probe_accessor.get(),
-                                                                  build_block_entry.second));
+    if (!non_trivial_expressions.empty()) {
+      VectorOfPairs zipped_joined_tuple_ids;
+      zipped_joined_tuple_ids.reserve(build_tids.size());
+      for (std::size_t i = 0; i < build_tids.size(); ++i) {
+        zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
+      }
+
+      for (auto selection_cit = non_trivial_expressions.begin();
+           selection_cit != non_trivial_expressions.end();
+           ++selection_cit) {
+        temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
+                                                                    build_accessor.get(),
+                                                                    probe_relation_id,
+                                                                    probe_accessor.get(),
+                                                                    zipped_joined_tuple_ids));
+      }
     }
 
-    // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
-    // for each pair of joined blocks incurs some extra overhead that could be
-    // avoided by keeping checked-out MutableBlockReferences across iterations
-    // of this loop, but that would get messy when combined with partitioning.
-    output_destination_->bulkInsertTuples(&temp_result);
+    std::unique_ptr<ValueAccessor> ordered_build_accessor(
+        build_accessor->createSharedOrderedTupleIdSequenceAdapterVirtual(build_tids));
+    std::unique_ptr<ValueAccessor> ordered_probe_accessor(
+        probe_accessor->createSharedOrderedTupleIdSequenceAdapterVirtual(probe_tids));
+
+    output_destination_->bulkInsertTuples(
+        { ordered_build_accessor.get(), ordered_probe_accessor.get(), &temp_result },
+        insert_context.get(),
+        &output_block);
   }
+
+  output_destination_->returnBlock(&output_block);
 }
 
 void HashSemiJoinWorkOrder::execute() {
@@ -550,7 +635,7 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   // We collect all the matching probe relation tuples, as there's a residual
  // predicate that needs to be applied after collecting these matches.
-  MapBasedJoinedTupleCollector collector;
+  VectorsOfPairsJoinedTuplesCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -759,7 +844,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  MapBasedJoinedTupleCollector collector;
+  VectorsOfPairsJoinedTuplesCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
  // tuples, because after this step we still have to evaluate the residual

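A note on the two collector layouts above: VectorsOfPairsJoinedTuplesCollector
keeps one vector of (build, probe) pairs per inner block, while
PairsOfVectorsJoinedTuplesCollector keeps two parallel vectors whose contents
can be handed directly to the ordered-tuple-ID-sequence adapters. The
standalone sketch below (hypothetical types, not the quickstep classes)
contrasts the two layouts and shows the single-pass zip that the
residual-expression path above performs:

    // Sketch: array-of-structs vs. struct-of-arrays for joined tuple IDs.
    #include <cstddef>
    #include <cstdint>
    #include <utility>
    #include <vector>

    using tuple_id = std::int64_t;  // stand-in for quickstep's tuple_id

    int main() {
      // Layout kept by VectorsOfPairsJoinedTuplesCollector: a vector of pairs.
      std::vector<std::pair<tuple_id, tuple_id>> pairs = {{0, 5}, {1, 7}, {3, 9}};

      // Layout kept by PairsOfVectorsJoinedTuplesCollector: parallel vectors.
      std::vector<tuple_id> build_tids;
      std::vector<tuple_id> probe_tids;
      for (const auto &p : pairs) {
        build_tids.push_back(p.first);
        probe_tids.push_back(p.second);
      }

      // When an interface still expects pairs (e.g. getAllValuesForJoin),
      // zipping the parallel vectors back is one linear pass.
      std::vector<std::pair<tuple_id, tuple_id>> zipped;
      zipped.reserve(build_tids.size());
      for (std::size_t i = 0; i < build_tids.size(); ++i) {
        zipped.emplace_back(build_tids[i], probe_tids[i]);
      }
      return 0;
    }
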
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 61a8a99..f3869c9 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -219,6 +219,7 @@ add_library(quickstep_storage_HashTableKeyManager ../empty_src.cpp HashTableKeyM
 add_library(quickstep_storage_HashTablePool ../empty_src.cpp HashTablePool.hpp)
 add_library(quickstep_storage_IndexSubBlock ../empty_src.cpp IndexSubBlock.hpp)
 add_library(quickstep_storage_IndexSubBlockDescriptionFactory ../empty_src.cpp IndexSubBlockDescriptionFactory.hpp)
+add_library(quickstep_storage_InsertContext ../empty_src.cpp InsertContext.hpp)
 add_library(quickstep_storage_InsertDestination InsertDestination.cpp InsertDestination.hpp)
 add_library(quickstep_storage_InsertDestinationInterface
             ../empty_src.cpp
@@ -782,6 +783,14 @@ target_link_libraries(quickstep_storage_IndexSubBlock
 target_link_libraries(quickstep_storage_IndexSubBlockDescriptionFactory
                       quickstep_storage_StorageBlockLayout
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_InsertContext
+                      glog
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_InsertDestination
                       glog
                       gtest
@@ -940,6 +948,7 @@ target_link_libraries(quickstep_storage_SimpleScalarSeparateChainingHashTable
 target_link_libraries(quickstep_storage_SplitRowStoreTupleStorageSubBlock
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_expressions_predicate_PredicateCost
+                      quickstep_storage_InsertContext
                       quickstep_storage_SplitRowStoreValueAccessor
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_StorageErrors
@@ -1175,6 +1184,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_HashTablePool
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_IndexSubBlockDescriptionFactory
+                      quickstep_storage_InsertContext
                       quickstep_storage_InsertDestination
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_InsertDestination_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertContext.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertContext.hpp b/storage/InsertContext.hpp
new file mode 100644
index 0000000..b321528
--- /dev/null
+++ b/storage/InsertContext.hpp
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_
+#define QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_
+
+#include <cstddef>
+#include <cstring>
+#include <functional>
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+struct CopyGroup {
+  CopyGroup(const attribute_id source_attr_id_in,
+            const std::size_t bytes_to_advance_in,
+            const std::size_t bytes_to_copy_in)
+      : source_attr_id(source_attr_id_in),
+        bytes_to_advance(bytes_to_advance_in),
+        bytes_to_copy(bytes_to_copy_in) {}
+
+  const attribute_id source_attr_id;
+  const std::size_t bytes_to_advance;
+  const std::size_t bytes_to_copy;
+};
+
+class CopyList {
+ public:
+  explicit CopyList(const std::vector<CopyGroup> &copy_groups) {
+    for (const auto &copy_group : copy_groups) {
+      stride_copy_functors_.emplace_back(
+          CreateStrideCopyFunctorHelper(copy_group.bytes_to_copy,
+                                        copy_group.bytes_to_advance,
+                                        copy_group.source_attr_id));
+    }
+  }
+
+  inline std::size_t bulkInsertTuples(ValueAccessor *accessor,
+                                      const std::size_t stride_width,
+                                      const std::size_t num_tuples,
+                                      void *storage) const {
+//    std::cerr << "Call CopyList::bulkInsertTuples, copiers = "
+//              << stride_copy_functors_.size() << "\n";
+    DCHECK(!stride_copy_functors_.empty());
+
+    accessor->punctuateVirtual();
+    auto func_it = stride_copy_functors_.begin();
+    const std::size_t num_tuples_inserted =
+        (*func_it)(accessor, storage, stride_width, num_tuples);
+
+    for (++func_it; func_it != stride_copy_functors_.end(); ++func_it) {
+      accessor->rewindVirtual();
+      const std::size_t other_num_tuples_inserted =
+          (*func_it)(accessor, storage, stride_width, num_tuples);
+
+      (void)other_num_tuples_inserted;
+      DCHECK_EQ(num_tuples_inserted, other_num_tuples_inserted);
+    }
+
+    return num_tuples_inserted;
+  }
+
+ private:
+  typedef std::function<std::size_t (ValueAccessor *, void *, std::size_t, std::size_t)> StrideCopyFunctor;
+  std::vector<StrideCopyFunctor> stride_copy_functors_;
+
+  template <typename ...ArgTypes>
+  static StrideCopyFunctor CreateStrideCopyFunctorHelper(
+      const std::size_t bytes_to_copy,
+      ArgTypes &&...args) {
+    switch (bytes_to_copy) {
+      case 4:
+        return CreateStrideCopyFunctor<4>(std::forward<ArgTypes>(args)...);
+      case 8:
+        return CreateStrideCopyFunctor<8>(std::forward<ArgTypes>(args)...);
+      case 12:
+        return CreateStrideCopyFunctor<12>(std::forward<ArgTypes>(args)...);
+      case 16:
+        return CreateStrideCopyFunctor<16>(std::forward<ArgTypes>(args)...);
+      case 20:
+        return CreateStrideCopyFunctor<20>(std::forward<ArgTypes>(args)...);
+      case 24:
+        return CreateStrideCopyFunctor<24>(std::forward<ArgTypes>(args)...);
+      case 28:
+        return CreateStrideCopyFunctor<28>(std::forward<ArgTypes>(args)...);
+      case 32:
+        return CreateStrideCopyFunctor<32>(std::forward<ArgTypes>(args)...);
+      default:
+        return CreateStrideCopyFunctor(bytes_to_copy, std::forward<ArgTypes>(args)...);
+    }
+  }
+
+  static StrideCopyFunctor CreateStrideCopyFunctor(
+      const std::size_t bytes_to_copy,
+      const std::size_t bytes_to_advance,
+      const attribute_id source_attr_id) {
+    return [source_attr_id, bytes_to_advance, bytes_to_copy](
+        ValueAccessor *accessor,
+        void *storage,
+        std::size_t stride_width,
+        std::size_t num_tuples) -> std::size_t {
+      return InvokeOnAnyValueAccessor(
+          accessor,
+          [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+        char *dst = static_cast<char *>(storage) + bytes_to_advance;
+        std::size_t pos = 0;
+        while (pos < num_tuples && accessor->next()) {
+          std::memcpy(dst,
+                      accessor->template getUntypedValue<false>(source_attr_id),
+                      bytes_to_copy);
+          dst += stride_width;
+          ++pos;
+        }
+        return pos;
+      });
+    };
+  }
+
+  template <std::size_t bytes_to_copy>
+  static StrideCopyFunctor CreateStrideCopyFunctor(
+      const std::size_t bytes_to_advance,
+      const attribute_id source_attr_id) {
+    return [source_attr_id, bytes_to_advance](
+        ValueAccessor *accessor,
+        void *storage,
+        std::size_t stride_width,
+        std::size_t num_tuples) -> std::size_t {
+      return InvokeOnAnyValueAccessor(
+          accessor,
+          [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+//        std::cerr << "HERE!\n";
+        char *dst = static_cast<char *>(storage) + bytes_to_advance;
+        std::size_t pos = 0;
+        while (pos < num_tuples && accessor->next()) {
+          std::memcpy(dst,
+                      accessor->template getUntypedValue<false>(source_attr_id),
+                      bytes_to_copy);
+          dst += stride_width;
+          ++pos;
+        }
+//        std::cerr << "num_tuples = " << num_tuples << "\n";
+//        std::cerr << "pos = " << pos << "\n";
+        return pos;
+      });
+    };
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(CopyList);
+};
+
+class InsertContext {
+ public:
+  explicit InsertContext(const CatalogRelationSchema &output_relation)
+      : output_relation_(output_relation) {}
+
+  void addSource(const std::map<attribute_id, attribute_id> &attribute_map) {
+    std::vector<CopyGroup> copy_groups;
+
+    if (!attribute_map.empty()) {
+      auto attr_map_it = attribute_map.begin();
+      attribute_id init_src_attr_id = attr_map_it->first;
+      attribute_id init_dst_attr_id = attr_map_it->second;
+      std::size_t accum_length =
+          output_relation_.getAttributeById(init_dst_attr_id)->getType().maximumByteLength();
+
+      attribute_id prev_src_attr_id = init_src_attr_id;
+      attribute_id prev_dst_attr_id = init_dst_attr_id;
+      while ((++attr_map_it) != attribute_map.end()) {
+        attribute_id curr_src_attr_id = attr_map_it->first;
+        attribute_id curr_dst_attr_id = attr_map_it->second;
+
+        const std::size_t attr_length =
+            output_relation_.getAttributeById(curr_dst_attr_id)->getType().maximumByteLength();
+
+        if (curr_src_attr_id == prev_src_attr_id + 1 &&
+            curr_dst_attr_id == prev_dst_attr_id + 1) {
+          accum_length += attr_length;
+        } else {
+          // Add a copy group
+          copy_groups.emplace_back(init_src_attr_id,
+                                   output_relation_.getFixedLengthAttributeOffset(init_dst_attr_id),
+                                   accum_length);
+
+          init_src_attr_id = curr_src_attr_id;
+          init_dst_attr_id = curr_dst_attr_id;
+          accum_length = attr_length;
+        }
+
+        prev_src_attr_id = curr_src_attr_id;
+        prev_dst_attr_id = curr_dst_attr_id;
+      }
+
+      // Add the last copy group
+      copy_groups.emplace_back(init_src_attr_id,
+                               output_relation_.getFixedLengthAttributeOffset(init_dst_attr_id),
+                               accum_length);
+
+      non_empty_copy_indices_.emplace_back(copy_lists_.size());
+    }
+
+    copy_lists_.emplace_back(std::make_unique<CopyList>(copy_groups));
+  }
+
+  std::size_t bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                               const std::size_t stride_width,
+                               const std::size_t num_tuples,
+                               void *storage) {
+    DCHECK_EQ(copy_lists_.size(), accessors.size());
+    DCHECK(!non_empty_copy_indices_.empty());
+
+    auto idx_it = non_empty_copy_indices_.begin();
+    const std::size_t num_tuples_inserted =
+        copy_lists_[*idx_it]->bulkInsertTuples(
+            accessors[*idx_it], stride_width, num_tuples, storage);
+    iteration_finished_ = accessors[*idx_it]->iterationFinishedVirtual();
+
+    for (++idx_it; idx_it != non_empty_copy_indices_.end(); ++idx_it) {
+      const std::size_t other_num_tuples_inserted =
+          copy_lists_[*idx_it]->bulkInsertTuples(
+              accessors[*idx_it], stride_width, num_tuples, storage);
+
+      (void)other_num_tuples_inserted;
+      DCHECK_EQ(num_tuples_inserted, other_num_tuples_inserted);
+      DCHECK_EQ(iteration_finished_, accessors[*idx_it]->iterationFinishedVirtual());
+    }
+
+    return num_tuples_inserted;
+  }
+
+  void beginIteration() {
+    iteration_finished_ = false;
+  }
+
+  bool iterationFinished() const {
+    return iteration_finished_;
+  }
+
+ private:
+  const CatalogRelationSchema &output_relation_;
+  std::vector<std::unique_ptr<CopyList>> copy_lists_;
+  std::vector<std::size_t> non_empty_copy_indices_;
+
+  bool iteration_finished_;
+
+  DISALLOW_COPY_AND_ASSIGN(InsertContext);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_INSERT_CONTEXT_HPP_

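The core of the new InsertContext is the run-coalescing in addSource():
whenever consecutive source attributes map to consecutive destination
attributes, their byte ranges are merged into a single CopyGroup so that one
memcpy per tuple covers the whole run. A standalone sketch of that logic,
assuming for simplicity that every attribute is 4 bytes wide (the names Group
and coalesce are illustrative, not quickstep's):

    // Sketch of the copy-group coalescing in InsertContext::addSource,
    // assuming uniform 4-byte attributes. Illustrative names only.
    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <vector>

    struct Group {
      int src_start;           // first source attribute of the run
      std::size_t dst_offset;  // byte offset of the run in the output slot
      std::size_t length;      // total bytes covered by the run
    };

    std::vector<Group> coalesce(const std::map<int, int> &attr_map,
                                const std::size_t attr_bytes) {
      std::vector<Group> groups;
      auto it = attr_map.begin();
      if (it == attr_map.end()) {
        return groups;
      }
      // In the real code the destination offset comes from
      // getFixedLengthAttributeOffset(); with uniform widths it is id * width.
      Group run{it->first, it->second * attr_bytes, attr_bytes};
      int prev_src = it->first;
      int prev_dst = it->second;
      for (++it; it != attr_map.end(); ++it) {
        if (it->first == prev_src + 1 && it->second == prev_dst + 1) {
          run.length += attr_bytes;  // extend the contiguous run
        } else {
          groups.push_back(run);     // close the run and start a new one
          run = Group{it->first, it->second * attr_bytes, attr_bytes};
        }
        prev_src = it->first;
        prev_dst = it->second;
      }
      groups.push_back(run);
      return groups;
    }

    int main() {
      // Source attributes 0,1,2 map to destination attributes 3,4,5:
      // a single 12-byte memcpy per tuple instead of three 4-byte copies.
      const std::vector<Group> groups = coalesce({{0, 3}, {1, 4}, {2, 5}}, 4);
      std::cout << groups.size() << " group(s), " << groups[0].length
                << " bytes starting at offset " << groups[0].dst_offset << "\n";
      return 0;
    }
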
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 5e83453..5c7d430 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -33,6 +33,7 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/InsertDestination.pb.h"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -221,6 +222,31 @@ void InsertDestination::bulkInsertTuples(ValueAccessor *accessor, bool always_ma
   });
 }
 
+void InsertDestination::bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                                         InsertContext *insert_context,
+                                         MutableBlockReference *output_block) {
+  DCHECK_GE(accessors.size(), 1u);
+
+  insert_context->beginIteration();
+  while (!insert_context->iterationFinished()) {
+    // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
+    if (!output_block->valid()) {
+      *output_block = this->getBlockForInsertion();
+    }
+    if ((*output_block)->bulkInsertTuples(accessors, insert_context) == 0 ||
+        !insert_context->iterationFinished()) {
+      // output_block is full.
+      this->returnBlock(std::move(*output_block), true);
+    }
+  }
+}
+
+void InsertDestination::returnBlock(MutableBlockReference *output_block) {
+  if (output_block->valid()) {
+    this->returnBlock(std::move(*output_block), false);
+  }
+}
+
 void InsertDestination::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor,
@@ -312,6 +338,7 @@ void AlwaysCreateBlockInsertDestination::returnBlock(MutableBlockReference &&blo
   // Due to the nature of this InsertDestination, a block will always be
   // streamed no matter if it's full or not.
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
@@ -389,6 +416,7 @@ void BlockPoolInsertDestination::returnBlock(MutableBlockReference &&block, cons
   }
   // Note that the block will only be sent if it's full (true).
   sendBlockFilledMessage(block->getID());
+  block.release();
 }
 
 const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInternal() {

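The new InsertDestination::bulkInsertTuples above keeps a checked-out block in
'*output_block' across calls, returning it marked full whenever a batch cannot
finish in the current block, and the caller hands the final, partially filled
block back via returnBlock(). A minimal model of that check-out/reuse loop
(toy types, not the quickstep API):

    // Toy model of reusing a checked-out output block across insert batches.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <memory>

    struct ToyBlock {
      explicit ToyBlock(std::size_t cap) : capacity(cap) {}
      // Insert up to 'n' tuples; returns how many actually fit.
      std::size_t insert(std::size_t n) {
        const std::size_t fit = std::min(n, capacity - used);
        used += fit;
        return fit;
      }
      std::size_t capacity;
      std::size_t used = 0;
    };

    int main() {
      std::size_t remaining = 10;        // tuples left to insert
      std::unique_ptr<ToyBlock> block;   // the "checked-out" block
      std::size_t blocks_used = 0;
      while (remaining > 0) {
        if (!block) {                    // mirrors: if (!output_block->valid())
          block.reset(new ToyBlock(4));
          ++blocks_used;
        }
        remaining -= block->insert(remaining);
        if (remaining > 0) {             // block is full: return it, get another
          block.reset();
        }
      }
      // The last block stays checked out, like '*output_block' awaiting
      // the final returnBlock() call.
      std::cout << blocks_used << " blocks used\n";  // prints "3 blocks used"
      return 0;
    }
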
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 408e76b..ca2ed57 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -52,6 +52,7 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class InsertContext;
 class StorageManager;
 class ValueAccessor;
 
@@ -75,7 +76,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @brief Constructor.
    *
    * @param relation The relation to insert tuples into.
-   * @param layout The layout to use for any newly-created blocks. If NULL,
+   * @param layout The layout to use for any newly-created blocks. If NULL,
    *        defaults to relation's default layout.
    * @param storage_manager The StorageManager to use.
    * @param relational_op_index The index of the relational operator in the
@@ -147,6 +148,19 @@
 
   void bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full = false) override;
 
+  /**
+   * @brief Bulk-insert tuples composed from several ValueAccessors, keeping
+   *        '*output_block' checked out across calls so it can be reused.
+   **/
+  void bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                        InsertContext *insert_context,
+                        MutableBlockReference *output_block);
+
+  /**
+   * @brief Return the block checked out by the overload above, if any.
+   **/
+  void returnBlock(MutableBlockReference *block);
+
   void bulkInsertTuplesWithRemappedAttributes(
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index f955c99..43aa40d 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "catalog/CatalogRelationSchema.hpp"
+#include "storage/InsertContext.hpp"
 #include "storage/SplitRowStoreValueAccessor.hpp"
 #include "storage/StorageBlockLayout.pb.h"
 #include "storage/StorageErrors.hpp"
@@ -386,6 +387,27 @@ tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *acce
   return header_->num_tuples - original_num_tuples;
 }
 
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(
+    const std::vector<ValueAccessor *> &accessors,
+    InsertContext *insert_context) {
+  const std::size_t num_available_slots =
+      tuple_storage_bytes_ / tuple_slot_bytes_ - header_->num_tuples - 1;
+  void *tuple_slot_start =
+      static_cast<char*>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_;
+
+  const std::size_t num_tuples_inserted =
+      insert_context->bulkInsertTuples(accessors,
+                                       tuple_slot_bytes_,
+                                       num_available_slots,
+                                       tuple_slot_start);
+
+  occupancy_bitmap_->setBitRange(header_->num_tuples, num_tuples_inserted, true);
+  header_->num_tuples += num_tuples_inserted;
+  header_->max_tid = header_->num_tuples - 1;
+
+  return num_tuples_inserted;
+}
+
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor) {

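The slot arithmetic in the new SplitRowStoreTupleStorageSubBlock::
bulkInsertTuples is worth spelling out: the free-slot count is the slot
capacity of the storage region minus the occupied count (the trailing -1
leaves a one-slot margin), and writing starts immediately after the last
occupied slot. A worked example with illustrative sizes:

    // Worked example of the bulk-insert slot arithmetic (illustrative sizes).
    #include <cstddef>
    #include <iostream>

    int main() {
      const std::size_t tuple_storage_bytes = 1 << 20;  // 1 MB of slot storage
      const std::size_t tuple_slot_bytes = 64;          // fixed-width slot size
      const std::size_t num_tuples = 1000;              // slots already occupied

      const std::size_t num_available_slots =
          tuple_storage_bytes / tuple_slot_bytes - num_tuples - 1;
      const std::size_t start_offset = num_tuples * tuple_slot_bytes;

      // 16384 total slots - 1000 occupied - 1 margin = 15383 free slots,
      // writing from byte offset 64000.
      std::cout << num_available_slots << " free slots, starting at byte "
                << start_offset << "\n";
      return 0;
    }
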
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index a930103..84b036d 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -39,6 +39,7 @@
 namespace quickstep {
 
 class ComparisonPredicate;
+class InsertContext;
 class Tuple;
 class TupleStorageSubBlockDescription;
 class ValueAccessor;
@@ -151,6 +152,9 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
 
   tuple_id bulkInsertTuples(ValueAccessor *accessor) override;
 
+  tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                            InsertContext *insert_context) override;
+
   tuple_id bulkInsertTuplesWithRemappedAttributes(
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/SplitRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreValueAccessor.hpp b/storage/SplitRowStoreValueAccessor.hpp
index 951a20a..c475361 100644
--- a/storage/SplitRowStoreValueAccessor.hpp
+++ b/storage/SplitRowStoreValueAccessor.hpp
@@ -97,6 +97,14 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return num_tuples_;
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -284,6 +292,14 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -318,6 +334,11 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -349,6 +370,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
   const std::size_t per_tuple_null_bitmap_bytes_;
 
   std::size_t current_position_;
+  std::size_t punctuated_position_;
 
   // Initialized from 'occupancy_bitmap_' on-demand.
   mutable std::unique_ptr<TupleIdSequence> tuple_id_sequence_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ea74ee6..9029cd7 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -267,6 +267,22 @@ tuple_id StorageBlock::bulkInsertTuples(ValueAccessor *accessor) {
   return num_inserted;
 }
 
+tuple_id StorageBlock::bulkInsertTuples(
+    const std::vector<ValueAccessor *> &accessors,
+    InsertContext *insert_context) {
+  const tuple_id num_inserted =
+      tuple_store_->bulkInsertTuples(accessors, insert_context);
+  if (num_inserted != 0) {
+    invalidateAllIndexes();
+    dirty_ = true;
+  } else if (tuple_store_->isEmpty()) {
+    if (!accessors.front()->iterationFinishedVirtual()) {
+      throw TupleTooLargeForBlock(0);
+    }
+  }
+  return num_inserted;
+}
+
 tuple_id StorageBlock::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map,
     ValueAccessor *accessor) {

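The zero-progress check added to StorageBlock::bulkInsertTuples distinguishes
a merely full block from a tuple that can never fit: if nothing was inserted
into an empty store while the first accessor still has tuples, a fresh block
of the same layout cannot help, so it throws rather than letting the
destination loop forever. A compact sketch of that guard (hypothetical names):

    // Sketch of the never-fits guard: zero tuples inserted into an EMPTY
    // block with input remaining means the next tuple exceeds any block.
    #include <cstddef>
    #include <stdexcept>

    bool recordInsertResult(const std::size_t num_inserted,
                            const bool store_is_empty,
                            const bool input_remaining) {
      if (num_inserted == 0 && store_is_empty && input_remaining) {
        throw std::runtime_error("tuple too large for block");
      }
      return num_inserted != 0;  // caller marks the block dirty when true
    }

    int main() {
      const bool dirty = recordInsertResult(5, false, true);  // normal progress
      return dirty ? 0 : 1;
    }
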
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 56b3bdc..9f0acb9 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class ColumnVectorsValueAccessor;
+class InsertContext;
 class InsertDestinationInterface;
 class Predicate;
 class Scalar;
@@ -283,6 +284,13 @@
    **/
   tuple_id bulkInsertTuples(ValueAccessor *accessor);
 
+  /**
+   * @brief Bulk-insert tuples composed from several ValueAccessors,
+   *        using the mappings pre-computed in an InsertContext.
+   **/
+  tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                            InsertContext *insert_context);
+
   /**
    * @brief Insert as many tuples as possible from a ValueAccessor (all of the
    *        tuples accessible or as many as will fit in this StorageBlock) as a

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/TupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/TupleStorageSubBlock.hpp b/storage/TupleStorageSubBlock.hpp
index aed6eea..65f990a 100644
--- a/storage/TupleStorageSubBlock.hpp
+++ b/storage/TupleStorageSubBlock.hpp
@@ -34,6 +34,7 @@ namespace quickstep {
 
 class CatalogRelationSchema;
 class ComparisonPredicate;
+class InsertContext;
 class Tuple;
 class TupleStorageSubBlockDescription;
 class ValueAccessor;
@@ -245,6 +246,15 @@
    **/
   virtual tuple_id bulkInsertTuples(ValueAccessor *accessor) = 0;
 
+  /**
+   * @brief Bulk-insert tuples from several ValueAccessors at once using a
+   *        pre-computed InsertContext. The default implementation aborts.
+   **/
+  virtual tuple_id bulkInsertTuples(const std::vector<ValueAccessor *> &accessors,
+                                    InsertContext *insert_context) {
+    FATAL_ERROR("Not implemented");
+  }
+
   /**
    * @brief Insert as many tuples as possible from a ValueAccessor (all of the
    *        tuples accessible or as many as will fit in this

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/storage/ValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/ValueAccessor.hpp b/storage/ValueAccessor.hpp
index e4a2906..9ce911e 100644
--- a/storage/ValueAccessor.hpp
+++ b/storage/ValueAccessor.hpp
@@ -184,6 +184,17 @@
    **/
   virtual tuple_id getNumTuplesVirtual() const = 0;
 
+  /**
+   * @brief Record the current iteration position so that a later call to
+   *        rewindVirtual() can resume from it.
+   **/
+  virtual void punctuateVirtual() = 0;
+
+  /**
+   * @brief Move the iteration position back to the last punctuated position.
+   **/
+  virtual void rewindVirtual() = 0;
+
   /**
    * @brief Returns whether this accessor has a fast strided ColumnAccessor available
    *        that can be used to optimize memory access in a tight loop iteration
@@ -305,6 +309,13 @@
   virtual ValueAccessor* createSharedTupleIdSequenceAdapterVirtual(
       const TupleIdSequence &id_sequence) = 0;
 
+  /**
+   * @brief Create an adapter that iterates over this accessor's tuples in
+   *        the order given by an OrderedTupleIdSequence.
+   **/
+  virtual ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) = 0;
+
   /**
    * @brief Get a TupleIdSequence indicating which positions this ValueAccessor
    *        is iterating over.
@@ -387,6 +393,14 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return id_sequence_.numTuples();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -479,6 +493,14 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -513,6 +535,11 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -522,6 +549,7 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
   std::unique_ptr<InternalValueAccessorType> owned_accessor_;
   const TupleIdSequence &id_sequence_;
   TupleIdSequence::const_iterator current_position_;
+  TupleIdSequence::const_iterator punctuated_position_;
 
   DISALLOW_COPY_AND_ASSIGN(TupleIdSequenceAdapterValueAccessor);
 };
@@ -589,6 +617,14 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return id_sequence_.size();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -685,6 +721,14 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -719,6 +763,11 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -728,6 +777,7 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
   std::unique_ptr<InternalValueAccessorType> owned_accessor_;
   const OrderedTupleIdSequence &id_sequence_;
   OrderedTupleIdSequence::size_type current_position_;
+  OrderedTupleIdSequence::size_type punctuated_position_;
 
   DISALLOW_COPY_AND_ASSIGN(OrderedTupleIdSequenceAdapterValueAccessor);
 };
@@ -785,6 +835,14 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return helper_.numPackedTuples();
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_tuple_;
+  }
+
+  inline void rewind() {
+    current_tuple_ = punctuated_position_;
+  }
+
   template <bool check_null = true>
   inline const void* getUntypedValue(const attribute_id attr_id) const {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_tuple_);
@@ -911,6 +969,14 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -945,6 +1011,11 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -962,6 +1033,7 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
   const CatalogRelationSchema &relation_;
   HelperT helper_;
   tuple_id current_tuple_;
+  tuple_id punctuated_position_;
 
   friend TupleStorageSubBlockT;
 

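The punctuate()/rewind() pair threaded through every accessor above exists so
that CopyList::bulkInsertTuples can make one pass per copy group over the same
tuple range: punctuate() marks where the batch starts, and each subsequent
pass rewinds to that mark. A toy accessor (not a quickstep class)
demonstrating the contract:

    // Toy accessor showing the punctuate/rewind contract: mark a position,
    // scan forward, rewind, then scan the same range again.
    #include <cstddef>
    #include <iostream>
    #include <utility>
    #include <vector>

    class ToyAccessor {
     public:
      explicit ToyAccessor(std::vector<int> values)
          : values_(std::move(values)) {}

      bool next() { return ++pos_ < values_.size(); }
      int current() const { return values_[pos_]; }

      void punctuate() { mark_ = pos_; }  // remember where this batch starts
      void rewind() { pos_ = mark_; }     // jump back for the next copy pass

     private:
      std::vector<int> values_;
      std::size_t pos_ = static_cast<std::size_t>(-1);   // "before-first"
      std::size_t mark_ = static_cast<std::size_t>(-1);
    };

    int main() {
      ToyAccessor acc({10, 20, 30});
      acc.punctuate();                                       // mark batch start
      while (acc.next()) std::cout << acc.current() << ' ';  // pass 1: 10 20 30
      acc.rewind();
      while (acc.next()) std::cout << acc.current() << ' ';  // pass 2: 10 20 30
      std::cout << '\n';
      return 0;
    }
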
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c7fdc360/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index fe413a0..f8022a0 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -121,6 +121,14 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return column_length_;
   }
 
+  inline void punctuate() {
+    punctuated_position_ = current_position_;
+  }
+
+  inline void rewind() {
+    current_position_ = punctuated_position_;
+  }
+
   /**
    * @brief Get a pointer to a ColumnAccessor object that provides a fast strided memory
    *        access on the underlying storage block.
@@ -252,6 +260,14 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return getNumTuples();
   }
 
+  void punctuateVirtual() override {
+    punctuate();
+  }
+
+  void rewindVirtual() override {
+    rewind();
+  }
+
   const void* getUntypedValueVirtual(const attribute_id attr_id) const override {
     return getUntypedValue(attr_id);
   }
@@ -286,6 +302,11 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return createSharedTupleIdSequenceAdapter(id_sequence);
   }
 
+  ValueAccessor* createSharedOrderedTupleIdSequenceAdapterVirtual(
+      const OrderedTupleIdSequence &id_sequence) override {
+    return createSharedOrderedTupleIdSequenceAdapter(id_sequence);
+  }
+
   const TupleIdSequence* getTupleIdSequenceVirtual() const override {
     return getTupleIdSequence();
   }
@@ -304,6 +325,7 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
   std::vector<bool> column_native_;
   std::size_t column_length_;
   std::size_t current_position_;
+  std::size_t punctuated_position_;
   ScopedDeleter deleter_;
 
   DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);