You are viewing a plain-text version of this content. (The link to the canonical version did not survive the conversion to plain text.)
Posted to commits@kudu.apache.org by ad...@apache.org on 2018/11/01 00:00:22 UTC

[1/3] kudu git commit: deltas: fuzz tests for delta file and dms

Repository: kudu
Updated Branches:
  refs/heads/master 75e264cb2 -> 1043f3193


deltas: fuzz tests for delta file and dms

Here are new fuzz tests for deltafile-test and deltamemstore-test that
generate random deltas and iterate over them using random snapshots, testing
the various delta iterator functions. I found this to be a simpler
environment in which to debug delta-related issues than more heavyweight
tests (e.g. fuzz-itest).

The test introduces MirroredDeltas, a utility that's useful for tracking
deltas that are written into a delta file or DMS.

Change-Id: I539ff0f2055cf398a42efb824238188230e25516
Reviewed-on: http://gerrit.cloudera.org:8080/11140
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/04468d0b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/04468d0b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/04468d0b

Branch: refs/heads/master
Commit: 04468d0b4f9cc8d9dd2ba2e5654b7edd96513c33
Parents: 75e264c
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Jul 30 17:26:47 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Oct 31 23:57:57 2018 +0000

----------------------------------------------------------------------
 src/kudu/common/columnblock.h         |  29 +-
 src/kudu/tablet/CMakeLists.txt        |   8 +-
 src/kudu/tablet/delta_key.h           |  20 +
 src/kudu/tablet/delta_store.h         |   5 +-
 src/kudu/tablet/deltafile-test.cc     |  42 ++
 src/kudu/tablet/deltamemstore-test.cc |  34 ++
 src/kudu/tablet/deltamemstore.cc      |   5 +-
 src/kudu/tablet/tablet-test-util.cc   |  36 ++
 src/kudu/tablet/tablet-test-util.h    | 651 ++++++++++++++++++++++++++++-
 9 files changed, 810 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/common/columnblock.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h
index 72046ba..fe23093 100644
--- a/src/kudu/common/columnblock.h
+++ b/src/kudu/common/columnblock.h
@@ -17,8 +17,10 @@
 #ifndef KUDU_COMMON_COLUMNBLOCK_H
 #define KUDU_COMMON_COLUMNBLOCK_H
 
-#include "kudu/common/types.h"
+#include <string>
+
 #include "kudu/common/row.h"
+#include "kudu/common/types.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/memory/arena.h"
@@ -104,6 +106,21 @@ class ColumnBlock {
     return type_;
   }
 
+  // Returns a debug string containing every cell of the block, separated by
+  // single spaces. Null cells of a nullable block are rendered as "NULL"; all
+  // other cells are formatted by the column type's AppendDebugStringForValue().
+  std::string ToString() const {
+    std::string s;
+    // size_t index: nrows() is a size, so this avoids a signed/unsigned compare.
+    for (size_t i = 0; i < nrows(); i++) {
+      if (i > 0) {
+        s.append(" ");
+      }
+      if (is_nullable() && is_null(i)) {
+        s.append("NULL");
+      } else {
+        type_->AppendDebugStringForValue(cell_ptr(i), &s);
+      }
+    }
+    return s;
+  }
+
  private:
   friend class ColumnBlockCell;
   friend class ColumnDataView;
@@ -247,17 +264,19 @@ class ScopedColumnBlock : public ColumnBlock {
  public:
   typedef typename TypeTraits<type>::cpp_type cpp_type;
 
-  explicit ScopedColumnBlock(size_t n_rows)
+  // Allocates storage for 'n_rows' cells (via new[]) and an Arena, and hands
+  // them to ColumnBlock. When 'allow_nulls' is false, nullptr is passed in
+  // place of the null bitmap and the bitmap initialization below is skipped.
+  explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
     : ColumnBlock(GetTypeInfo(type),
-                  new uint8_t[BitmapSize(n_rows)],
+                  allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
                   new cpp_type[n_rows],
                   n_rows,
                   new Arena(1024)),
       null_bitmap_(null_bitmap()),
       data_(reinterpret_cast<cpp_type *>(data())),
       arena_(arena()) {
-    // All rows begin null.
-    BitmapChangeBits(null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
+    if (allow_nulls) {
+      // All rows begin null (a cleared bitmap bit marks a NULL cell).
+      BitmapChangeBits(null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
+    }
   }
 
   const cpp_type &operator[](size_t idx) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index 7c22c62..8b048ee 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -84,7 +84,13 @@ target_link_libraries(tablet
   kudu_util
   consensus)
 
-SET_KUDU_TEST_LINK_LIBS(tablet)
+# Test helper library built from tablet-test-util.cc. It is handed to
+# SET_KUDU_TEST_LINK_LIBS together with 'tablet' (presumably so that every
+# test added below links against it).
+add_library(tablet_test_util
+  tablet-test-util.cc)
+target_link_libraries(tablet_test_util
+  kudu_common
+  log_proto)
+
+SET_KUDU_TEST_LINK_LIBS(tablet tablet_test_util)
 ADD_KUDU_TEST(all_types-scan-correctness-test NUM_SHARDS 8 PROCESSORS 2)
 ADD_KUDU_TEST(cfile_set-test)
 ADD_KUDU_TEST(compaction-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/delta_key.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_key.h b/src/kudu/tablet/delta_key.h
index 1beddc1..5a92411 100644
--- a/src/kudu/tablet/delta_key.h
+++ b/src/kudu/tablet/delta_key.h
@@ -140,6 +140,26 @@ inline int DeltaKey::CompareTo<UNDO>(const DeltaKey &other) const {
   return other.timestamp_.CompareTo(timestamp_);
 }
 
+// Less-than comparator for DeltaKeys that follows the ordering defined by
+// DeltaKey::CompareTo<Type>() (e.g. for use with std::sort).
+template<DeltaType Type>
+struct DeltaKeyLessThanFunctor {
+  bool operator()(const DeltaKey& lhs, const DeltaKey& rhs) const {
+    const int cmp = lhs.CompareTo<Type>(rhs);
+    return cmp < 0;
+  }
+};
+
+// Equality predicate for DeltaKeys: two keys are equal when
+// DeltaKey::CompareTo<Type>() reports no difference between them.
+template<DeltaType Type>
+struct DeltaKeyEqualToFunctor {
+  bool operator()(const DeltaKey& lhs, const DeltaKey& rhs) const {
+    const int cmp = lhs.CompareTo<Type>(rhs);
+    return cmp == 0;
+  }
+};
+
+// Hash functor for DeltaKeys (e.g. for unordered containers): combines the
+// row index, scaled by 31, with the timestamp's integer value.
+struct DeltaKeyHashFunctor {
+  size_t operator()(const DeltaKey& key) const {
+    const unsigned long long row_part = key.row_idx() * 31ULL;
+    return row_part + key.timestamp().ToUint64();
+  }
+};
+
 } // namespace tablet
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index d4ea390..6358df2 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -178,8 +178,9 @@ class DeltaIterator {
   // Iterate through all deltas, adding deltas for columns not
   // specified in 'col_ids' to 'out'.
   //
-  // The delta objects will be allocated out the provided Arena which
-  // must be non-NULL.
+  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
+  // deltas are considered relevant.
+  // The delta objects will be allocated out of the provided Arena, which must be non-null.
   // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.
   virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                                  std::vector<DeltaKeyAndUpdate>* out,

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/deltafile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 0096e28..972d6ec 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -44,14 +45,17 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
+#include "kudu/tablet/tablet-test-util.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -63,10 +67,12 @@ DEFINE_int32(n_verify, 1, "number of times to verify the updates"
              "(useful for benchmarks");
 
 using std::is_sorted;
+using std::pair;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
@@ -417,5 +423,41 @@ TEST_F(TestDeltaFile, TestEmptyFileIsAborted) {
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 }
 
+// Typed test fixture: every TYPED_TEST on it runs once per type in MyTypes,
+// i.e. once for REDO deltas and once for UNDO deltas.
+template <typename T>
+class DeltaTypeTestDeltaFile : public TestDeltaFile {};
+
+using MyTypes = ::testing::Types<DeltaTypeSelector<REDO>,
+                                 DeltaTypeSelector<UNDO>>;
+TYPED_TEST_CASE(DeltaTypeTestDeltaFile, MyTypes);
+
+// Generates a series of random deltas, writes them to a DeltaFile, reads them
+// back using a DeltaFileIterator, and verifies the results.
+TYPED_TEST(DeltaTypeTestDeltaFile, TestFuzz) {
+  // Arbitrary constants to control the running time and coverage of the test.
+  const int kNumColumns = 100;
+  const int kNumRows = 1000;
+  const int kNumDeltas = 10000;
+  const pair<uint64_t, uint64_t> kTimestampRange(0, 100);
+
+  // Build a schema with kNumColumns UINT32 columns.
+  SchemaBuilder sb;
+  for (int i = 0; i < kNumColumns; i++) {
+    ASSERT_OK(sb.AddColumn(Substitute("col$0", i), UINT32));
+  }
+  Schema schema(sb.Build());
+
+  Random r(SeedRandom());
+  MirroredDeltas<TypeParam> deltas(&schema);
+
+  // Write random deltas (REINSERTs allowed) to a new delta file, mirroring
+  // each one in 'deltas'.
+  shared_ptr<DeltaFileReader> reader;
+  ASSERT_OK(CreateRandomDeltaFile<TypeParam>(
+      schema, this->fs_manager_.get(), &r,
+      kNumDeltas, { 0, kNumRows }, kTimestampRange, &deltas, &reader));
+
+  NO_FATALS(RunDeltaFuzzTest<TypeParam>(
+      *reader, &r, &deltas, kTimestampRange,
+      /*test_filter_column_ids_and_collect_deltas=*/true));
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/deltamemstore-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index ae7e1c3..c954bd0 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -38,6 +38,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row_changelist.h"
+#include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/types.h"
@@ -50,15 +51,19 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/deltafile.h"
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
+#include "kudu/tablet/tablet-test-util.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -72,6 +77,7 @@ using std::string;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
@@ -544,5 +550,33 @@ TEST_F(TestDeltaMemStore, TestCollectMutations) {
   }
 }
 
+// Generates a series of random deltas, writes them to a DMS, reads them back
+// using a DMSIterator, and verifies the results.
+TEST_F(TestDeltaMemStore, TestFuzz) {
+  // Arbitrary constants that trade off the test's running time and coverage.
+  const int kNumColumns = 100;
+  const int kNumRows = 1000;
+  const int kNumDeltas = 10000;
+  const std::pair<uint64_t, uint64_t> kTimestampRange(0, 100);
+
+  // A schema of kNumColumns UINT32 columns named col0, col1, ...
+  SchemaBuilder builder;
+  for (int col = 0; col < kNumColumns; ++col) {
+    ASSERT_OK(builder.AddColumn(Substitute("col$0", col), UINT32));
+  }
+  Schema schema(builder.Build());
+
+  Random rng(SeedRandom());
+  MirroredDeltas<DeltaTypeSelector<REDO>> mirror(&schema);
+
+  shared_ptr<DeltaMemStore> store;
+  ASSERT_OK(CreateRandomDMS(schema, &rng, kNumDeltas, { 0, kNumRows },
+                            kTimestampRange, &mirror, &store));
+
+  // FilterColumnIdsAndCollectDeltas is not exercised against the DMS.
+  NO_FATALS(RunDeltaFuzzTest<DeltaTypeSelector<REDO>>(
+      *store, &rng, &mirror, kTimestampRange,
+      /*test_filter_column_ids_and_collect_deltas=*/false));
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 842a538..a704600 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -375,10 +375,7 @@ Status DMSIterator::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& col_
 }
 
 bool DMSIterator::HasNext() {
-  // TODO implement this if we ever want to include DeltaMemStore in minor
-  // delta compaction.
-  LOG(FATAL) << "Unimplemented";
-  return false;
+  // Reports whether the underlying iterator ('iter_') is still positioned on a
+  // valid entry.
+  return iter_->IsValid();
 }
 
 bool DMSIterator::MayHaveDeltas() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/tablet-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.cc b/src/kudu/tablet/tablet-test-util.cc
new file mode 100644
index 0000000..0575e3b
--- /dev/null
+++ b/src/kudu/tablet/tablet-test-util.cc
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet-test-util.h"
+
+namespace kudu {
+namespace tablet {
+
+// REDO specialization: a delta is relevant when its timestamp is strictly
+// earlier than 'to_include'.
+template<>
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevant(
+    Timestamp to_include, Timestamp ts) const {
+  const bool before_cutoff = ts < to_include;
+  return before_cutoff;
+}
+
+// UNDO specialization: a delta is relevant when its timestamp is at or after
+// 'to_include'.
+template<>
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevant(
+    Timestamp to_include, Timestamp ts) const {
+  const bool at_or_after_cutoff = ts >= to_include;
+  return at_or_after_cutoff;
+}
+
+} // namespace tablet
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/04468d0b/src/kudu/tablet/tablet-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 06b964e..a7b74ae 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -14,31 +14,83 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TABLET_TABLET_TEST_UTIL_H
-#define KUDU_TABLET_TABLET_TEST_UTIL_H
+
+#pragma once
+
+#include <limits.h>
 
 #include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <map>
 #include <memory>
+#include <ostream>
+#include <set>
 #include <string>
+#include <type_traits>
+#include <unordered_set>
+#include <utility>
 #include <vector>
 
-#include <gflags/gflags.h>
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
 
+#include "kudu/cfile/cfile_util.h"
+#include "kudu/common/columnblock.h"
+#include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
-#include "kudu/gutil/casts.h"
+#include "kudu/common/row.h"
+#include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock.h"
+#include "kudu/common/rowid.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/timestamp.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
-#include "kudu/tablet/row_op.h"
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/delta_stats.h"
+#include "kudu/tablet/delta_store.h"
+#include "kudu/tablet/deltafile.h"
+#include "kudu/tablet/deltamemstore.h"
+#include "kudu/tablet/mutation.h"
+#include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/transactions/alter_schema_transaction.h"
-#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/util/metrics.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 namespace kudu {
+
+namespace clock {
+class Clock;
+} // namespace clock
+
 namespace tablet {
 
+class RowSetMetadata;
+
 class KuduTabletTest : public KuduTest {
  public:
   explicit KuduTabletTest(const Schema& schema,
@@ -281,6 +333,589 @@ static Status WriteRow(const Slice &row_slice, RowSetWriterClass *writer) {
   return writer->AppendBlock(block);
 }
 
+// Lifts a DeltaType value into a type so that it can be used as a type
+// parameter (e.g. in gtest typed tests or as MirroredDeltas' template argument).
+template <DeltaType Type>
+struct DeltaTypeSelector {
+  static constexpr DeltaType kTag{Type};
+};
+
+// Tracks encoded deltas and provides a DeltaIterator-like interface for
+// querying them.
+//
+// This is effectively a poor man's DeltaMemStore, except it allows REINSERTs.
+//
+// 'T' is a DeltaTypeSelector; T::kTag selects whether deltas are ordered (and
+// judged relevant) as REDOs or as UNDOs.
+//
+// The query methods look rows up without inserting, so asking about a row
+// that has no deltas leaves all_deltas() unchanged.
+template<typename T>
+class MirroredDeltas {
+ public:
+  // Map of row index to map of timestamp to encoded delta.
+  //
+  // The inner map is sorted by an order that respects DeltaType. That is, REDO
+  // timestamps are sorted in ascending order, while UNDO timestamps are sorted
+  // in descending order.
+  using ComparatorType = typename std::conditional<T::kTag == REDO,
+                                                   std::less<Timestamp>,
+                                                   std::greater<Timestamp>>::type;
+  using RowDeltaMap = std::map<Timestamp, faststring, ComparatorType>;
+  using MirroredDeltaMap = std::map<rowid_t, RowDeltaMap>;
+
+  // 'schema' is not owned; it is dereferenced by ToString() and must outlive
+  // this instance.
+  explicit MirroredDeltas(const Schema* schema)
+      : schema_(schema),
+        arena_(1024) {
+  }
+
+  // Tracks a new key/delta pair. The key must not already have an associated
+  // encoded delta (checked in debug builds only).
+  void AddEncodedDelta(const DeltaKey& k, const faststring& changes) {
+    auto& existing = all_deltas_[k.row_idx()][k.timestamp()];
+    DCHECK_EQ(0, existing.length());
+    existing.assign_copy(changes.data(), changes.length());
+  }
+
+  // Returns true if all tracked deltas are irrelevant to 'ts', false otherwise.
+  bool CheckAllDeltasCulled(Timestamp ts) const {
+    for (const auto& e1 : all_deltas_) {
+      for (const auto& e2 : e1.second) {
+        if (IsDeltaRelevant(ts, e2.first)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  // Returns a string representation of all tracked deltas, one
+  // "row,timestamp: changes" entry per line.
+  std::string ToString() const {
+    std::string s;
+    bool append_delim = false;
+    for (const auto& e1 : all_deltas_) {
+      for (const auto& e2 : e1.second) {
+        if (append_delim) {
+          StrAppend(&s, "\n");
+        } else {
+          append_delim = true;
+        }
+        StrAppend(&s, e1.first);
+        StrAppend(&s, ",");
+        StrAppend(&s, e2.first.ToString());
+        StrAppend(&s, ": ");
+        StrAppend(&s, RowChangeList(e2.second).ToString(*schema_));
+      }
+    }
+    return s;
+  }
+
+  // Applies tracked UPDATE and REINSERT values to 'cb'; DELETEs are skipped
+  // here (see ApplyDeletes()).
+  //
+  // Deltas not relevant to 'ts' are skipped. The set of rows considered is
+  // determined by 'start_row_idx' and the number of rows in 'cb'.
+  Status ApplyUpdates(const Schema& projection, Timestamp ts,
+                      rowid_t start_row_idx, int col_idx, ColumnBlock* cb) {
+    for (size_t i = 0; i < cb->nrows(); i++) {
+      const rowid_t row_idx = start_row_idx + static_cast<rowid_t>(i);
+      const RowDeltaMap* row_deltas = FindRowDeltas(row_idx);
+      if (row_deltas == nullptr) {
+        continue;
+      }
+      for (const auto& e : *row_deltas) {
+        if (!IsDeltaRelevant(ts, e.first)) {
+          // No need to keep iterating; all future deltas for this row will also
+          // be irrelevant.
+          break;
+        }
+        RowChangeList changes(e.second);
+        if (changes.is_delete()) {
+          continue;
+        }
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        RETURN_NOT_OK(decoder.ApplyToOneColumn(i, cb, projection, col_idx, &arena_));
+      }
+    }
+    return Status::OK();
+  }
+
+  // Applies deletions to 'sel_vec' by unselecting all rows whose last tracked
+  // delta is a DELETE.
+  //
+  // Deltas not relevant to 'ts' are skipped. The set of rows considered is
+  // determined by 'start_row_idx' and the number of rows in 'sel_vec'.
+  Status ApplyDeletes(Timestamp ts, rowid_t start_row_idx, SelectionVector* sel_vec) {
+    for (size_t i = 0; i < sel_vec->nrows(); i++) {
+      const rowid_t row_idx = start_row_idx + static_cast<rowid_t>(i);
+      const RowDeltaMap* row_deltas = FindRowDeltas(row_idx);
+      if (row_deltas == nullptr) {
+        continue;
+      }
+      bool deleted = false;
+      for (const auto& e : *row_deltas) {
+        if (!IsDeltaRelevant(ts, e.first)) {
+          // No need to keep iterating; all future deltas for this row will also
+          // be irrelevant.
+          break;
+        }
+        RowChangeList changes(e.second);
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        decoder.TwiddleDeleteStatus(&deleted);
+      }
+      if (deleted) {
+        sel_vec->SetRowUnselected(i);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Transforms and writes deltas into 'deltas', a vector of "delta lists", each
+  // of which represents a particular row, and each entry of which is a
+  // Timestamp and encoded delta pair. The encoded delta is a Slice into
+  // all_deltas_ and should not outlive this class instance.
+  //
+  // Deltas not relevant to 'ts' are skipped. The set of rows considered is
+  // determined by 'start_row_idx' and the number of rows in 'deltas'.
+  using DeltaList = std::vector<std::pair<Timestamp, Slice>>;
+  void CollectMutations(Timestamp ts, rowid_t start_row_idx,
+                        std::vector<DeltaList>* deltas) {
+    for (size_t i = 0; i < deltas->size(); i++) {
+      const rowid_t row_idx = start_row_idx + static_cast<rowid_t>(i);
+      const RowDeltaMap* row_deltas = FindRowDeltas(row_idx);
+      if (row_deltas == nullptr) {
+        continue;
+      }
+      for (const auto& e : *row_deltas) {
+        if (!IsDeltaRelevant(ts, e.first)) {
+          // No need to keep iterating; all future deltas for this row will also
+          // be irrelevant.
+          break;
+        }
+        (*deltas)[i].emplace_back(e.first, Slice(e.second));
+      }
+    }
+  }
+
+  // Transforms and writes deltas into 'deltas', a vector of delta key to encoded
+  // delta pairs. The encoded delta is a Slice into this class instance's arena
+  // and should not outlive it.
+  //
+  // Notably, this function does not compare timestamps; all deltas for the
+  // requested rows are returned. The set of rows considered is determined by
+  // 'start_row_idx' and 'num_rows'.
+  Status FilterColumnIdsAndCollectDeltas(rowid_t start_row_idx, size_t num_rows,
+                                         const std::vector<ColumnId>& col_ids,
+                                         std::vector<std::pair<DeltaKey, Slice>>* deltas) {
+    faststring buf;
+    RowChangeListEncoder encoder(&buf);
+
+    for (size_t i = 0; i < num_rows; i++) {
+      const rowid_t row_idx = start_row_idx + static_cast<rowid_t>(i);
+      const RowDeltaMap* row_deltas = FindRowDeltas(row_idx);
+      if (row_deltas == nullptr) {
+        continue;
+      }
+      for (const auto& e : *row_deltas) {
+        encoder.Reset();
+        RETURN_NOT_OK(
+            RowChangeListDecoder::RemoveColumnIdsFromChangeList(
+                RowChangeList(e.second), col_ids, &encoder));
+        // Skip deltas left empty after removing the requested columns.
+        if (encoder.is_initialized()) {
+          RowChangeList changes = encoder.as_changelist();
+          Slice relocated;
+          CHECK(arena_.RelocateSlice(changes.slice(), &relocated));
+          deltas->emplace_back(DeltaKey(row_idx, e.first), relocated);
+        }
+      }
+    }
+    return Status::OK();
+  }
+
+  const MirroredDeltaMap& all_deltas() const { return all_deltas_; }
+  const Schema* schema() const { return schema_; }
+
+ private:
+  // Returns true if 'ts' is relevant to 'to_include', false otherwise.
+  //
+  // Defined per DeltaType via explicit specializations (declared after this
+  // class, defined in tablet-test-util.cc).
+  bool IsDeltaRelevant(Timestamp to_include, Timestamp ts) const;
+
+  // Returns the deltas tracked for 'row_idx', or nullptr if there are none.
+  // Unlike all_deltas_[row_idx], this never inserts an empty entry.
+  const RowDeltaMap* FindRowDeltas(rowid_t row_idx) const {
+    auto it = all_deltas_.find(row_idx);
+    return it == all_deltas_.end() ? nullptr : &it->second;
+  }
+
+  // All encoded deltas, arranged in DeltaKey order.
+  MirroredDeltaMap all_deltas_;
+
+  // Schema of all encoded deltas.
+  const Schema* schema_;
+
+  // Arena used for allocations in ApplyUpdates and FilterColumnIdsAndCollectDeltas.
+  Arena arena_;
+};
+
+// Explicit specializations of IsDeltaRelevant (defined in tablet-test-util.cc).
+// They must be declared before any use that would implicitly instantiate the
+// primary member declaration.
+template<>
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevant(
+    Timestamp to_include, Timestamp ts) const;
+template<>
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevant(
+    Timestamp to_include, Timestamp ts) const;
+
+// Returns a sequence of randomly chosen non-negative integers with the
+// following properties:
+// 1. All integers are in the range [0, 'max_integer').
+// 2. The number of entries is equal to 'num_integers', or 'max_integer' if
+//    'num_integers' is greater than the size of the range in #1.
+// 3. An integer cannot repeat.
+// 4. The integers are in ascending order.
+//
+// If 'max_integer' or 'num_integers' is 0, the returned sequence is empty and
+// 'prng' is not consulted.
+static inline std::vector<size_t> GetRandomIntegerSequence(
+    Random* prng,
+    size_t max_integer,
+    size_t num_integers) {
+  // Clamp the length of the sequence in case the max is too low to supply the
+  // desired number of integers.
+  const size_t num_integers_clamped = std::min(max_integer, num_integers);
+
+  // Pick some integers.
+  //
+  // We use an ordered set so that the sequence is in ascending order. The loop
+  // condition is tested before each draw so that an empty sequence (rather than
+  // one stray integer) is returned when 'num_integers_clamped' is 0.
+  std::set<size_t> integers;
+  while (integers.size() < num_integers_clamped) {
+    integers.emplace(prng->Uniform(max_integer));
+  }
+
+  return std::vector<size_t>(integers.begin(), integers.end());
+}
+
+// Generates 'num_deltas' random deltas and records them in 'mirror'.
+//
+// Each delta gets a distinct DeltaKey with a row index drawn from
+// [row_range.first, row_range.second) and a timestamp drawn from
+// [ts_range.first, ts_range.second). Keys are processed in DeltaType order;
+// each delta is an UPDATE of 1-5 random columns, a DELETE, or (if the row was
+// just deleted) a REINSERT. If 'allow_reinserts' is false, a DELETE is only
+// generated as a row's last delta.
+//
+// Every column is written as a uint32, so 'schema' is expected to contain only
+// UINT32 columns.
+template <typename T>
+void CreateRandomDeltas(const Schema& schema,
+                        Random* prng,
+                        int num_deltas,
+                        std::pair<rowid_t, rowid_t> row_range,
+                        std::pair<uint64_t, uint64_t> ts_range,
+                        bool allow_reinserts,
+                        MirroredDeltas<T>* mirror) {
+  DCHECK_GT(row_range.second, row_range.first);
+  DCHECK_GT(ts_range.second, ts_range.first);
+
+  // Key generation below only terminates if the ranges contain at least
+  // 'num_deltas' distinct (row, timestamp) pairs, i.e. if
+  // ceil(num_deltas / row_span) <= ts_span (written to avoid overflow).
+  const uint64_t row_span = row_range.second - row_range.first;
+  const uint64_t ts_span = ts_range.second - ts_range.first;
+  DCHECK(num_deltas <= 0 ||
+         (static_cast<uint64_t>(num_deltas) + row_span - 1) / row_span <= ts_span)
+      << "not enough distinct (row, timestamp) keys for " << num_deltas << " deltas";
+
+  // Randomly generate a set of distinct delta keys, then sort them.
+  std::unordered_set<DeltaKey,
+                     DeltaKeyHashFunctor,
+                     DeltaKeyEqualToFunctor<T::kTag>> keys;
+  int num_generated = 0;
+  while (num_generated < num_deltas) {
+    rowid_t row_idx = prng->Uniform(row_range.second - row_range.first) +
+                      row_range.first;
+    uint64_t ts_val = prng->Uniform(ts_range.second - ts_range.first) +
+                      ts_range.first;
+    if (EmplaceIfNotPresent(&keys, row_idx, Timestamp(ts_val))) {
+      num_generated++;
+    }
+  }
+  std::vector<DeltaKey> sorted_keys(keys.begin(), keys.end());
+  std::sort(sorted_keys.begin(), sorted_keys.end(),
+            DeltaKeyLessThanFunctor<T::kTag>());
+
+  // Randomly generate deltas using the keys.
+  //
+  // Because the keys are sorted in DeltaType order, we can track the deletion
+  // status of each row directly.
+  faststring buf;
+  RowChangeListEncoder encoder(&buf);
+  bool is_deleted = false;
+  boost::optional<rowid_t> prev_row_idx;
+  for (size_t key_idx = 0; key_idx < sorted_keys.size(); key_idx++) {
+    encoder.Reset();
+    const auto& k = sorted_keys[key_idx];
+
+    if (!prev_row_idx || *prev_row_idx != k.row_idx()) {
+      // New row; reset the deletion status.
+      is_deleted = false;
+      prev_row_idx = k.row_idx();
+    }
+
+    if (is_deleted) {
+      // The row is deleted; we must REINSERT it.
+      DCHECK(allow_reinserts);
+      RowBuilder rb(schema);
+      for (size_t col_idx = 0; col_idx < schema.num_columns(); col_idx++) {
+        rb.AddUint32(prng->Next());
+      }
+      encoder.SetToReinsert(rb.row());
+      is_deleted = false;
+      VLOG(3) << "REINSERT: " << k.row_idx() << "," << k.timestamp().ToString()
+              << ": " << encoder.as_changelist().ToString(schema);
+    } else if (prng->Uniform(100) < 90 ||
+               (!allow_reinserts &&
+                key_idx + 1 < sorted_keys.size() &&
+                k.row_idx() == sorted_keys[key_idx + 1].row_idx())) {
+      // The row is live and we randomly chose to UPDATE it. Do so to a random
+      // assortment of columns.
+      //
+      // There's a special case here for when we chose to DELETE (see below) but
+      // we're not allowed to REINSERT: if this won't be the last delta for this
+      // row, we'll generate another UPDATE instead of the DELETE. This is
+      // because we've generated the keys up front; if we DELETE now and can't
+      // REINSERT, we'd have to discard the remaining keys for this row.
+      //
+      // std::min<size_t> avoids deducing conflicting argument types on
+      // platforms where size_t is not unsigned long.
+      const size_t num_cols_to_update =
+          std::min<size_t>(prng->Uniform(5) + 1, schema.num_columns());
+      auto idxs_to_update = GetRandomIntegerSequence(prng,
+                                                     schema.num_columns(),
+                                                     num_cols_to_update);
+      for (auto idx : idxs_to_update) {
+        uint32_t u32_val = prng->Next();
+        auto col_id = schema.column_id(idx);
+        const auto& col = schema.column(idx);
+        encoder.AddColumnUpdate(col, col_id, &u32_val);
+      }
+      VLOG(3) << "UPDATE: " << k.row_idx() << "," << k.timestamp().ToString()
+              << ": " << encoder.as_changelist().ToString(schema);
+    } else {
+      // The row is live; DELETE it.
+      encoder.SetToDelete();
+      is_deleted = true;
+      VLOG(3) << "DELETE: " << k.row_idx() << "," << k.timestamp().ToString();
+    }
+
+    mirror->AddEncodedDelta(k, buf);
+  }
+}
+
+// Returns a projection of 'schema' made of up to 'max_cols_to_project' distinct
+// columns chosen at random, kept in their original schema order. 0 is passed
+// to the Schema constructor as the key-column count.
+static inline Schema GetRandomProjection(const Schema& schema,
+                                         Random* prng,
+                                         size_t max_cols_to_project) {
+  const std::vector<size_t> chosen = GetRandomIntegerSequence(
+      prng, schema.num_columns(), max_cols_to_project);
+  std::vector<ColumnSchema> cols;
+  std::vector<ColumnId> col_ids;
+  cols.reserve(chosen.size());
+  col_ids.reserve(chosen.size());
+  for (size_t n = 0; n < chosen.size(); ++n) {
+    cols.push_back(schema.column(chosen[n]));
+    col_ids.push_back(schema.column_id(chosen[n]));
+  }
+  return Schema(cols, col_ids, 0);
+}
+
+// Create a DMS and populate it with random deltas.
+//
+// 'num_deltas' dictates the number of deltas that should be created.
+//
+// 'row_range' and 'ts_range' constrain the DeltaKeys used in the created deltas.
+//
+// 'mirror' will be updated with all created deltas. REINSERTs are never
+// generated here; MirroredDeltas is documented as differing from a DMS
+// precisely in that it allows them.
+//
+// On success, '*dms' is set to the newly created and populated DMS.
+static inline Status CreateRandomDMS(
+    const Schema& schema,
+    Random* prng,
+    int num_deltas,
+    std::pair<rowid_t, rowid_t> row_range,
+    std::pair<uint64_t, uint64_t> ts_range,
+    MirroredDeltas<DeltaTypeSelector<REDO>>* mirror,
+    std::shared_ptr<DeltaMemStore>* dms) {
+  DCHECK(mirror);
+  DCHECK(dms);
+
+  // Create a smattering of deltas in 'mirror'.
+  CreateRandomDeltas(schema, prng, num_deltas, row_range, ts_range,
+                     /*allow_reinserts=*/ false, mirror);
+
+  // Add them to the DMS, in the order in which 'mirror' stores them.
+  std::shared_ptr<DeltaMemStore> local_dms;
+  RETURN_NOT_OK(DeltaMemStore::Create(
+      0, 0, new log::LogAnchorRegistry(), MemTracker::GetRootTracker(),
+      &local_dms));
+  RETURN_NOT_OK(local_dms->Init(nullptr));
+  consensus::OpId op_id(consensus::MaximumOpId());
+  for (const auto& e1 : mirror->all_deltas()) {
+    for (const auto& e2 : e1.second) {
+      RETURN_NOT_OK(local_dms->Update(e2.first, e1.first,
+                                      RowChangeList(e2.second), op_id));
+    }
+  }
+
+  *dms = std::move(local_dms);
+  return Status::OK();
+}
+
+// Creates a delta file of type T::kTag through 'fs_manager', fills it with
+// random deltas, and opens a DeltaFileReader over it.
+//
+// 'num_deltas' is the number of deltas to generate; 'row_range' and
+// 'ts_range' bound the row indexes and timestamps of their DeltaKeys.
+// Unlike CreateRandomDMS, REINSERTs may be generated.
+//
+// Every generated delta is also recorded in 'mirror'. On success,
+// '*delta_reader' holds the opened reader.
+template <typename T>
+Status CreateRandomDeltaFile(const Schema& schema,
+                             FsManager* fs_manager,
+                             Random* prng,
+                             int num_deltas,
+                             std::pair<rowid_t, rowid_t> row_range,
+                             std::pair<uint64_t, uint64_t> ts_range,
+                             MirroredDeltas<T>* mirror,
+                             std::shared_ptr<DeltaFileReader>* delta_reader) {
+  DCHECK(mirror);
+  DCHECK(delta_reader);
+
+  // Generate the random deltas, recording them in 'mirror'.
+  CreateRandomDeltas(schema, prng, num_deltas,
+                     std::move(row_range), std::move(ts_range),
+                     /*allow_reinserts=*/ true, mirror);
+
+  // Allocate a fresh block, remembering its id so it can be reopened below.
+  std::unique_ptr<fs::WritableBlock> block;
+  RETURN_NOT_OK(fs_manager->CreateNewBlock({}, &block));
+  const BlockId id = block->id();
+
+  // Stream every delta through the writer in the mirror's iteration order,
+  // accumulating the file's statistics along the way.
+  std::unique_ptr<DeltaFileWriter> dfw(new DeltaFileWriter(std::move(block)));
+  RETURN_NOT_OK(dfw->Start());
+  DeltaStats delta_stats;
+  for (const auto& row_entry : mirror->all_deltas()) {
+    const auto& row_idx = row_entry.first;
+    for (const auto& ts_entry : row_entry.second) {
+      DeltaKey key(row_idx, ts_entry.first);
+      RowChangeList rcl(ts_entry.second);
+      RETURN_NOT_OK(dfw->AppendDelta<T::kTag>(key, rcl));
+      RETURN_NOT_OK(delta_stats.UpdateStats(key.timestamp(), rcl));
+    }
+  }
+  dfw->WriteDeltaStats(delta_stats);
+  RETURN_NOT_OK(dfw->Finish());
+
+  // Reopen the finished block for reading and hand back a reader over it.
+  std::unique_ptr<fs::ReadableBlock> readable;
+  RETURN_NOT_OK(fs_manager->OpenBlock(id, &readable));
+  return DeltaFileReader::Open(std::move(readable), T::kTag,
+                               cfile::ReaderOptions(), delta_reader);
+}
+
+// Fuzz tests a DeltaStore by generating a fairly random DeltaIterator, using it
+// to retrieve deltas from 'store' via several DeltaIterator methods, and
+// comparing those deltas with the ones found in 'mirror'. Assumes that both
+// 'store' and 'mirror' have been initialized with the same logical deltas.
+//
+// 'ts_range' controls the timestamp range to be used by the iterator.
+//
+// If 'test_filter_column_ids_and_collect_deltas' is true, will test that
+// DeltaIterator method too. Because FilterColumnIdsAndCollectDeltas only works
+// on a totally inclusive snapshot, it is exercised only on the final scan.
+template <typename T>
+void RunDeltaFuzzTest(const DeltaStore& store,
+                      Random* prng,
+                      MirroredDeltas<T>* mirror,
+                      std::pair<uint64_t, uint64_t> ts_range,
+                      bool test_filter_column_ids_and_collect_deltas) {
+  // Arbitrary constants to control the running time and coverage of the test.
+  const int kMaxBatchSize = 1000;
+  const int kNumScans = 100;
+  const int kMaxColsToProject = 10;
+  const int kMaxColsToFilter = 4;
+
+  // Run a series of tests on random timestamps as well as one scan on a
+  // snapshot for which all deltas are relevant.
+  for (int i = 0; i < kNumScans + 1; i++) {
+    // Pick a timestamp for the iterator. The last iteration will use a snapshot
+    // that includes all deltas.
+    Timestamp ts;
+    if (i < kNumScans) {
+      ts = Timestamp(prng->Uniform(ts_range.second - ts_range.first) +
+                     ts_range.first);
+    } else if (T::kTag == REDO) {
+      ts = Timestamp::kMax;
+    } else {
+      DCHECK(T::kTag == UNDO);
+      ts = Timestamp::kMin;
+    }
+
+    // Create and initialize the iterator. If no iterator is returned, it's
+    // because all deltas in 'store' were irrelevant; verify this.
+    Schema projection = GetRandomProjection(*mirror->schema(), prng, kMaxColsToProject);
+    SCOPED_TRACE(strings::Substitute("Projection $0", projection.ToString()));
+    RowIteratorOptions opts;
+    opts.projection = &projection;
+    opts.snap_to_include = MvccSnapshot(ts);
+    SCOPED_TRACE(strings::Substitute("Timestamp $0", ts.ToString()));
+    DeltaIterator* raw_iter = nullptr;
+    Status s = store.NewDeltaIterator(opts, &raw_iter);
+    if (s.IsNotFound()) {
+      ASSERT_STR_CONTAINS(s.ToString(), "MvccSnapshot outside the range of this delta");
+      ASSERT_TRUE(mirror->CheckAllDeltasCulled(ts));
+      continue;
+    }
+    ASSERT_OK(s);
+    std::unique_ptr<DeltaIterator> iter(raw_iter);
+    ASSERT_OK(iter->Init(nullptr));
+
+    // Run PREPARE_FOR_APPLY method tests in batches, in case there's some bug
+    // related to batching.
+    ASSERT_OK(iter->SeekToOrdinal(0));
+    rowid_t start_row_idx = 0;
+    while (iter->HasNext()) {
+      int batch_size = prng->Uniform(kMaxBatchSize) + 1;
+      SCOPED_TRACE(strings::Substitute("APPLY: batch starting at $0 ($1 rows)",
+                                       start_row_idx, batch_size));
+      ASSERT_OK(iter->PrepareBatch(batch_size, DeltaIterator::PREPARE_FOR_APPLY));
+
+      // Test ApplyUpdates: all relevant updates are applied to the column block.
+      for (int j = 0; j < opts.projection->num_columns(); j++) {
+        SCOPED_TRACE(strings::Substitute("Column $0", j));
+        ScopedColumnBlock<UINT32> expected_scb(batch_size, false);
+        ScopedColumnBlock<UINT32> actual_scb(batch_size, false);
+        for (int k = 0; k < batch_size; k++) {
+          expected_scb[k] = 0;
+          actual_scb[k] = 0;
+        }
+        ASSERT_OK(mirror->ApplyUpdates(*opts.projection, ts, start_row_idx, j,
+                                       &expected_scb));
+        ASSERT_OK(iter->ApplyUpdates(j, &actual_scb));
+        ASSERT_EQ(expected_scb, actual_scb)
+            << "Expected column block: " << expected_scb.ToString()
+            << "\nActual column block: " << actual_scb.ToString();
+      }
+
+      // Test ApplyDeletes: the selection vector is all true and a row is unset
+      // if the last relevant update deleted it.
+      {
+        SelectionVector expected_sv(batch_size);
+        SelectionVector actual_sv(batch_size);
+        expected_sv.SetAllTrue();
+        actual_sv.SetAllTrue();
+        ASSERT_OK(mirror->ApplyDeletes(ts, start_row_idx, &expected_sv));
+        ASSERT_OK(iter->ApplyDeletes(&actual_sv));
+        ASSERT_EQ(expected_sv, actual_sv);
+      }
+      start_row_idx += batch_size;
+    }
+
+    // Now run PREPARE_FOR_COLLECT method tests.
+    ASSERT_OK(iter->SeekToOrdinal(0));
+    start_row_idx = 0;
+    while (iter->HasNext()) {
+      int batch_size = prng->Uniform(kMaxBatchSize - 1) + 1;
+      SCOPED_TRACE(strings::Substitute("COLLECT: batch starting at $0 ($1 rows)",
+                                       start_row_idx, batch_size));
+      ASSERT_OK(iter->PrepareBatch(batch_size, DeltaIterator::PREPARE_FOR_COLLECT));
+
+      // Test CollectMutations: all relevant updates are returned.
+      {
+        Arena arena(1024);
+        std::vector<typename MirroredDeltas<T>::DeltaList> expected_muts(batch_size);
+        std::vector<Mutation*> actual_muts(batch_size);
+        ASSERT_OK(iter->CollectMutations(&actual_muts, &arena));
+        mirror->CollectMutations(ts, start_row_idx, &expected_muts);
+        // Named 'row' rather than 'i' so as not to shadow the scan index.
+        for (size_t row = 0; row < expected_muts.size(); row++) {
+          const auto& expected = expected_muts[row];
+          auto* actual = actual_muts[row];
+
+          // Mutations from CollectMutations() are in the opposite timestamp
+          // order from what's needed for REDOs or UNDOs.
+          Mutation::ReverseMutationList(&actual);
+
+          for (size_t j = 0; j < expected.size(); j++) {
+            ASSERT_TRUE(actual);
+            ASSERT_EQ(expected[j].first, actual->timestamp());
+            ASSERT_EQ(expected[j].second, actual->changelist().slice());
+            actual = actual->next();
+          }
+          ASSERT_FALSE(actual);
+        }
+      }
+
+      // Test FilterColumnIdsAndCollectDeltas with a random filter set.
+      //
+      // Note that this operation only works on a totally inclusive snapshot.
+      if (test_filter_column_ids_and_collect_deltas && i == kNumScans) {
+        // Create a sequence of column ids to filter. The vector is reserved
+        // rather than sized up front: sizing it would leave default-constructed
+        // ColumnIds ahead of the emplaced ones.
+        auto idxs_to_filter = GetRandomIntegerSequence(prng,
+                                                       opts.projection->num_columns(),
+                                                       kMaxColsToFilter);
+        std::vector<ColumnId> col_ids_to_filter;
+        col_ids_to_filter.reserve(idxs_to_filter.size());
+        for (auto idx : idxs_to_filter) {
+          col_ids_to_filter.emplace_back(opts.projection->column_id(idx));
+        }
+
+        // Collect and filter, then compare the results.
+        Arena arena(1024);
+        std::vector<std::pair<DeltaKey, Slice>> expected_deltas;
+        std::vector<DeltaKeyAndUpdate> actual_deltas;
+        ASSERT_OK(mirror->FilterColumnIdsAndCollectDeltas(
+            start_row_idx, batch_size, col_ids_to_filter, &expected_deltas));
+        ASSERT_OK(iter->FilterColumnIdsAndCollectDeltas(
+            col_ids_to_filter, &actual_deltas, &arena));
+        ASSERT_EQ(expected_deltas.size(), actual_deltas.size());
+        for (size_t j = 0; j < expected_deltas.size(); j++) {
+          ASSERT_TRUE(expected_deltas[j].first.CompareTo<T::kTag>(
+              actual_deltas[j].key) == 0);
+          ASSERT_EQ(expected_deltas[j].second, actual_deltas[j].cell);
+        }
+      }
+      start_row_idx += batch_size;
+    }
+  }
+}
+
 } // namespace tablet
 } // namespace kudu
-#endif


[3/3] kudu git commit: KUDU-686 (part 2/2): use DeltaPreparer in DeltaFileIterator

Posted by ad...@apache.org.
KUDU-686 (part 2/2): use DeltaPreparer in DeltaFileIterator

This patch rewrites much of DeltaFileIterator to leverage DeltaPreparer.
Besides sharing code, the motivation is to take advantage of the performance
improvement inherent to DeltaPreparer: decoding a batch of deltas just once
in PrepareBatch() as opposed to on each call to ApplyUpdates().

Seeing as DeltaPreparer was originally built for DMSIterator, here are the
various augmentations that were necessary to support DeltaFileIterator:
- REINSERT support, which meant more complicated deletion state tracking.
- FilterColumnIdsAndCollectDeltas support, cribbed from DeltaFileIterator.
- A templatized traits system to control which features were enabled. This
  also meant templatizing both DeltaPreparer and DeltaFileIterator.
- Early out from the "apply all deltas for a row" loop when the timestamp
  is no longer relevant. There's an opportunity to seek here and skip any
  remaining deltas belonging to the row, but testing with a new DMS iterator
  microbenchmark showed that this is only an improvement when the number of
  deltas skipped is sufficiently high.

The improvement should be most noticeable on tables with wide schemas where
multiple columns are projected. In these situations, the column-by-column
ApplyUpdates() approach incurred a lot of unnecessary delta decoding. To
prove it, I included a new deltafile iterator microbenchmark that scans a
subset of a wide schema's columns as a DeltaApplier would.

Before:

  Performance counter stats for 'bin/deltafile-test --gtest_filter=*Benchmark*' (5 runs):

      11358.256100      task-clock (msec)         #    0.998 CPUs utilized            ( +-  3.39% )
               140      context-switches          #    0.012 K/sec                    ( +- 27.37% )
                 6      cpu-migrations            #    0.001 K/sec                    ( +- 52.36% )
            34,231      page-faults               #    0.003 M/sec                    ( +-  0.42% )
    42,288,292,153      cycles                    #    3.723 GHz                      ( +-  4.12% )
   100,853,942,114      instructions              #    2.38  insn per cycle           ( +-  5.35% )
    19,689,789,259      branches                  # 1733.522 M/sec                    ( +-  5.49% )
        69,419,478      branch-misses             #    0.35% of all branches          ( +-  5.14% )

      11.378958537 seconds time elapsed                                          ( +-  3.38% )

After:

  Performance counter stats for 'bin/deltafile-test --gtest_filter=*Benchmark*' (5 runs):

       4089.419224      task-clock (msec)         #    0.995 CPUs utilized            ( +-  1.25% )
                43      context-switches          #    0.011 K/sec                    ( +-  4.10% )
                 2      cpu-migrations            #    0.000 K/sec                    ( +- 32.39% )
            34,948      page-faults               #    0.009 M/sec                    ( +-  0.22% )
    15,269,907,971      cycles                    #    3.734 GHz                      ( +-  1.30% )
    32,409,048,370      instructions              #    2.12  insn per cycle           ( +-  1.85% )
     5,848,268,795      branches                  # 1430.098 M/sec                    ( +-  1.85% )
        32,900,262      branch-misses             #    0.56% of all branches          ( +-  2.80% )

       4.111096224 seconds time elapsed                                          ( +-  1.18% )

It's interesting to see the number of page faults increase while everything
else has gone down, but that makes sense: the new implementation allocates
memory in PrepareBatch() in order to optimize the structure of the deltas.

To be extra safe, I also looped fuzz-itest 1000 times in slow mode. All of
the runs passed.

Change-Id: I87de52092262c4b42c1bd5107f9139edfc3888b5
Reviewed-on: http://gerrit.cloudera.org:8080/11395
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1043f319
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1043f319
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1043f319

Branch: refs/heads/master
Commit: 1043f3193371d9829bf6d85221bdc19607662440
Parents: bd8d747
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Sep 5 12:39:23 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Oct 31 23:58:16 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_key.h           |  17 +-
 src/kudu/tablet/delta_store.cc        | 210 +++++++++++++--
 src/kudu/tablet/delta_store.h         |  69 ++++-
 src/kudu/tablet/deltafile-test.cc     |  69 +++++
 src/kudu/tablet/deltafile.cc          | 400 ++++++-----------------------
 src/kudu/tablet/deltafile.h           |  45 +---
 src/kudu/tablet/deltamemstore-test.cc |  60 ++++-
 src/kudu/tablet/deltamemstore.cc      |  33 ++-
 src/kudu/tablet/deltamemstore.h       |   6 +-
 src/kudu/tablet/mvcc.cc               |  10 +
 src/kudu/tablet/mvcc.h                |   4 +
 src/kudu/tablet/tablet-test-util.h    |   5 -
 12 files changed, 525 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/delta_key.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_key.h b/src/kudu/tablet/delta_key.h
index 5a92411..96767f2 100644
--- a/src/kudu/tablet/delta_key.h
+++ b/src/kudu/tablet/delta_key.h
@@ -39,7 +39,7 @@ enum DeltaType {
   // are sorted by increasing transaction timestamp.
   REDO,
   // UNDO delta files contain the mutations that were applied
-  // prior to the time the base data was last/flushed compacted
+  // prior to the time the base data was last flushed/compacted
   // and allow to execute point-in-time snapshot scans. UNDO
   // deltas are sorted by decreasing transaction timestamp.
   UNDO
@@ -47,6 +47,21 @@ enum DeltaType {
 
 const char* DeltaType_Name(DeltaType t);
 
+// An alternate representation of the raw DeltaType. By templating on
+// DeltaTypeSelector instead of DeltaType, code templated on a type parameter
+// can easily recover the DeltaType as the compile-time constant 'T::kTag'.
+// For example:
+//
+// template <typename T>
+// void Foo() {
+//   cout << (T::kTag == REDO ? "REDO" : "UNDO") << endl;
+// }
+//
+// Foo<DeltaTypeSelector<REDO>>(); // prints 'REDO'
+template <DeltaType Type>
+struct DeltaTypeSelector {
+  static constexpr DeltaType kTag = Type;
+};
+
 // Each entry in the delta memrowset or delta files is keyed by the rowid
 // which has been updated, as well as the timestamp which performed the update.
 class DeltaKey {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index e327a7c..94ee510 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -20,7 +20,6 @@
 #include <algorithm>
 #include <cstdlib>
 #include <cstring>
-#include <memory>
 #include <ostream>
 
 #include <glog/logging.h>
@@ -41,6 +40,7 @@
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/util/debug-util.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
 
 namespace kudu {
@@ -51,6 +51,47 @@ using std::string;
 using std::vector;
 using strings::Substitute;
 
+namespace {
+
+// Decides whether a delta at timestamp 'ts' should be applied under 'snap'.
+//
+// '*finished_row' is always written. It is false whenever the delta is
+// relevant. When the delta is irrelevant, it reports whether every later delta
+// for the same row must be irrelevant too, so the caller may skip ahead. That
+// is purely an optimization and not needed for correctness.
+template<DeltaType Type>
+bool IsDeltaRelevant(const MvccSnapshot& snap,
+                     const Timestamp& ts,
+                     bool* finished_row);
+
+// A REDO applies only if its transaction is committed in 'snap'. REDOs are
+// sorted by increasing timestamp, so once 'snap' cannot hold any committed
+// transaction at or after 'ts', the row's remaining REDOs are moot.
+template<>
+bool IsDeltaRelevant<REDO>(const MvccSnapshot& snap,
+                           const Timestamp& ts,
+                           bool* finished_row) {
+  const bool relevant = snap.IsCommitted(ts);
+  *finished_row = !relevant && !snap.MayHaveCommittedTransactionsAtOrAfter(ts);
+  return relevant;
+}
+
+// An UNDO applies only if its transaction is NOT committed in 'snap'. UNDOs
+// are sorted by decreasing timestamp, so once 'snap' cannot hold any
+// uncommitted transaction at or before 'ts', the row's remaining UNDOs are moot.
+template<>
+bool IsDeltaRelevant<UNDO>(const MvccSnapshot& snap,
+                           const Timestamp& ts,
+                           bool* finished_row) {
+  const bool relevant = !snap.IsCommitted(ts);
+  *finished_row = !relevant && !snap.MayHaveUncommittedTransactionsAtOrBefore(ts);
+  return relevant;
+}
+
+} // anonymous namespace
+
 string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool pad_key) const {
   return StrCat(Substitute("($0 delta key=$2, change_list=$1)",
                            DeltaType_Name(type),
@@ -61,20 +102,24 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool p
                                                  key.timestamp().ToString()))));
 }
 
-DeltaPreparer::DeltaPreparer(RowIteratorOptions opts)
+// Moves 'opts' into the preparer. The preparer starts with an empty prepared
+// range at row 0, no prepared batch, and the current row's deletion state
+// UNKNOWN. 'last_added_idx_' is left default-constructed (empty).
+template<class Traits>
+DeltaPreparer<Traits>::DeltaPreparer(RowIteratorOptions opts)
     : opts_(std::move(opts)),
       cur_prepared_idx_(0),
       prev_prepared_idx_(0),
-      prepared_for_(NOT_PREPARED) {
+      prepared_for_(NOT_PREPARED),
+      deletion_state_(UNKNOWN) {
 }
 
-void DeltaPreparer::Seek(rowid_t row_idx) {
-  prev_prepared_idx_ = row_idx;
+// Positions the preparer at 'row_idx'. Both prepared-range bounds are set to
+// 'row_idx' and no batch is considered prepared.
+// NOTE(review): 'last_added_idx_' is not cleared here, nor by Start(), which
+// does reset 'deletion_state_'. This is presumably harmless, because a stale
+// index paired with an UNKNOWN state is a no-op in
+// MaybeProcessPreviousRowChange() — confirm.
+template<class Traits>
+void DeltaPreparer<Traits>::Seek(rowid_t row_idx) {
   cur_prepared_idx_ = row_idx;
+  prev_prepared_idx_ = row_idx;
   prepared_for_ = NOT_PREPARED;
 }
 
-void DeltaPreparer::Start(DeltaIterator::PrepareFlag flag) {
+template<class Traits>
+void DeltaPreparer<Traits>::Start(DeltaIterator::PrepareFlag flag) {
   if (updates_by_col_.empty()) {
     updates_by_col_.resize(opts_.projection->num_columns());
   }
@@ -82,7 +127,9 @@ void DeltaPreparer::Start(DeltaIterator::PrepareFlag flag) {
     ufc.clear();
   }
   deleted_.clear();
+  reinserted_.clear();
   prepared_deltas_.clear();
+  deletion_state_ = UNKNOWN;
   switch (flag) {
     case DeltaIterator::PREPARE_FOR_APPLY:
       prepared_for_ = PREPARED_FOR_APPLY;
@@ -95,24 +142,34 @@ void DeltaPreparer::Start(DeltaIterator::PrepareFlag flag) {
   }
 }
 
-void DeltaPreparer::Finish(size_t nrows) {
+// Ends the batch currently being prepared, which covers 'nrows' rows starting
+// at 'cur_prepared_idx_'.
+template<class Traits>
+void DeltaPreparer<Traits>::Finish(size_t nrows) {
+  // No later delta will arrive to trigger processing of the last added row's
+  // deletion state, so flush it explicitly.
+  MaybeProcessPreviousRowChange(boost::none);
+  // The prepared batch now spans [prev_prepared_idx_, cur_prepared_idx_).
   prev_prepared_idx_ = cur_prepared_idx_;
   cur_prepared_idx_ += nrows;
 }
 
-Status DeltaPreparer::AddDelta(const DeltaKey& key, Slice val) {
-  if (!opts_.snap_to_include.IsCommitted(key.timestamp())) {
+template<class Traits>
+Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* finished_row) {
+  if (!IsDeltaRelevant<Traits::kType>(opts_.snap_to_include,
+                                      key.timestamp(), finished_row)) {
     return Status::OK();
   }
+  MaybeProcessPreviousRowChange(key.row_idx());
 
   if (prepared_for_ == PREPARED_FOR_APPLY) {
     RowChangeListDecoder decoder((RowChangeList(val)));
-    decoder.InitNoSafetyChecks();
-    DCHECK(!decoder.is_reinsert()) << "Reinserts are not supported in the DeltaMemStore.";
-    if (decoder.is_delete()) {
-      deleted_.emplace_back(key.row_idx());
+    if (Traits::kInitializeDecodersWithSafetyChecks) {
+      RETURN_NOT_OK(decoder.Init());
     } else {
-      DCHECK(decoder.is_update());
+      decoder.InitNoSafetyChecks();
+    }
+    if (!Traits::kAllowReinserts && decoder.is_reinsert()) {
+      LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
+      return Status::InvalidArgument("Reinserts are not supported");
+    }
+    UpdateDeletionState(decoder.get_type());
+    if (!decoder.is_delete()) {
       while (decoder.HasNext()) {
         RowChangeListDecoder::DecodedUpdate dec;
         RETURN_NOT_OK(decoder.DecodeNext(&dec));
@@ -152,12 +209,14 @@ Status DeltaPreparer::AddDelta(const DeltaKey& key, Slice val) {
     prepared_deltas_.emplace_back(d);
   }
 
+  last_added_idx_ = key.row_idx();
   return Status::OK();
 }
 
-Status DeltaPreparer::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+template<class Traits>
+Status DeltaPreparer<Traits>::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
   DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());
 
   const ColumnSchema* col_schema = &opts_.projection->column(col_to_apply);
   for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
@@ -171,20 +230,28 @@ Status DeltaPreparer::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
   return Status::OK();
 }
 
-Status DeltaPreparer::ApplyDeletes(SelectionVector* sel_vec) {
+// Unselects the rows whose net change in the prepared batch is a DELETE and
+// selects the rows whose net change is a REINSERT. Entries of 'sel_vec' are
+// indexed relative to the start of the prepared batch. 'sel_vec' may be
+// larger than the batch.
+template<class Traits>
+Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
   DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
 
   for (const auto& row_id : deleted_) {
     uint32_t idx_in_block = row_id - prev_prepared_idx_;
     sel_vec->SetRowUnselected(idx_in_block);
   }
 
+  // A row may be deleted in one delta store and reinserted in the next, so a
+  // reinserted row is presumably already unselected by the caller's earlier
+  // processing. Select it explicitly.
+  for (const auto& row_id : reinserted_) {
+    uint32_t idx_in_block = row_id - prev_prepared_idx_;
+    sel_vec->SetRowSelected(idx_in_block);
+  }
+
   return Status::OK();
 }
 
-Status DeltaPreparer::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
+template<class Traits>
+Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size());
   for (const PreparedDelta& src : prepared_deltas_) {
     DeltaKey key = src.key;
     RowChangeList changelist(src.val);
@@ -196,18 +263,52 @@ Status DeltaPreparer::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   return Status::OK();
 }
 
-Status DeltaPreparer::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& /*col_ids*/,
-                                                      vector<DeltaKeyAndUpdate>* /*out*/,
-                                                      Arena* /*arena*/) {
-  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS" << GetStackTrace();
-  return Status::InvalidArgument("FilterColumsAndAppend() is not supported by DMSIterator");
+// For each prepared delta, runs its change list through
+// RowChangeListDecoder::RemoveColumnIdsFromChangeList() with 'col_ids'. The
+// result, keyed by the delta's key, is appended to 'out' with its bytes
+// copied into 'arena'. Deltas for which the encoder ends up uninitialized are
+// skipped.
+//
+// Returns InvalidArgument, after a LOG(DFATAL), unless
+// Traits::kAllowFilterColumnIdsAndCollectDeltas is set. The snapshot must
+// include all (REDO) or no (UNDO) transactions.
+template<class Traits>
+Status DeltaPreparer<Traits>::FilterColumnIdsAndCollectDeltas(
+    const vector<ColumnId>& col_ids,
+    vector<DeltaKeyAndUpdate>* out,
+    Arena* arena) {
+  if (!Traits::kAllowFilterColumnIdsAndCollectDeltas) {
+    // DMSPreparerTraits is the only traits type that disables this, hence the
+    // mention of the DMS in the message.
+    LOG(DFATAL) << "Attempted to call FilterColumnIdsAndCollectDeltas on DMS"
+                << GetStackTrace();
+    return Status::InvalidArgument(
+        "FilterColumnIdsAndCollectDeltas is not supported");
+  }
+
+  // May only be used on a fully inclusive snapshot.
+  DCHECK(opts_.snap_to_include.Equals(Traits::kType == REDO ?
+                                      MvccSnapshot::CreateSnapshotIncludingAllTransactions() :
+                                      MvccSnapshot::CreateSnapshotIncludingNoTransactions()));
+
+  // 'buf' backs the encoder and is reused for every delta. Each surviving
+  // change list is copied into 'arena' before the next Reset().
+  faststring buf;
+  RowChangeListEncoder encoder(&buf);
+  for (const auto& src : prepared_deltas_) {
+    encoder.Reset();
+    RETURN_NOT_OK(
+        RowChangeListDecoder::RemoveColumnIdsFromChangeList(RowChangeList(src.val),
+                                                            col_ids,
+                                                            &encoder));
+    // Presumably the encoder stays uninitialized when nothing survives the
+    // filter; such deltas are dropped — confirm.
+    if (encoder.is_initialized()) {
+      RowChangeList rcl = encoder.as_changelist();
+      DeltaKeyAndUpdate upd;
+      upd.key = src.key;
+      CHECK(arena->RelocateSlice(rcl.slice(), &upd.cell));
+      out->emplace_back(upd);
+    }
+  }
+
+  return Status::OK();
 }
 
-bool DeltaPreparer::MayHaveDeltas() const {
+template<class Traits>
+bool DeltaPreparer<Traits>::MayHaveDeltas() const {
   DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
   if (!deleted_.empty()) {
     return true;
   }
+  if (!reinserted_.empty()) {
+    return true;
+  }
   for (auto& col : updates_by_col_) {
     if (!col.empty()) {
       return true;
@@ -216,6 +317,65 @@ bool DeltaPreparer::MayHaveDeltas() const {
   return false;
 }
 
+// Once the preparer has moved past the last added row, turns that row's
+// accumulated deletion state into an entry in 'deleted_' or 'reinserted_'.
+// 'cur_row_idx' is the row now being added. It is unset when the batch is
+// complete.
+template<class Traits>
+void DeltaPreparer<Traits>::MaybeProcessPreviousRowChange(boost::optional<rowid_t> cur_row_idx) {
+  // Deletion state is only tracked while preparing for apply, and only once
+  // some relevant delta has been added.
+  if (prepared_for_ != PREPARED_FOR_APPLY || !last_added_idx_) {
+    return;
+  }
+  // Still on the same row: its deletion state may yet change.
+  if (cur_row_idx && *cur_row_idx == *last_added_idx_) {
+    return;
+  }
+  // The previous row is complete; record its net DELETE or REINSERT, if any.
+  const rowid_t finished_row_idx = *last_added_idx_;
+  if (deletion_state_ == DELETED) {
+    deleted_.emplace_back(finished_row_idx);
+  } else if (deletion_state_ == REINSERTED) {
+    reinserted_.emplace_back(finished_row_idx);
+  } else {
+    return;
+  }
+  deletion_state_ = UNKNOWN;
+}
+
+// Folds a delta of type 'op' into the deletion state of the row currently
+// being prepared.
+//
+// RowChangeListDecoder.TwiddleDeleteStatus is not usable here because:
+// 1. This state machine has a third, UNKNOWN, state.
+// 2. A row's chain of DELETEs and REINSERTs may span DeltaPreparer instances
+//    (e.g. deleted in one delta file and reinserted in the next). Preparers
+//    cannot exchange that information during batch preparation, so any
+//    transition out of UNKNOWN must be allowed.
+//
+// A DELETE and a REINSERT of the same row within one batch cancel out, which
+// returns the row to UNKNOWN: its state hasn't actually changed.
+template<class Traits>
+void DeltaPreparer<Traits>::UpdateDeletionState(RowChangeList::ChangeType op) {
+  if (op != RowChangeList::kDelete) {
+    DCHECK(op == RowChangeList::kUpdate || op == RowChangeList::kReinsert);
+    if (op != RowChangeList::kReinsert) {
+      // Plain UPDATEs leave the deletion state untouched.
+      return;
+    }
+    DCHECK_NE(deletion_state_, REINSERTED);
+    if (deletion_state_ != UNKNOWN) {
+      DCHECK_EQ(deletion_state_, DELETED);
+      deletion_state_ = UNKNOWN;
+    } else {
+      deletion_state_ = REINSERTED;
+    }
+    return;
+  }
+
+  DCHECK_NE(deletion_state_, DELETED);
+  if (deletion_state_ != UNKNOWN) {
+    DCHECK_EQ(deletion_state_, REINSERTED);
+    deletion_state_ = UNKNOWN;
+  } else {
+    deletion_state_ = DELETED;
+  }
+}
+
+// Explicit instantiations (not specializations) for callers outside this
+// compilation unit. DeltaPreparer's member templates are defined only in this
+// file, so each Traits type used elsewhere must be instantiated here.
+template class DeltaPreparer<DMSPreparerTraits>;
+template class DeltaPreparer<DeltaFilePreparerTraits<REDO>>;
+template class DeltaPreparer<DeltaFilePreparerTraits<UNDO>>;
+
 Status DebugDumpDeltaIterator(DeltaType type,
                               DeltaIterator* iter,
                               const Schema& schema,

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 3f1c160..e9e8846 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -24,6 +24,9 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
+#include "kudu/common/row_changelist.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tablet/delta_key.h"
@@ -216,6 +219,26 @@ class DeltaIterator : public PreparedDeltas {
   virtual ~DeltaIterator() {}
 };
 
+// DeltaPreparer traits suited for a DMSIterator. A DMS holds only REDOs, never
+// contains REINSERTs, and does not support FilterColumnIdsAndCollectDeltas.
+struct DMSPreparerTraits {
+  // The type of deltas being prepared; selects the IsDeltaRelevant() rules.
+  static constexpr DeltaType kType = REDO;
+  // Whether REINSERT deltas are accepted by AddDelta().
+  static constexpr bool kAllowReinserts = false;
+  // Whether FilterColumnIdsAndCollectDeltas() may be called.
+  static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = false;
+  // Whether AddDelta() initializes decoders with RowChangeListDecoder::Init()
+  // (with safety checks) rather than InitNoSafetyChecks().
+  static constexpr bool kInitializeDecodersWithSafetyChecks = false;
+};
+
+// DeltaPreparer traits suited for a DeltaFileIterator.
+//
+// This is a class template parameterized on the DeltaType (it is not a
+// specialization of anything); the DeltaFileIterator is expected to dictate
+// whether it prepares REDOs or UNDOs.
+template<DeltaType Type>
+struct DeltaFilePreparerTraits {
+  static constexpr DeltaType kType = Type;
+  static constexpr bool kAllowReinserts = true;
+  static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = true;
+  static constexpr bool kInitializeDecodersWithSafetyChecks = true;
+};
+
 // Encapsulates all logic and responsibility related to "delta preparation";
 // that is, the transformation of encoded deltas into an in-memory
 // representation more suitable for efficient service during iteration.
@@ -224,6 +247,7 @@ class DeltaIterator : public PreparedDeltas {
 // is responsible for loading encoded deltas from a backing store, passing them
 // to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
 // to serve the deltas.
+template <class Traits>
 class DeltaPreparer : public PreparedDeltas {
  public:
   explicit DeltaPreparer(RowIteratorOptions opts);
@@ -247,10 +271,14 @@ class DeltaPreparer : public PreparedDeltas {
 
   // Prepares the delta given by 'key' whose encoded changes are pointed to by 'val'.
   //
-  // Upon completion, it is safe for the memory behind 'val' to be destroyed.
+  // On success, the memory pointed to by 'val' can be destroyed. The
+  // 'finished_row' output parameter will be set if we can determine that all
+  // future deltas belonging to 'key.row_idx()' are irrelevant under the
+  // snapshot provided at preparer construction time; the caller can skip ahead
+  // to deltas belonging to the next row.
   //
   // Call when a new delta becomes available in DeltaIterator::PrepareBatch.
-  Status AddDelta(const DeltaKey& key, Slice val);
+  Status AddDelta(const DeltaKey& key, Slice val, bool* finished_row);
 
   Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
 
@@ -265,9 +293,24 @@ class DeltaPreparer : public PreparedDeltas {
   bool MayHaveDeltas() const override;
 
   rowid_t cur_prepared_idx() const { return cur_prepared_idx_; }
+  boost::optional<rowid_t> last_added_idx() const { return last_added_idx_; }
+  const RowIteratorOptions& opts() const { return opts_; }
 
  private:
-  // Options with which the DeltaPreparer was constructed.
+  // Checks whether we are done processing a row's deltas. If so, attempts to
+  // convert the row's latest deletion state into a saved deletion or
+  // reinsertion. By deferring this work to when a row is finished, we avoid
+  // creating unnecessary deletions and reinsertions for rows that are
+  // repeatedly deleted and reinserted.
+  //
+  // 'cur_row_idx' may be unset when there is no new row index, such as when
+  // called upon completion of an entire batch of deltas (i.e. from Finish()).
+  void MaybeProcessPreviousRowChange(boost::optional<rowid_t> cur_row_idx);
+
+  // Updates the deletion state of the row currently being processed based on 'op'.
+  void UpdateDeletionState(RowChangeList::ChangeType op);
+
+  // Options with which the DeltaPreparer's iterator was constructed.
   const RowIteratorOptions opts_;
 
   // The row index at which the most recent batch preparation ended.
@@ -276,6 +319,9 @@ class DeltaPreparer : public PreparedDeltas {
   // The value of 'cur_prepared_idx_' from the previous batch.
   rowid_t prev_prepared_idx_;
 
+  // The index of the row last added in AddDelta(), if one exists.
+  boost::optional<rowid_t> last_added_idx_;
+
   // Whether there are any prepared blocks.
   enum PreparedFor {
     // There are no prepared blocks. Attempts to call a PreparedDeltas function
@@ -283,9 +329,11 @@ class DeltaPreparer : public PreparedDeltas {
     NOT_PREPARED,
 
     // The DeltaPreparer has prepared a batch of deltas for applying. All deltas
-    // in the batch have been decoded. UPDATEs and REINSERTs have been coalesced
-    // into a column-major data structure suitable for ApplyUpdates. DELETES
-    // have been coalesced into a row-major data structure suitable for ApplyDeletes.
+    // in the batch have been decoded. Operations affecting row data (i.e.
+    // UPDATEs and REINSERTs) have been coalesced into a column-major data
+    // structure suitable for ApplyUpdates. Operations affecting row lifecycle
+    // (i.e. DELETEs and REINSERTs) have been coalesced into a row-major data
+    // structure suitable for ApplyDeletes.
     //
     // ApplyUpdates and ApplyDeltas are now callable.
     PREPARED_FOR_APPLY,
@@ -308,6 +356,15 @@ class DeltaPreparer : public PreparedDeltas {
   typedef std::deque<ColumnUpdate> UpdatesForColumn;
   std::vector<UpdatesForColumn> updates_by_col_;
   std::deque<rowid_t> deleted_;
+  std::deque<rowid_t> reinserted_;
+
+  // The deletion state of the row last processed by AddDelta().
+  enum RowDeletionState {
+    UNKNOWN,
+    DELETED,
+    REINSERTED
+  };
+  RowDeletionState deletion_state_;
 
   // State when prepared_for_ == PREPARED_FOR_COLLECT
   // ------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltafile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 972d6ec..7c4a454 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -459,5 +459,74 @@ TYPED_TEST(DeltaTypeTestDeltaFile, TestFuzz) {
       /*test_filter_column_ids_and_collect_deltas=*/true));
 }
 
+// Performance benchmark that simulates the work of a DeltaApplier:
+// - Seeks to the first row.
+// - Prepares a batch of deltas.
+// - Initializes a selection vector and calls ApplyDeletes on it.
+// - Calls ApplyUpdates for each column in the projection.
+//
+// The deltas are randomly generated, as is the projection used in iteration.
+TYPED_TEST(DeltaTypeTestDeltaFile, BenchmarkPrepareAndApply) {
+  const int kNumColumns = 100;
+  const int kNumColumnsToScan = 10;
+  const int kNumUpdates = 10000;
+  const int kMaxRow = 1000;
+  const int kMaxTimestamp = 100;
+  const int kNumScans = 10;
+
+  // Build a schema with kNumColumns columns.
+  SchemaBuilder sb;
+  for (int i = 0; i < kNumColumns; i++) {
+    ASSERT_OK(sb.AddColumn(Substitute("col$0", i), UINT32));
+  }
+  Schema schema(sb.Build());
+
+  Random prng(SeedRandom());
+  MirroredDeltas<TypeParam> deltas(&schema);
+
+  // Populate a delta file with random deltas.
+  shared_ptr<DeltaFileReader> reader;
+  ASSERT_OK(CreateRandomDeltaFile<TypeParam>(
+      schema, this->fs_manager_.get(), &prng,
+      kNumUpdates, { 0, kMaxRow },  { 0, kMaxTimestamp }, &deltas, &reader));
+
+  for (int i = 0; i < kNumScans; i++) {
+    // Create a random projection with kNumColumnsToScan columns.
+    Schema projection = GetRandomProjection(schema, &prng, kNumColumnsToScan);
+
+    // Create an iterator at a randomly chosen timestamp.
+    Timestamp ts = Timestamp(prng.Uniform(kMaxTimestamp));
+    RowIteratorOptions opts;
+    opts.projection = &projection;
+    opts.snap_to_include = MvccSnapshot(ts);
+    DeltaIterator* raw_iter;
+    Status s = reader->NewDeltaIterator(opts, &raw_iter);
+    if (s.IsNotFound()) {
+      ASSERT_STR_CONTAINS(s.ToString(), "MvccSnapshot outside the range of this delta");
+      continue;
+    }
+    ASSERT_OK(s);
+    unique_ptr<DeltaIterator> iter(raw_iter);
+    ASSERT_OK(iter->Init(nullptr));
+
+    // Scan from the iterator as if we were a DeltaApplier.
+    ASSERT_OK(iter->SeekToOrdinal(0));
+    ASSERT_OK(iter->PrepareBatch(kMaxRow, DeltaIterator::PREPARE_FOR_APPLY));
+    SelectionVector sel_vec(kMaxRow);
+    sel_vec.SetAllTrue();
+    ASSERT_OK(iter->ApplyDeletes(&sel_vec));
+    if (!sel_vec.AnySelected()) {
+      continue;
+    }
+    ScopedColumnBlock<UINT32> scb(kMaxRow);
+    for (int j = 0; j < kMaxRow; j++) {
+      scb[j] = 0;
+    }
+    for (int j = 0; j < projection.num_columns(); j++) {
+      ASSERT_OK(iter->ApplyUpdates(j, &scb));
+    }
+  }
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 9a1be53..3a0f017 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -27,11 +27,10 @@
 #include <gflags/gflags_declare.h>
 
 #include "kudu/cfile/binary_plain_block.h"
-#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
-#include "kudu/common/columnblock.h"
+#include "kudu/cfile/index_btree.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
@@ -43,15 +42,13 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/trace.h"
 
@@ -338,8 +335,12 @@ Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
     TRACE_COUNTER_INCREMENT("delta_iterators_relevant", 1);
     // Ugly cast, but it lets the iterator fully initialize the reader
     // during its first seek.
-    *iterator = new DeltaFileIterator(
-        const_cast<DeltaFileReader*>(this)->shared_from_this(), opts, delta_type_);
+    auto s_this = const_cast<DeltaFileReader*>(this)->shared_from_this();
+    if (delta_type_ == REDO) {
+      *iterator = new DeltaFileIterator<REDO>(std::move(s_this), opts);
+    } else {
+      *iterator = new DeltaFileIterator<UNDO>(std::move(s_this), opts);
+    }
     return Status::OK();
   }
   VLOG(2) << "Culling "
@@ -398,20 +399,18 @@ uint64_t DeltaFileReader::EstimateSize() const {
 // DeltaFileIterator
 ////////////////////////////////////////////////////////////
 
-DeltaFileIterator::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
-                                     RowIteratorOptions opts,
-                                     DeltaType delta_type)
+template<DeltaType Type>
+DeltaFileIterator<Type>::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
+                                           RowIteratorOptions opts)
     : dfr_(std::move(dfr)),
-      opts_(std::move(opts)),
-      prepared_idx_(0xdeadbeef),
-      prepared_count_(0),
+      preparer_(std::move(opts)),
       prepared_(false),
       exhausted_(false),
       initted_(false),
-      delta_type_(delta_type),
       cache_blocks_(CFileReader::CACHE_BLOCK) {}
 
-Status DeltaFileIterator::Init(ScanSpec* spec) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::Init(ScanSpec* spec) {
   DCHECK(!initted_) << "Already initted";
 
   if (spec) {
@@ -423,17 +422,18 @@ Status DeltaFileIterator::Init(ScanSpec* spec) {
   return Status::OK();
 }
 
-Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   DCHECK(initted_) << "Must call Init()";
 
   // Finish the initialization of any lazily-initialized state.
-  RETURN_NOT_OK(dfr_->Init(opts_.io_context));
+  RETURN_NOT_OK(dfr_->Init(preparer_.opts().io_context));
 
   // Check again whether this delta file is relevant given the snapshot
   // that we are querying. We did this already before creating the
   // DeltaFileIterator, but due to lazy initialization, it's possible
   // that we weren't able to check at that time.
-  if (!dfr_->IsRelevantForSnapshot(opts_.snap_to_include)) {
+  if (!dfr_->IsRelevantForSnapshot(preparer_.opts().snap_to_include)) {
     exhausted_ = true;
     delta_blocks_.clear();
     return Status::OK();
@@ -441,7 +441,7 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
 
   if (!index_iter_) {
     index_iter_.reset(IndexTreeIterator::Create(
-        opts_.io_context,
+        preparer_.opts().io_context,
         dfr_->cfile_reader().get(),
         dfr_->cfile_reader()->validx_root()));
   }
@@ -461,22 +461,23 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
   }
   RETURN_NOT_OK(s);
 
-  prepared_idx_ = idx;
-  prepared_count_ = 0;
+  preparer_.Seek(idx);
   prepared_ = false;
   delta_blocks_.clear();
   exhausted_ = false;
   return Status::OK();
 }
 
-Status DeltaFileIterator::ReadCurrentBlockOntoQueue() {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   DCHECK(initted_) << "Must call Init()";
   DCHECK(index_iter_) << "Must call SeekToOrdinal()";
 
   unique_ptr<PreparedDeltaBlock> pdb(new PreparedDeltaBlock());
   BlockPointer dblk_ptr = index_iter_->GetCurrentBlockPointer();
   shared_ptr<CFileReader> reader = dfr_->cfile_reader();
-  RETURN_NOT_OK(reader->ReadBlock(opts_.io_context, dblk_ptr, cache_blocks_, &pdb->block_));
+  RETURN_NOT_OK(reader->ReadBlock(preparer_.opts().io_context,
+                                  dblk_ptr, cache_blocks_, &pdb->block_));
 
   // The data has been successfully read. Finish creating the decoder.
   pdb->prepared_block_start_idx_ = 0;
@@ -502,7 +503,8 @@ Status DeltaFileIterator::ReadCurrentBlockOntoQueue() {
   return Status::OK();
 }
 
-Status DeltaFileIterator::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
   DCHECK(index_iter_) << "Must call SeekToOrdinal()";
 
   Slice index_entry = index_iter_->GetCurrentKey();
@@ -512,8 +514,9 @@ Status DeltaFileIterator::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
   return Status::OK();
 }
 
-Status DeltaFileIterator::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDecoder &dec,
-                                                        rowid_t *idx) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDecoder &dec,
+                                                              rowid_t *idx) {
   DCHECK_GT(dec.Count(), 0);
   Slice s(dec.string_at_index(dec.Count() - 1));
   DeltaKey k;
@@ -522,19 +525,20 @@ Status DeltaFileIterator::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDe
   return Status::OK();
 }
 
-
-string DeltaFileIterator::PreparedDeltaBlock::ToString() const {
+template<DeltaType Type>
+string DeltaFileIterator<Type>::PreparedDeltaBlock::ToString() const {
   return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
                       block_ptr_.ToString().c_str());
 }
 
-Status DeltaFileIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, PrepareFlag flag) {
   DCHECK(initted_) << "Must call Init()";
   DCHECK(exhausted_ || index_iter_) << "Must call SeekToOrdinal()";
 
   CHECK_GT(nrows, 0);
 
-  rowid_t start_row = prepared_idx_ + prepared_count_;
+  rowid_t start_row = preparer_.cur_prepared_idx();
   rowid_t stop_row = start_row + nrows - 1;
 
   // Remove blocks from our list which are no longer relevant to the range
@@ -581,21 +585,21 @@ Status DeltaFileIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
   VLOG(2) << "Done preparing deltas for " << start_row << "-" << stop_row
           << ": row block spans " << delta_blocks_.size() << " delta blocks";
   #endif
-  prepared_idx_ = start_row;
-  prepared_count_ = nrows;
   prepared_ = true;
+
+  preparer_.Start(flag);
+  RETURN_NOT_OK(AddDeltas(start_row, stop_row));
+  preparer_.Finish(nrows);
   return Status::OK();
 }
 
-template<class Visitor>
-Status DeltaFileIterator::VisitMutations(Visitor *visitor) {
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::AddDeltas(rowid_t start_row, rowid_t stop_row) {
   DCHECK(prepared_) << "must Prepare";
 
-  rowid_t start_row = prepared_idx_;
-
   for (auto& block : delta_blocks_) {
-    BinaryPlainBlockDecoder& bpd = *block->decoder_;
-    DVLOG(2) << "Visiting delta block " << block->first_updated_idx_ << "-"
+    const BinaryPlainBlockDecoder& bpd = *block->decoder_;
+    DVLOG(2) << "Adding deltas from delta block " << block->first_updated_idx_ << "-"
              << block->last_updated_idx_ << " for row block starting at " << start_row;
 
     if (PREDICT_FALSE(start_row > block->last_updated_idx_)) {
@@ -608,40 +612,47 @@ Status DeltaFileIterator::VisitMutations(Visitor *visitor) {
       continue;
     }
 
-    rowid_t previous_rowidx = MathLimits<rowid_t>::kMax;
-    bool continue_visit = true;
+    bool finished_row = false;
     for (int i = block->prepared_block_start_idx_; i < bpd.Count(); i++) {
       Slice slice = bpd.string_at_index(i);
 
       // Decode and check the ID of the row we're going to update.
       DeltaKey key;
       RETURN_NOT_OK(key.DecodeFrom(&slice));
-      rowid_t row_idx = key.row_idx();
 
-      // Check if the previous visitor notified us we don't need to apply more
-      // mutations to this row and skip if we don't.
-      if (row_idx == previous_rowidx && !continue_visit) {
+      // If this delta is for the same row as before, skip it if the previous
+      // AddDelta() call told us that we're done with this row.
+      if (preparer_.last_added_idx() &&
+          preparer_.last_added_idx() == key.row_idx() &&
+          finished_row) {
         continue;
-      } else {
-        previous_rowidx = row_idx;
-        continue_visit = true;
       }
+      finished_row = false;
 
       // Check that the delta is within the block we're currently processing.
-      if (row_idx >= start_row + prepared_count_) {
+      if (key.row_idx() > stop_row) {
         // Delta is for a row which comes after the block we're processing.
         return Status::OK();
-      } else if (row_idx < start_row) {
+      }
+      if (key.row_idx() < start_row) {
         // Delta is for a row which comes before the block we're processing.
         continue;
       }
-      RETURN_NOT_OK(visitor->Visit(key, slice, &continue_visit));
+
+      // Note: if AddDelta sets 'finished_row' to true, we could skip the
+      // remaining deltas for this row by seeking the block decoder. This trades
+      // off the cost of a seek against the cost of decoding some irrelevant delta keys.
+      //
+      // Given that updates are expected to be uncommon and that most scans are
+      // _not_ historical, the current implementation eschews seeking in favor of
+      // skipping irrelevant deltas one by one.
+      RETURN_NOT_OK(preparer_.AddDelta(key, slice, &finished_row));
       if (VLOG_IS_ON(3)) {
         RowChangeList rcl(slice);
-        DVLOG(3) << "Visited " << DeltaType_Name(delta_type_)
+        DVLOG(3) << "Visited " << DeltaType_Name(DeltaTypeSelector<Type>::kTag)
                  << " delta for key: " << key.ToString() << " Mut: "
-                 << rcl.ToString(*opts_.projection) << " Continue?: "
-                 << (continue_visit ? "TRUE" : "FALSE");
+                 << rcl.ToString(*preparer_.opts().projection)
+                 << " Continue?: " << (!finished_row ? "TRUE" : "FALSE");
       }
     }
   }
@@ -649,294 +660,49 @@ Status DeltaFileIterator::VisitMutations(Visitor *visitor) {
   return Status::OK();
 }
 
-// Returns whether a REDO mutation with 'timestamp' is relevant under 'snap'.
-// If snap cannot include any mutations with a higher timestamp 'continue_visit' is
-// set to false, it's set to true otherwise.
-inline bool IsRedoRelevant(const MvccSnapshot& snap,
-                           const Timestamp& timestamp,
-                           bool* continue_visit) {
-  *continue_visit = true;
-  if (!snap.IsCommitted(timestamp)) {
-    if (!snap.MayHaveCommittedTransactionsAtOrAfter(timestamp)) {
-      *continue_visit = false;
-    }
-    return false;
-  }
-  return true;
-}
-
-// Returns whether an UNDO mutation with 'timestamp' is relevant under 'snap'.
-// If snap cannot include any mutations with a lower timestamp 'continue_visit' is
-// set to false, it's set to true otherwise.
-inline bool IsUndoRelevant(const MvccSnapshot& snap,
-                           const Timestamp& timestamp,
-                           bool* continue_visit) {
-  *continue_visit = true;
-  if (snap.IsCommitted(timestamp)) {
-    if (!snap.MayHaveUncommittedTransactionsAtOrBefore(timestamp)) {
-      *continue_visit = false;
-    }
-    return false;
-  }
-  return true;
-}
-
 template<DeltaType Type>
-struct ApplyingVisitor {
-
-  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);
-
-  inline Status ApplyMutation(const DeltaKey &key, const Slice &deltas) {
-    int64_t rel_idx = key.row_idx() - dfi->prepared_idx_;
-    DCHECK_GE(rel_idx, 0);
-
-    // TODO(todd): this code looks eerily similar to DMSIterator::ApplyUpdates!
-    // I bet it can be combined.
-
-    const Schema* schema = dfi->opts_.projection;
-    RowChangeListDecoder decoder((RowChangeList(deltas)));
-    RETURN_NOT_OK(decoder.Init());
-    if (decoder.is_update() || decoder.is_reinsert()) {
-      return decoder.ApplyToOneColumn(rel_idx, dst, *schema, col_to_apply, dst->arena());
-    }
-
-    DCHECK(decoder.is_delete());
-    // If it's a DELETE, then it will be processed by LivenessVisitor.
-    return Status::OK();
-  }
-
-  DeltaFileIterator *dfi;
-  size_t col_to_apply;
-  ColumnBlock *dst;
-};
-
-template<>
-inline Status ApplyingVisitor<REDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
-  if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    DVLOG(3) << "Applied redo delta";
-    return ApplyMutation(key, deltas);
-  }
-  DVLOG(3) << "Redo delta uncommitted, skipped applying.";
-  return Status::OK();
+Status DeltaFileIterator<Type>::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+  return preparer_.ApplyUpdates(col_to_apply, dst);
 }
 
-template<>
-inline Status ApplyingVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
-  if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    DVLOG(3) << "Applied undo delta";
-    return ApplyMutation(key, deltas);
-  }
-  DVLOG(3) << "Undo delta committed, skipped applying.";
-  return Status::OK();
-}
-
-Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
-  DCHECK_LE(prepared_count_, dst->nrows());
-
-  if (delta_type_ == REDO) {
-    DVLOG(3) << "Applying REDO mutations to " << col_to_apply;
-    ApplyingVisitor<REDO> visitor = {this, col_to_apply, dst};
-    return VisitMutations(&visitor);
-  }
-  DVLOG(3) << "Applying UNDO mutations to " << col_to_apply;
-  ApplyingVisitor<UNDO> visitor = {this, col_to_apply, dst};
-  return VisitMutations(&visitor);
-}
-
-// Visitor which establishes the liveness of a row by applying deletes and reinserts.
 template<DeltaType Type>
-struct LivenessVisitor {
-
-  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);
-
-  inline Status ApplyDelete(const DeltaKey &key, const Slice &deltas) {
-    int64_t rel_idx = key.row_idx() - dfi->prepared_idx_;
-    DCHECK_GE(rel_idx, 0);
-
-    RowChangeListDecoder decoder((RowChangeList(deltas)));
-    RETURN_NOT_OK(decoder.Init());
-    if (decoder.is_update()) {
-      DVLOG(3) << "Didn't delete row (update)";
-      // If this is an update the row must be selected.
-      DCHECK(sel_vec->IsRowSelected(rel_idx));
-      return Status::OK();
-    }
-
-    if (decoder.is_delete()) {
-      DVLOG(3) << "Row deleted";
-      sel_vec->SetRowUnselected(rel_idx);
-      return Status::OK();
-    }
-
-    DCHECK(decoder.is_reinsert());
-    DVLOG(3) << "Re-selected the row (reinsert)";
-    // If this is a reinsert the row must be unselected.
-    DCHECK(!sel_vec->IsRowSelected(rel_idx));
-    sel_vec->SetRowSelected(rel_idx);
-    return Status::OK();
-  }
-
-  DeltaFileIterator *dfi;
-  SelectionVector *sel_vec;
-};
-
-template<>
-inline Status LivenessVisitor<REDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
-  if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    return ApplyDelete(key, deltas);
-  }
-  return Status::OK();
-}
-
-template<>
-inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
-  if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    return ApplyDelete(key, deltas);
-  }
-  return Status::OK();
-}
-
-
-Status DeltaFileIterator::ApplyDeletes(SelectionVector* sel_vec) {
-  DCHECK_LE(prepared_count_, sel_vec->nrows());
-  if (delta_type_ == REDO) {
-    DVLOG(3) << "Applying REDO deletes";
-    LivenessVisitor<REDO> visitor = { this, sel_vec };
-    return VisitMutations(&visitor);
-  }
-  DVLOG(3) << "Applying UNDO deletes";
-  LivenessVisitor<UNDO> visitor = { this, sel_vec };
-  return VisitMutations(&visitor);
+Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
+  return preparer_.ApplyDeletes(sel_vec);
 }
 
-// Visitor which, for each mutation, adds it into a ColumnBlock of
-// Mutation *s, prepending to each linked list. See CollectMutations().
 template<DeltaType Type>
-struct CollectingVisitor {
-
-  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);
-
-  Status Collect(const DeltaKey &key, const Slice &deltas) {
-    int64_t rel_idx = key.row_idx() - dfi->prepared_idx_;
-    DCHECK_GE(rel_idx, 0);
-
-    RowChangeList changelist(deltas);
-    Mutation *mutation = Mutation::CreateInArena(dst_arena, key.timestamp(), changelist);
-    mutation->PrependToList(&dst->at(rel_idx));
-
-    return Status::OK();
-  }
-
-  DeltaFileIterator *dfi;
-  vector<Mutation *> *dst;
-  Arena *dst_arena;
-};
-
-template<>
-inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key,
-                                             const Slice& deltas,
-                                             bool* continue_visit) {
-  if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    return Collect(key, deltas);
-  }
-  return Status::OK();
-}
-
-template<>
-inline Status CollectingVisitor<UNDO>::Visit(const DeltaKey& key,
-                                             const Slice& deltas,
-                                             bool* continue_visit) {
-  if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
-    return Collect(key, deltas);
-  }
-  return Status::OK();
-}
-
-Status DeltaFileIterator::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
-  DCHECK_LE(prepared_count_, dst->size());
-  if (delta_type_ == REDO) {
-    CollectingVisitor<REDO> visitor = {this, dst, arena};
-    return VisitMutations(&visitor);
-  }
-  CollectingVisitor<UNDO> visitor = {this, dst, arena};
-  return VisitMutations(&visitor);
+Status DeltaFileIterator<Type>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
+  return preparer_.CollectMutations(dst, arena);
 }
 
-bool DeltaFileIterator::HasNext() {
+template<DeltaType Type>
+bool DeltaFileIterator<Type>::HasNext() {
   return !exhausted_ || !delta_blocks_.empty();
 }
 
-bool DeltaFileIterator::MayHaveDeltas() const {
-  // TODO(awong): change the API to take in the col_to_apply and check for
-  // deltas on that column only.
-  DCHECK(prepared_) << "must Prepare";
-  for (auto& block : delta_blocks_) {
-    BinaryPlainBlockDecoder& bpd = *block->decoder_;
-    if (PREDICT_FALSE(prepared_idx_ > block->last_updated_idx_)) {
-      continue;
-    }
-    if (block->prepared_block_start_idx_ < bpd.Count()) {
-      return true;
-    }
-  }
-  return false;
+template<DeltaType Type>
+bool DeltaFileIterator<Type>::MayHaveDeltas() const {
+  return preparer_.MayHaveDeltas();
 }
 
-string DeltaFileIterator::ToString() const {
+template<DeltaType Type>
+string DeltaFileIterator<Type>::ToString() const {
   return "DeltaFileIterator(" + dfr_->ToString() + ")";
 }
 
-struct FilterAndAppendVisitor {
-
-  Status Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) {
-
-    // FilterAndAppendVisitor visitor visits all mutations.
-    *continue_visit = true;
-
-    faststring buf;
-    RowChangeListEncoder enc(&buf);
-    RETURN_NOT_OK(
-        RowChangeListDecoder::RemoveColumnIdsFromChangeList(RowChangeList(deltas),
-                                                            col_ids,
-                                                            &enc));
-    if (enc.is_initialized()) {
-      RowChangeList rcl = enc.as_changelist();
-      DeltaKeyAndUpdate upd;
-      upd.key = key;
-      CHECK(arena->RelocateSlice(rcl.slice(), &upd.cell));
-      out->push_back(upd);
-    }
-    // if enc.is_initialized() return false, that means deltas only
-    // contained the specified columns.
-    return Status::OK();
-  }
-
-  const DeltaFileIterator* dfi;
-  const vector<ColumnId>& col_ids;
-  vector<DeltaKeyAndUpdate>* out;
-  Arena* arena;
-};
-
-Status DeltaFileIterator::FilterColumnIdsAndCollectDeltas(
+template<DeltaType Type>
+Status DeltaFileIterator<Type>::FilterColumnIdsAndCollectDeltas(
     const vector<ColumnId>& col_ids,
     vector<DeltaKeyAndUpdate>* out,
     Arena* arena) {
-  FilterAndAppendVisitor visitor = {this, col_ids, out, arena};
-  return VisitMutations(&visitor);
+  return preparer_.FilterColumnIdsAndCollectDeltas(col_ids, out, arena);
 }
 
-void DeltaFileIterator::FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
+template<DeltaType Type>
+void DeltaFileIterator<Type>::FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
                                              const string &msg) {
   LOG(FATAL) << "Saw unexpected delta type in deltafile " << dfr_->ToString() << ": "
-             << " rcl=" << RowChangeList(deltas).ToString(*opts_.projection)
+             << " rcl=" << RowChangeList(deltas).ToString(*preparer_.opts().projection)
              << " key=" << key.ToString() << " (" << msg << ")";
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 439aa48..8ec98ce 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -26,12 +26,10 @@
 
 #include <glog/logging.h>
 
-#include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
-#include "kudu/cfile/index_btree.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
@@ -40,7 +38,6 @@
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
-#include "kudu/tablet/rowset.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/once.h"
 #include "kudu/util/slice.h"
@@ -58,11 +55,9 @@ class ScanSpec;
 class SelectionVector;
 struct ColumnId;
 
-namespace tablet {
-class MvccSnapshot;
-} // namespace tablet
-
 namespace cfile {
+class BinaryPlainBlockDecoder;
+class IndexTreeIterator;
 struct ReaderOptions;
 } // namespace cfile
 
@@ -76,12 +71,8 @@ struct IOContext;
 namespace tablet {
 
 class Mutation;
-template<DeltaType Type>
-struct ApplyingVisitor;
-template<DeltaType Type>
-struct CollectingVisitor;
-template<DeltaType Type>
-struct LivenessVisitor;
+class MvccSnapshot;
+struct RowIteratorOptions;
 
 class DeltaFileWriter {
  public:
@@ -200,6 +191,7 @@ class DeltaFileReader : public DeltaStore,
                            std::shared_ptr<DeltaFileReader>* out) const;
 
  private:
+  template<DeltaType Type>
   friend class DeltaFileIterator;
 
   DISALLOW_COPY_AND_ASSIGN(DeltaFileReader);
@@ -228,6 +220,7 @@ class DeltaFileReader : public DeltaStore,
 // Iterator over the deltas contained in a delta file.
 //
 // See DeltaIterator for details.
+template <DeltaType Type>
 class DeltaFileIterator : public DeltaIterator {
  public:
   Status Init(ScanSpec* spec) override;
@@ -254,13 +247,6 @@ class DeltaFileIterator : public DeltaIterator {
 
  private:
   friend class DeltaFileReader;
-  friend struct ApplyingVisitor<REDO>;
-  friend struct ApplyingVisitor<UNDO>;
-  friend struct CollectingVisitor<REDO>;
-  friend struct CollectingVisitor<UNDO>;
-  friend struct LivenessVisitor<REDO>;
-  friend struct LivenessVisitor<UNDO>;
-  friend struct FilterAndAppendVisitor;
 
   DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
 
@@ -303,8 +289,7 @@ class DeltaFileIterator : public DeltaIterator {
 
   // The pointers in 'opts' and 'dfr' must remain valid for the lifetime of the iterator.
   DeltaFileIterator(std::shared_ptr<DeltaFileReader> dfr,
-                    RowIteratorOptions opts,
-                    DeltaType delta_type);
+                    RowIteratorOptions opts);
 
   // Determine the row index of the first update in the block currently
   // pointed to by index_iter_.
@@ -318,10 +303,7 @@ class DeltaFileIterator : public DeltaIterator {
   // onto the end of the delta_blocks_ queue.
   Status ReadCurrentBlockOntoQueue();
 
-  // Visit all mutations in the currently prepared row range with the specified
-  // visitor class.
-  template<class Visitor>
-  Status VisitMutations(Visitor *visitor);
+  Status AddDeltas(rowid_t start_row, rowid_t stop_row);
 
   // Log a FATAL error message about a bad delta.
   void FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
@@ -329,13 +311,10 @@ class DeltaFileIterator : public DeltaIterator {
 
   std::shared_ptr<DeltaFileReader> dfr_;
 
-  const RowIteratorOptions opts_;
+  DeltaPreparer<DeltaFilePreparerTraits<Type>> preparer_;
 
   gscoped_ptr<cfile::IndexTreeIterator> index_iter_;
 
-  // TODO: add better comments here.
-  rowid_t prepared_idx_;
-  uint32_t prepared_count_;
   bool prepared_;
   bool exhausted_;
   bool initted_;
@@ -347,12 +326,6 @@ class DeltaFileIterator : public DeltaIterator {
   // Temporary buffer used in seeking.
   faststring tmp_buf_;
 
-  // Temporary buffer used for RowChangeList projection.
-  faststring delta_buf_;
-
-  // The type of this delta iterator, i.e. UNDO or REDO.
-  const DeltaType delta_type_;
-
   cfile::CFileReader::CacheControl cache_blocks_;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltamemstore-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index c954bd0..856a4aa 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -47,7 +47,6 @@
 #include "kudu/consensus/opid_util.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -70,7 +69,13 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-DEFINE_int32(benchmark_num_passes, 100, "Number of passes to apply deltas in the benchmark");
+DEFINE_int32(benchmark_num_passes,
+#ifdef NDEBUG
+             100,
+#else
+             1,
+#endif
+             "Number of passes to apply deltas in the benchmark");
 
 using std::shared_ptr;
 using std::string;
@@ -290,6 +295,52 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
   }
 }
 
+class TestDeltaMemStoreNumUpdates : public TestDeltaMemStore,
+                                    public ::testing::WithParamInterface<int> {
+};
+
+INSTANTIATE_TEST_CASE_P(DifferentNumUpdates,
+                        TestDeltaMemStoreNumUpdates, ::testing::Values(2, 20, 200));
+
+TEST_P(TestDeltaMemStoreNumUpdates, BenchmarkSnapshotScans) {
+  const int kNumRows = 100;
+
+  // Populate the DMS with kNumRows * GetParam() updates. For each row, every
+  // update is at a different timestamp.
+  faststring buf;
+  RowChangeListEncoder update(&buf);
+  LOG_TIMING(INFO, Substitute("updating $0 rows $1 times each", kNumRows, GetParam())) {
+    for (rowid_t row_idx = 0; row_idx < kNumRows; row_idx++) {
+      for (int ts_val = 0; ts_val < GetParam(); ts_val++) {
+        update.Reset();
+
+        Timestamp ts(ts_val);
+        uint32_t new_val = ts_val;
+        update.AddColumnUpdate(schema_.column(kIntColumn),
+                               schema_.column_id(kIntColumn), &new_val);
+        CHECK_OK(dms_->Update(ts, row_idx, RowChangeList(buf), op_id_));
+      }
+    }
+  }
+
+  // Now scan the DMS at each timestamp. The scans are repeated in a number of
+  // passes to stabilize the results.
+  ScopedColumnBlock<UINT32> ints(kNumRows);
+  LOG_TIMING(INFO, Substitute("running $0 scans for each timestamp",
+                              FLAGS_benchmark_num_passes)) {
+    for (int ts_val = 0; ts_val < GetParam(); ts_val++) {
+      LOG_TIMING(INFO, Substitute("running $0 scans at timestamp $1",
+                                  FLAGS_benchmark_num_passes, ts_val)) {
+        for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
+          Timestamp ts(ts_val);
+          MvccSnapshot snap(ts);
+          NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
+        }
+      }
+    }
+  }
+}
+
 // Test when a slice column has been updated multiple times in the
 // memrowset that the referred to values properly end up in the
 // right arena.
@@ -466,7 +517,7 @@ TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
   }
   ASSERT_OK(s);
 
-  gscoped_ptr<DMSIterator> iter(down_cast<DMSIterator *>(raw_iter));
+  unique_ptr<DeltaIterator> iter(raw_iter);
   ASSERT_OK(iter->Init(nullptr));
 
   int block_start_row = 50;
@@ -514,8 +565,7 @@ TEST_F(TestDeltaMemStore, TestCollectMutations) {
   }
   ASSERT_OK(s);
 
-  gscoped_ptr<DMSIterator> iter(down_cast<DMSIterator *>(raw_iter));
-
+  unique_ptr<DeltaIterator> iter(raw_iter);
   ASSERT_OK(iter->Init(nullptr));
   ASSERT_OK(iter->SeekToOrdinal(0));
   ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index f68727b..212873d 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -20,6 +20,7 @@
 #include <ostream>
 #include <utility>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
 #include "kudu/common/row_changelist.h"
@@ -234,14 +235,40 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
   rowid_t stop_row = start_row + nrows - 1;
   preparer_.Start(flag);
 
+  bool finished_row = false;
   while (iter_->IsValid()) {
     Slice key_slice, val;
     iter_->GetCurrentEntry(&key_slice, &val);
     DeltaKey key;
     RETURN_NOT_OK(key.DecodeFrom(&key_slice));
-    DCHECK_GE(key.row_idx(), start_row);
-    if (key.row_idx() > stop_row) break;
-    RETURN_NOT_OK(preparer_.AddDelta(key, val));
+    rowid_t cur_row = key.row_idx();
+    DCHECK_GE(cur_row, start_row);
+
+    // If this delta is for the same row as before, skip it if the previous
+    // AddDelta() call told us that we're done with this row.
+    if (preparer_.last_added_idx() &&
+        preparer_.last_added_idx() == cur_row &&
+        finished_row) {
+      iter_->Next();
+      continue;
+    }
+    finished_row = false;
+
+    if (cur_row > stop_row) {
+      // Delta is for a row which comes after the block we're processing.
+      break;
+    }
+
+    // Note: if AddDelta() sets 'finished_row' to true, we could skip the
+    // remaining deltas for this row by seeking the tree iterator. This trades
+    // off the cost of a seek against the cost of decoding some irrelevant delta
+    // keys. Experimentation with a microbenchmark revealed that only when ~50
+    // deltas were skipped was the seek cheaper than the decoding.
+    //
+    // Given that updates are expected to be uncommon and that most scans are
+    // _not_ historical, the current implementation eschews seeking in favor of
+    // skipping irrelevant deltas one by one.
+    RETURN_NOT_OK(preparer_.AddDelta(key, val, &finished_row));
     iter_->Next();
   }
   preparer_.Finish(nrows);

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 66e8c62..fa093b3 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -23,8 +23,6 @@
 #include <string>
 #include <vector>
 
-#include <gtest/gtest_prod.h>
-
 #include "kudu/common/rowid.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/gutil/atomicops.h"
@@ -221,8 +219,6 @@ class DMSIterator : public DeltaIterator {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(DMSIterator);
-  FRIEND_TEST(TestDeltaMemStore, TestIteratorDoesUpdates);
-  FRIEND_TEST(TestDeltaMemStore, TestCollectMutations);
   friend class DeltaMemStore;
 
   // Initialize the iterator.
@@ -234,7 +230,7 @@ class DMSIterator : public DeltaIterator {
 
   const std::shared_ptr<const DeltaMemStore> dms_;
 
-  DeltaPreparer preparer_;
+  DeltaPreparer<DMSPreparerTraits> preparer_;
 
   gscoped_ptr<DeltaMemStore::DMSTreeIter> iter_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index ce65068..449651a 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -511,6 +511,16 @@ void MvccSnapshot::AddCommittedTimestamp(Timestamp timestamp) {
   }
 }
 
+bool MvccSnapshot::Equals(const MvccSnapshot& other) const {
+  if (all_committed_before_ != other.all_committed_before_) {
+    return false;
+  }
+  if (none_committed_at_or_after_ != other.none_committed_at_or_after_) {
+    return false;
+  }
+  return committed_timestamps_ == other.committed_timestamps_;
+}
+
 ////////////////////////////////////////////////////////////
 // ScopedTransaction
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 347376e..a87e3c9 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -111,6 +111,10 @@ class MvccSnapshot {
   // yet we need to construct a scanner that accurately represents that set.
   void AddCommittedTimestamps(const std::vector<Timestamp>& timestamps);
 
+  // Returns true if 'other' has the same internal state as this snapshot (and
+  // thus represents the same set of committed timestamps), false otherwise.
+  bool Equals(const MvccSnapshot& other) const;
+
  private:
   friend class MvccManager;
   FRIEND_TEST(MvccTest, TestMayHaveCommittedTransactionsAtOrAfter);

http://git-wip-us.apache.org/repos/asf/kudu/blob/1043f319/src/kudu/tablet/tablet-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index a7b74ae..f724533 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -333,11 +333,6 @@ static Status WriteRow(const Slice &row_slice, RowSetWriterClass *writer) {
   return writer->AppendBlock(block);
 }
 
-template <DeltaType Type>
-struct DeltaTypeSelector {
-  static constexpr DeltaType kTag = Type;
-};
-
 // Tracks encoded deltas and provides a DeltaIterator-like interface for
 // querying them.
 //


[2/3] kudu git commit: KUDU-686 (part 1/2): decompose guts of DMSIterator into DeltaPreparer

Posted by ad...@apache.org.
KUDU-686 (part 1/2): decompose guts of DMSIterator into DeltaPreparer

To address KUDU-686, we're going to repurpose DMSIterator's PrepareBatch()
machinery and associated in-memory state for use in the DeltaFileIterator.
Doing so obviates the need for a "multi-pass" approach to ApplyUpdates()
because DeltaFileIterator will no longer be performing any decoding there,
having done all of it in PrepareBatch().

This patch lays the groundwork by refactoring the guts of DMSIterator into
the new DeltaPreparer class. DMSIterator will continue to concern itself
with CBTree iteration, but will delegate both the preparation of deltas and
the serving of prepared deltas to DeltaPreparer.

In performing this refactor, I tried to be as faithful as possible to the
original code. The one exception is that I replaced prepared_idx_ and
prepared_count_ with state that I found easier to understand.

No new tests; I figured there was enough test coverage of DMSIterator, and
testing DeltaPreparer directly seemed like it'd be low bang for the buck.

Change-Id: I26c92dfa69b5323e7fbb18d02010ed99cc29c3e5
Reviewed-on: http://gerrit.cloudera.org:8080/11394
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bd8d747e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bd8d747e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bd8d747e

Branch: refs/heads/master
Commit: bd8d747ed57b3cfa53c0c1b4465589b7ec5f1663
Parents: 04468d0
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Sep 4 22:38:57 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Oct 31 23:58:08 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_iterator_merger.cc |  10 +-
 src/kudu/tablet/delta_iterator_merger.h  |  34 +++--
 src/kudu/tablet/delta_store.cc           | 165 ++++++++++++++++++++
 src/kudu/tablet/delta_store.h            | 209 ++++++++++++++++++++------
 src/kudu/tablet/deltafile.cc             |  33 ++--
 src/kudu/tablet/deltafile.h              |  28 ++--
 src/kudu/tablet/deltamemstore.cc         | 151 +++----------------
 src/kudu/tablet/deltamemstore.h          |  64 ++------
 8 files changed, 420 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_iterator_merger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index 23e6fff..4230c1c 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -43,7 +43,7 @@ DeltaIteratorMerger::DeltaIteratorMerger(
     vector<unique_ptr<DeltaIterator> > iters)
     : iters_(std::move(iters)) {}
 
-Status DeltaIteratorMerger::Init(ScanSpec *spec) {
+Status DeltaIteratorMerger::Init(ScanSpec* spec) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->Init(spec));
   }
@@ -64,21 +64,21 @@ Status DeltaIteratorMerger::PrepareBatch(size_t nrows, PrepareFlag flag) {
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
+Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->ApplyUpdates(col_to_apply, dst));
   }
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::ApplyDeletes(SelectionVector *sel_vec) {
+Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->ApplyDeletes(sel_vec));
   }
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::CollectMutations(vector<Mutation *> *dst, Arena *arena) {
+Status DeltaIteratorMerger::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->CollectMutations(dst, arena));
   }
@@ -117,7 +117,7 @@ bool DeltaIteratorMerger::HasNext() {
   return false;
 }
 
-bool DeltaIteratorMerger::MayHaveDeltas() {
+bool DeltaIteratorMerger::MayHaveDeltas() const {
   for (const unique_ptr<DeltaIterator>& iter : iters_) {
     if (iter->MayHaveDeltas()) {
       return true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_iterator_merger.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index d1bd9b4..cbfb685 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -23,7 +23,6 @@
 #include <vector>
 
 #include "kudu/common/rowid.h"
-#include "kudu/gutil/port.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/status.h"
 
@@ -57,18 +56,27 @@ class DeltaIteratorMerger : public DeltaIterator {
   ////////////////////////////////////////////////////////////
   // Implementations of DeltaIterator
   ////////////////////////////////////////////////////////////
-  virtual Status Init(ScanSpec *spec) OVERRIDE;
-  virtual Status SeekToOrdinal(rowid_t idx) OVERRIDE;
-  virtual Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
-  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
-  virtual Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
-  virtual Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;
-  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
-                                                 std::vector<DeltaKeyAndUpdate>* out,
-                                                 Arena* arena) OVERRIDE;
-  virtual bool HasNext() OVERRIDE;
-  bool MayHaveDeltas() override;
-  virtual std::string ToString() const OVERRIDE;
+  Status Init(ScanSpec* spec) override;
+
+  Status SeekToOrdinal(rowid_t idx) override;
+
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
+
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
+
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
+
+  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
+                                         std::vector<DeltaKeyAndUpdate>* out,
+                                         Arena* arena) override;
+
+  bool HasNext() override;
+
+  bool MayHaveDeltas() const override;
+
+  std::string ToString() const override;
 
  private:
   explicit DeltaIteratorMerger(std::vector<std::unique_ptr<DeltaIterator> > iters);

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 379a2fa..e327a7c 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -19,18 +19,28 @@
 
 #include <algorithm>
 #include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <ostream>
 
 #include <glog/logging.h>
 
+#include "kudu/common/columnblock.h"
+#include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/types.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/deltafile.h"
+#include "kudu/tablet/mutation.h"
+#include "kudu/tablet/mvcc.h"
+#include "kudu/util/debug-util.h"
 #include "kudu/util/memory/arena.h"
 
 namespace kudu {
@@ -51,6 +61,161 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool p
                                                  key.timestamp().ToString()))));
 }
 
+DeltaPreparer::DeltaPreparer(RowIteratorOptions opts)
+    : opts_(std::move(opts)),
+      cur_prepared_idx_(0),
+      prev_prepared_idx_(0),
+      prepared_for_(NOT_PREPARED) {
+}
+
+void DeltaPreparer::Seek(rowid_t row_idx) {
+  prev_prepared_idx_ = row_idx;
+  cur_prepared_idx_ = row_idx;
+  prepared_for_ = NOT_PREPARED;
+}
+
+void DeltaPreparer::Start(DeltaIterator::PrepareFlag flag) {
+  if (updates_by_col_.empty()) {
+    updates_by_col_.resize(opts_.projection->num_columns());
+  }
+  for (UpdatesForColumn& ufc : updates_by_col_) {
+    ufc.clear();
+  }
+  deleted_.clear();
+  prepared_deltas_.clear();
+  switch (flag) {
+    case DeltaIterator::PREPARE_FOR_APPLY:
+      prepared_for_ = PREPARED_FOR_APPLY;
+      break;
+    case DeltaIterator::PREPARE_FOR_COLLECT:
+      prepared_for_ = PREPARED_FOR_COLLECT;
+      break;
+    default:
+      LOG(FATAL) << "Unknown prepare flag: " << flag;
+  }
+}
+
+void DeltaPreparer::Finish(size_t nrows) {
+  prev_prepared_idx_ = cur_prepared_idx_;
+  cur_prepared_idx_ += nrows;
+}
+
+Status DeltaPreparer::AddDelta(const DeltaKey& key, Slice val) {
+  if (!opts_.snap_to_include.IsCommitted(key.timestamp())) {
+    return Status::OK();
+  }
+
+  if (prepared_for_ == PREPARED_FOR_APPLY) {
+    RowChangeListDecoder decoder((RowChangeList(val)));
+    decoder.InitNoSafetyChecks();
+    DCHECK(!decoder.is_reinsert()) << "Reinserts are not supported in the DeltaMemStore.";
+    if (decoder.is_delete()) {
+      deleted_.emplace_back(key.row_idx());
+    } else {
+      DCHECK(decoder.is_update());
+      while (decoder.HasNext()) {
+        RowChangeListDecoder::DecodedUpdate dec;
+        RETURN_NOT_OK(decoder.DecodeNext(&dec));
+        int col_idx;
+        const void* col_val;
+        RETURN_NOT_OK(dec.Validate(*opts_.projection, &col_idx, &col_val));
+        if (col_idx == -1) {
+          // This column isn't being projected.
+          continue;
+        }
+        int col_size = opts_.projection->column(col_idx).type_info()->size();
+
+        // If we already have an earlier update for the same column, we can
+        // just overwrite that one.
+        if (updates_by_col_[col_idx].empty() ||
+            updates_by_col_[col_idx].back().row_id != key.row_idx()) {
+          updates_by_col_[col_idx].emplace_back();
+        }
+
+        ColumnUpdate& cu = updates_by_col_[col_idx].back();
+        cu.row_id = key.row_idx();
+        if (col_val == nullptr) {
+          cu.new_val_ptr = nullptr;
+        } else {
+          memcpy(cu.new_val_buf, col_val, col_size);
+          // NOTE: we're constructing a pointer here to a buffer inside a deque element.
+          // This is safe because appending to a deque never invalidates pointers to its elements.
+          cu.new_val_ptr = cu.new_val_buf;
+        }
+      }
+    }
+  } else {
+    DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
+    PreparedDelta d;
+    d.key = key;
+    d.val = val;
+    prepared_deltas_.emplace_back(d);
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());
+
+  const ColumnSchema* col_schema = &opts_.projection->column(col_to_apply);
+  for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
+    int32_t idx_in_block = cu.row_id - prev_prepared_idx_;
+    DCHECK_GE(idx_in_block, 0);
+    SimpleConstCell src(col_schema, cu.new_val_ptr);
+    ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
+    RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::ApplyDeletes(SelectionVector* sel_vec) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
+
+  for (const auto& row_id : deleted_) {
+    uint32_t idx_in_block = row_id - prev_prepared_idx_;
+    sel_vec->SetRowUnselected(idx_in_block);
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
+  for (const PreparedDelta& src : prepared_deltas_) {
+    DeltaKey key = src.key;
+    RowChangeList changelist(src.val);
+    uint32_t rel_idx = key.row_idx() - prev_prepared_idx_;
+
+    Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), changelist);
+    mutation->PrependToList(&dst->at(rel_idx));
+  }
+  return Status::OK();
+}
+
+Status DeltaPreparer::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& /*col_ids*/,
+                                                      vector<DeltaKeyAndUpdate>* /*out*/,
+                                                      Arena* /*arena*/) {
+  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS" << GetStackTrace();
+  return Status::InvalidArgument("FilterColumsAndAppend() is not supported by DMSIterator");
+}
+
+bool DeltaPreparer::MayHaveDeltas() const {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  if (!deleted_.empty()) {
+    return true;
+  }
+  for (auto& col : updates_by_col_) {
+    if (!col.empty()) {
+      return true;
+    }
+  }
+  return false;
+}
+
 Status DebugDumpDeltaIterator(DeltaType type,
                               DeltaIterator* iter,
                               const Schema& schema,

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 6358df2..3f1c160 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -19,12 +19,15 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "kudu/common/rowid.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -47,8 +50,6 @@ class DeltaFileWriter;
 class DeltaIterator;
 class DeltaStats;
 class Mutation;
-class MvccSnapshot;
-struct RowIteratorOptions;
 
 // Interface for the pieces of the system that track deltas/updates.
 // This is implemented by DeltaMemStore and by DeltaFileReader.
@@ -126,7 +127,60 @@ struct DeltaKeyAndUpdate {
   std::string Stringify(DeltaType type, const Schema& schema, bool pad_key = false) const;
 };
 
-class DeltaIterator {
+// Representation of deltas that have been "prepared" by an iterator. That is,
+// they have been consistently read (at a snapshot) from their backing store
+// into an in-memory format suitable for efficient retrieval.
+class PreparedDeltas {
+ public:
+  // Applies the snapshotted updates to one of the columns.
+  //
+  // 'dst' must be the same length as was previously passed to PrepareBatch()
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) = 0;
+
+  // Applies any deletes to the given selection vector.
+  //
+  // Rows which have been deleted in the associated MVCC snapshot are set to 0
+  // in the selection vector so that they don't show up in the output.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
+
+  // Collects the mutations associated with each row in the current prepared batch.
+  //
+  // Each entry in the vector will be treated as a singly linked list of Mutation
+  // objects. If there are no mutations for that row, the entry will be unmodified.
+  // If there are mutations, they will be prepended at the head of the linked list
+  // (i.e. the resulting list will be in descending timestamp order)
+  //
+  // The Mutation objects will be allocated out of the provided Arena, which must be non-NULL.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
+  virtual Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) = 0;
+
+  // Iterates through all deltas, adding deltas for columns not specified in
+  // 'col_ids' to 'out'.
+  //
+  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
+  // deltas are considered relevant.
+  //
+  // The delta objects will be allocated out of the provided Arena, which must be non-NULL.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
+  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
+                                                 std::vector<DeltaKeyAndUpdate>* out,
+                                                 Arena* arena) = 0;
+
+  // Returns true if there might exist deltas to be applied. It is safe to
+  // conservatively return true, but this would force a skip over decoder-level
+  // evaluation.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual bool MayHaveDeltas() const = 0;
+};
+
+class DeltaIterator : public PreparedDeltas {
  public:
   // Initialize the iterator. This must be called once before any other
   // call.
@@ -143,8 +197,8 @@ class DeltaIterator {
   };
 
   // Prepare to apply deltas to a block of rows. This takes a consistent snapshot
-  // of all updates to the next 'nrows' rows, so that subsequent calls to
-  // ApplyUpdates() will not cause any "tearing"/non-atomicity.
+  // of all updates to the next 'nrows' rows, so that subsequent calls to a
+  // PreparedDeltas method will not cause any "tearing"/non-atomicity.
   //
   // 'flag' denotes whether the batch will be used for collecting mutations or
   // for applying them. Some implementations may choose to prepare differently.
@@ -153,62 +207,125 @@ class DeltaIterator {
   // of the previously prepared block.
   virtual Status PrepareBatch(size_t nrows, PrepareFlag flag) = 0;
 
-  // Apply the snapshotted updates to one of the columns.
-  // 'dst' must be the same length as was previously passed to PrepareBatch()
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) = 0;
+  // Returns true if there are any more rows left in this iterator.
+  virtual bool HasNext() = 0;
+
+  // Return a string representation suitable for debug printouts.
+  virtual std::string ToString() const = 0;
+
+  virtual ~DeltaIterator() {}
+};
 
-  // Apply any deletes to the given selection vector.
-  // Rows which have been deleted in the associated MVCC snapshot are set to
-  // 0 in the selection vector so that they don't show up in the output.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual Status ApplyDeletes(SelectionVector *sel_vec) = 0;
+// Encapsulates all logic and responsibility related to "delta preparation";
+// that is, the transformation of encoded deltas into an in-memory
+// representation more suitable for efficient service during iteration.
+//
+// This class is intended to be composed inside a DeltaIterator. The iterator
+// is responsible for loading encoded deltas from a backing store, passing them
+// to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
+// to serve the deltas.
+class DeltaPreparer : public PreparedDeltas {
+ public:
+  explicit DeltaPreparer(RowIteratorOptions opts);
 
-  // Collect the mutations associated with each row in the current prepared batch.
+  // Updates internal state to reflect a seek performed by a DeltaIterator.
   //
-  // Each entry in the vector will be treated as a singly linked list of Mutation
-  // objects. If there are no mutations for that row, the entry will be unmodified.
-  // If there are mutations, they will be prepended at the head of the linked list
-  // (i.e the resulting list will be in descending timestamp order)
+  // Call upon completion of DeltaIterator::SeekToOrdinal.
+  void Seek(rowid_t row_idx);
+
+  // Updates internal state to reflect the beginning of delta batch preparation
+  // on the part of a DeltaIterator.
   //
-  // The Mutation objects will be allocated out of the provided Arena, which must be non-NULL.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.
-  virtual Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) = 0;
+  // Call at the beginning of DeltaIterator::PrepareBatch.
+  void Start(DeltaIterator::PrepareFlag flag);
 
-  // Iterate through all deltas, adding deltas for columns not
-  // specified in 'col_ids' to 'out'.
+  // Updates internal state to reflect the end of delta batch preparation on the
+  // part of a DeltaIterator.
   //
-  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
-  // deltas are considered relevant.
-  // The delta objects will be allocated out of the provided Arena, which must be non-Null.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.
-  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
-                                                 std::vector<DeltaKeyAndUpdate>* out,
-                                                 Arena* arena) = 0;
+  // Call at the end of DeltaIterator::PrepareBatch.
+  void Finish(size_t nrows);
 
-  // Returns true if there are any more rows left in this iterator.
-  virtual bool HasNext() = 0;
+  // Prepares the delta given by 'key' whose encoded changes are pointed to by 'val'.
+  //
+  // Upon completion, it is safe for the memory behind 'val' to be destroyed.
+  //
+  // Call when a new delta becomes available in DeltaIterator::PrepareBatch.
+  Status AddDelta(const DeltaKey& key, Slice val);
 
-  // Returns true if there might exist deltas to be applied. It is safe to
-  // conservatively return true, but this would force a skip over decoder-level
-  // evaluation.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual bool MayHaveDeltas() = 0;
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
 
-  // Return a string representation suitable for debug printouts.
-  virtual std::string ToString() const = 0;
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  virtual ~DeltaIterator() {}
-};
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
+
+  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
+                                         std::vector<DeltaKeyAndUpdate>* out,
+                                         Arena* arena) override;
+
+  bool MayHaveDeltas() const override;
+
+  rowid_t cur_prepared_idx() const { return cur_prepared_idx_; }
+
+ private:
+  // Options with which the DeltaPreparer was constructed.
+  const RowIteratorOptions opts_;
 
-enum {
-  ITERATE_OVER_ALL_ROWS = 0
+  // The row index at which the most recent batch preparation ended.
+  rowid_t cur_prepared_idx_;
+
+  // The value of 'cur_prepared_idx_' from the previous batch.
+  rowid_t prev_prepared_idx_;
+
+  // Whether a batch has been prepared and, if so, for which kind of access.
+  enum PreparedFor {
+    // There are no prepared blocks. Attempts to call a PreparedDeltas function
+    // will fail.
+    NOT_PREPARED,
+
+    // The DeltaPreparer has prepared a batch of deltas for applying. All deltas
+    // in the batch have been decoded. UPDATEs and REINSERTs have been coalesced
+    // into a column-major data structure suitable for ApplyUpdates. DELETEs
+    // have been coalesced into a row-major data structure suitable for ApplyDeletes.
+    //
+    // ApplyUpdates and ApplyDeletes are now callable.
+    PREPARED_FOR_APPLY,
+
+    // The DeltaPreparer has prepared a batch of deltas for collecting. Deltas
+    // remain encoded and in the order that they were loaded from the backing store.
+    //
+    // CollectMutations and FilterColumnIdsAndCollectDeltas are now callable.
+    PREPARED_FOR_COLLECT
+  };
+  PreparedFor prepared_for_;
+
+  // State when prepared_for_ == PREPARED_FOR_APPLY
+  // ------------------------------------------------------------
+  struct ColumnUpdate {
+    rowid_t row_id;
+    void* new_val_ptr;
+    uint8_t new_val_buf[16];
+  };
+  typedef std::deque<ColumnUpdate> UpdatesForColumn;
+  std::vector<UpdatesForColumn> updates_by_col_;
+  std::deque<rowid_t> deleted_;
+
+  // State when prepared_for_ == PREPARED_FOR_COLLECT
+  // ------------------------------------------------------------
+  struct PreparedDelta {
+    DeltaKey key;
+    Slice val;
+  };
+  std::deque<PreparedDelta> prepared_deltas_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
 };
 
+enum { ITERATE_OVER_ALL_ROWS = 0 };
+
 // Dumps contents of 'iter' to 'out', line-by-line.  Used to unit test
 // minor delta compaction.
 //
-// If nrows is 0, all rows will be dumped.
+// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
 Status DebugDumpDeltaIterator(DeltaType type,
                               DeltaIterator* iter,
                               const Schema& schema,
@@ -218,7 +335,7 @@ Status DebugDumpDeltaIterator(DeltaType type,
 // Writes the contents of 'iter' to 'out', block by block.  Used by
 // minor delta compaction.
 //
-// If nrows is 0, all rows will be dumped.
+// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
 template<DeltaType Type>
 Status WriteDeltaIteratorToFile(DeltaIterator* iter,
                                 size_t nrows,

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index bbaad00..9a1be53 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -411,7 +411,7 @@ DeltaFileIterator::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
       delta_type_(delta_type),
       cache_blocks_(CFileReader::CACHE_BLOCK) {}
 
-Status DeltaFileIterator::Init(ScanSpec *spec) {
+Status DeltaFileIterator::Init(ScanSpec* spec) {
   DCHECK(!initted_) << "Already initted";
 
   if (spec) {
@@ -734,7 +734,7 @@ inline Status ApplyingVisitor<UNDO>::Visit(const DeltaKey& key,
   return Status::OK();
 }
 
-Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
+Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
   DCHECK_LE(prepared_count_, dst->nrows());
 
   if (delta_type_ == REDO) {
@@ -796,8 +796,8 @@ inline Status LivenessVisitor<REDO>::Visit(const DeltaKey& key,
 
 template<>
 inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas, bool*
-                                           continue_visit) {
+                                           const Slice& deltas,
+                                           bool* continue_visit) {
   if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
     return ApplyDelete(key, deltas);
   }
@@ -805,7 +805,7 @@ inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key,
 }
 
 
-Status DeltaFileIterator::ApplyDeletes(SelectionVector *sel_vec) {
+Status DeltaFileIterator::ApplyDeletes(SelectionVector* sel_vec) {
   DCHECK_LE(prepared_count_, sel_vec->nrows());
   if (delta_type_ == REDO) {
     DVLOG(3) << "Applying REDO deletes";
@@ -842,8 +842,8 @@ struct CollectingVisitor {
 
 template<>
 inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
+                                             const Slice& deltas,
+                                             bool* continue_visit) {
   if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
     return Collect(key, deltas);
   }
@@ -852,32 +852,31 @@ inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key,
 
 template<>
 inline Status CollectingVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas, bool*
-                                           continue_visit) {
+                                             const Slice& deltas,
+                                             bool* continue_visit) {
   if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) {
     return Collect(key, deltas);
   }
   return Status::OK();
 }
 
-Status DeltaFileIterator::CollectMutations(vector<Mutation *> *dst, Arena *dst_arena) {
+Status DeltaFileIterator::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   DCHECK_LE(prepared_count_, dst->size());
   if (delta_type_ == REDO) {
-    CollectingVisitor<REDO> visitor = {this, dst, dst_arena};
-    return VisitMutations(&visitor);
-  } else {
-    CollectingVisitor<UNDO> visitor = {this, dst, dst_arena};
+    CollectingVisitor<REDO> visitor = {this, dst, arena};
     return VisitMutations(&visitor);
   }
+  CollectingVisitor<UNDO> visitor = {this, dst, arena};
+  return VisitMutations(&visitor);
 }
 
 bool DeltaFileIterator::HasNext() {
   return !exhausted_ || !delta_blocks_.empty();
 }
 
-bool DeltaFileIterator::MayHaveDeltas() {
-  // TODO: change the API to take in the col_to_apply and check for deltas on
-  // that column only.
+bool DeltaFileIterator::MayHaveDeltas() const {
+  // TODO(awong): change the API to take in the col_to_apply and check for
+  // deltas on that column only.
   DCHECK(prepared_) << "must Prepare";
   for (auto& block : delta_blocks_) {
     BinaryPlainBlockDecoder& bpd = *block->decoder_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 811db5b..439aa48 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -230,19 +230,27 @@ class DeltaFileReader : public DeltaStore,
 // See DeltaIterator for details.
 class DeltaFileIterator : public DeltaIterator {
  public:
-  Status Init(ScanSpec *spec) OVERRIDE;
+  Status Init(ScanSpec* spec) override;
+
+  Status SeekToOrdinal(rowid_t idx) override;
+
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
+
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
+
+  Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
 
-  Status SeekToOrdinal(rowid_t idx) OVERRIDE;
-  Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
-  Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
-  Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
-  Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                          std::vector<DeltaKeyAndUpdate>* out,
-                                         Arena* arena) OVERRIDE;
-  std::string ToString() const OVERRIDE;
-  virtual bool HasNext() OVERRIDE;
-  bool MayHaveDeltas() override;
+                                         Arena* arena) override;
+
+  std::string ToString() const override;
+
+  bool HasNext() override;
+
+  bool MayHaveDeltas() const override;
 
  private:
   friend class DeltaFileReader;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index a704600..f68727b 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -17,29 +17,23 @@
 
 #include "kudu/tablet/deltamemstore.h"
 
-#include <cstring>
 #include <ostream>
 #include <utility>
 
 #include <glog/logging.h>
 
-#include "kudu/common/columnblock.h"
-#include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
-#include "kudu/common/rowblock.h"
-#include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
-#include "kudu/common/types.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/deltafile.h"
-#include "kudu/tablet/mutation.h"
-#include "kudu/tablet/mvcc.h"
-#include "kudu/util/debug-util.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memcmpable_varint.h"
 #include "kudu/util/memory/memory.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -201,12 +195,8 @@ void DeltaMemStore::DebugPrint() const {
 DMSIterator::DMSIterator(const shared_ptr<const DeltaMemStore>& dms,
                          RowIteratorOptions opts)
     : dms_(dms),
-      opts_(std::move(opts)),
+      preparer_(std::move(opts)),
       iter_(dms->tree_.NewIterator()),
-      initted_(false),
-      prepared_idx_(0),
-      prepared_count_(0),
-      prepared_for_(NOT_PREPARED),
       seeked_(false) {}
 
 Status DMSIterator::Init(ScanSpec* /*spec*/) {
@@ -221,9 +211,7 @@ Status DMSIterator::SeekToOrdinal(rowid_t row_idx) {
 
   bool exact; /* unused */
   iter_->SeekAtOrAfter(Slice(buf), &exact);
-  prepared_idx_ = row_idx;
-  prepared_count_ = 0;
-  prepared_for_ = NOT_PREPARED;
+  preparer_.Seek(row_idx);
   seeked_ = true;
   return Status::OK();
 }
@@ -242,17 +230,9 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
   // copy here is instead a single copy of the data, so is likely faster.
   CHECK(seeked_);
   DCHECK(initted_) << "must init";
-  rowid_t start_row = prepared_idx_ + prepared_count_;
+  rowid_t start_row = preparer_.cur_prepared_idx();
   rowid_t stop_row = start_row + nrows - 1;
-
-  if (updates_by_col_.empty()) {
-    updates_by_col_.resize(opts_.projection->num_columns());
-  }
-  for (UpdatesForColumn& ufc : updates_by_col_) {
-    ufc.clear();
-  }
-  deleted_.clear();
-  prepared_deltas_.clear();
+  preparer_.Start(flag);
 
   while (iter_->IsValid()) {
     Slice key_slice, val;
@@ -261,133 +241,38 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
     RETURN_NOT_OK(key.DecodeFrom(&key_slice));
     DCHECK_GE(key.row_idx(), start_row);
     if (key.row_idx() > stop_row) break;
-
-    if (!opts_.snap_to_include.IsCommitted(key.timestamp())) {
-      // The transaction which applied this update is not yet committed
-      // in this iterator's MVCC snapshot. Hence, skip it.
-      iter_->Next();
-      continue;
-    }
-
-    if (flag == PREPARE_FOR_APPLY) {
-      RowChangeListDecoder decoder((RowChangeList(val)));
-      decoder.InitNoSafetyChecks();
-      DCHECK(!decoder.is_reinsert()) << "Reinserts are not supported in the DeltaMemStore.";
-      if (decoder.is_delete()) {
-        deleted_.push_back(key.row_idx());
-      } else {
-        DCHECK(decoder.is_update());
-        while (decoder.HasNext()) {
-          RowChangeListDecoder::DecodedUpdate dec;
-          RETURN_NOT_OK(decoder.DecodeNext(&dec));
-          int col_idx;
-          const void* col_val;
-          RETURN_NOT_OK(dec.Validate(*opts_.projection, &col_idx, &col_val));
-          if (col_idx == -1) {
-            // This column isn't being projected.
-            continue;
-          }
-          int col_size = opts_.projection->column(col_idx).type_info()->size();
-
-          // If we already have an earlier update for the same column, we can
-          // just overwrite that one.
-          if (updates_by_col_[col_idx].empty() ||
-              updates_by_col_[col_idx].back().row_id != key.row_idx()) {
-            updates_by_col_[col_idx].emplace_back();
-          }
-
-          ColumnUpdate& cu = updates_by_col_[col_idx].back();
-          cu.row_id = key.row_idx();
-          if (col_val == nullptr) {
-            cu.new_val_ptr = nullptr;
-          } else {
-            memcpy(cu.new_val_buf, col_val, col_size);
-            // NOTE: we're constructing a pointer here to an element inside the deque.
-            // This is safe because deques never invalidate pointers to their elements.
-            cu.new_val_ptr = cu.new_val_buf;
-          }
-        }
-      }
-    } else {
-      DCHECK_EQ(flag, PREPARE_FOR_COLLECT);
-      PreparedDelta d;
-      d.key = key;
-      d.val = val;
-      prepared_deltas_.push_back(d);
-    }
-
+    RETURN_NOT_OK(preparer_.AddDelta(key, val));
     iter_->Next();
   }
-  prepared_idx_ = start_row;
-  prepared_count_ = nrows;
-  prepared_for_ = flag == PREPARE_FOR_APPLY ? PREPARED_FOR_APPLY : PREPARED_FOR_COLLECT;
+  preparer_.Finish(nrows);
   return Status::OK();
 }
 
-Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(prepared_count_, dst->nrows());
-
-  const ColumnSchema* col_schema = &opts_.projection->column(col_to_apply);
-  for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
-    int32_t idx_in_block = cu.row_id - prepared_idx_;
-    DCHECK_GE(idx_in_block, 0);
-    SimpleConstCell src(col_schema, cu.new_val_ptr);
-    ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
-    RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
-  }
-
-  return Status::OK();
+Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+  return preparer_.ApplyUpdates(col_to_apply, dst);
 }
 
-
-Status DMSIterator::ApplyDeletes(SelectionVector *sel_vec) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(prepared_count_, sel_vec->nrows());
-
-  for (auto& row_id : deleted_) {
-    uint32_t idx_in_block = row_id - prepared_idx_;
-    sel_vec->SetRowUnselected(idx_in_block);
-  }
-
-  return Status::OK();
+Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
+  return preparer_.ApplyDeletes(sel_vec);
 }
 
 
-Status DMSIterator::CollectMutations(vector<Mutation *> *dst, Arena *arena) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
-  for (const PreparedDelta& src : prepared_deltas_) {
-    DeltaKey key = src.key;;
-    RowChangeList changelist(src.val);
-    uint32_t rel_idx = key.row_idx() - prepared_idx_;
-
-    Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), changelist);
-    mutation->PrependToList(&dst->at(rel_idx));
-  }
-  return Status::OK();
+Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
+  return preparer_.CollectMutations(dst, arena);
 }
 
 Status DMSIterator::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& col_ids,
                                                     vector<DeltaKeyAndUpdate>* out,
                                                     Arena* arena) {
-  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS" << GetStackTrace();
-  return Status::InvalidArgument("FilterColumsAndAppend() is not supported by DMSIterator");
+  return preparer_.FilterColumnIdsAndCollectDeltas(col_ids, out, arena);
 }
 
 bool DMSIterator::HasNext() {
   return iter_->IsValid();
 }
 
-bool DMSIterator::MayHaveDeltas() {
-  if (!deleted_.empty()) {
-    return true;
-  }
-  for (auto& col: updates_by_col_) {
-    if (!col.empty()) {
-      return true;
-    }
-  }
-  return false;
+bool DMSIterator::MayHaveDeltas() const {
+  return preparer_.MayHaveDeltas();
 }
 
 string DMSIterator::ToString() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 253a2c9..66e8c62 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -19,7 +19,6 @@
 
 #include <cstddef>
 #include <cstdint>
-#include <deque>
 #include <memory>
 #include <string>
 #include <vector>
@@ -33,13 +32,10 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/tablet/concurrent_btree.h"
-#include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
-#include "kudu/tablet/rowset.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/memory/arena.h"
-#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -55,16 +51,17 @@ struct ColumnId;
 
 namespace consensus {
 class OpId;
-}
+} // namespace consensus
 
 namespace fs {
 struct IOContext;
-}
+} // namespace fs
 
 namespace tablet {
 
 class DeltaFileWriter;
 class Mutation;
+struct RowIteratorOptions;
 
 struct DMSTreeTraits : public btree::BTreeTraits {
   typedef ThreadSafeMemoryTrackingArena ArenaType;
@@ -200,27 +197,27 @@ class DeltaMemStore : public DeltaStore,
 // functions.
 class DMSIterator : public DeltaIterator {
  public:
-  Status Init(ScanSpec *spec) OVERRIDE;
+  Status Init(ScanSpec* spec) override;
 
-  Status SeekToOrdinal(rowid_t row_idx) OVERRIDE;
+  Status SeekToOrdinal(rowid_t row_idx) override;
 
-  Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
 
-  Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
 
-  Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                          std::vector<DeltaKeyAndUpdate>* out,
-                                         Arena* arena) OVERRIDE;
+                                         Arena* arena) override;
 
-  std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
-  virtual bool HasNext() OVERRIDE;
+  bool HasNext() override;
 
-  bool MayHaveDeltas() override;
+  bool MayHaveDeltas() const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(DMSIterator);
@@ -237,47 +234,14 @@ class DMSIterator : public DeltaIterator {
 
   const std::shared_ptr<const DeltaMemStore> dms_;
 
-  const RowIteratorOptions opts_;
+  DeltaPreparer preparer_;
 
   gscoped_ptr<DeltaMemStore::DMSTreeIter> iter_;
 
   bool initted_;
 
-  // The index at which the last PrepareBatch() call was made
-  rowid_t prepared_idx_;
-
-  // The number of rows for which the last PrepareBatch() call was made
-  uint32_t prepared_count_;
-
-  // Whether there are prepared blocks built through PrepareBatch().
-  enum PreparedFor {
-    NOT_PREPARED,
-    PREPARED_FOR_APPLY,
-    PREPARED_FOR_COLLECT
-  };
-  PreparedFor prepared_for_;
-
   // True if SeekToOrdinal() been called at least once.
   bool seeked_;
-
-  // State when prepared_for_ == PREPARED_FOR_APPLY
-  // ------------------------------------------------------------
-  struct ColumnUpdate {
-    rowid_t row_id;
-    void* new_val_ptr;
-    uint8_t new_val_buf[16];
-  };
-  typedef std::deque<ColumnUpdate> UpdatesForColumn;
-  std::vector<UpdatesForColumn> updates_by_col_;
-  std::deque<rowid_t> deleted_;
-
-  // State when prepared_for_ == PREPARED_FOR_COLLECT
-  // ------------------------------------------------------------
-  struct PreparedDelta {
-    DeltaKey key;
-    Slice val;
-  };
-  std::deque<PreparedDelta> prepared_deltas_;
 };
 
 } // namespace tablet