You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/11 02:54:33 UTC

[kudu] branch master updated (a0100ee -> 054084f)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from a0100ee  [mini-sentry] increase timeout for Sentry startup
     new 84e1e8d  tool: fixes for kudu local_replica dump rowset
     new 054084f  experiments: merge iterator optimization tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/experiments/merge-test.py          | 519 ++++++++++++++++++++++++++++
 src/kudu/tools/kudu-tool-test.cc            | 133 +++++--
 src/kudu/tools/tool_action_local_replica.cc |  87 ++++-
 3 files changed, 682 insertions(+), 57 deletions(-)
 create mode 100755 src/kudu/experiments/merge-test.py


[kudu] 02/02: experiments: merge iterator optimization tests

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 054084fb69e2e55ad8ddd4b858ad5d4902db2d37
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Feb 26 00:56:42 2019 -0800

    experiments: merge iterator optimization tests
    
    Here's a brief exploration into various MergeIterator algorithms, prototyped
    in Python. Only after I was done did I see that there was an existing
    experiment on this same subject in C++ (see merge-test.cc). It's not all
    wasted work though; that experiment didn't include the new "hot/cold" heap
    algorithms, nor did it account for all MergeIterator quirks such as paged
    blocks and lower/upper bounds.
    
    Below are some timing results on a big el7 machine. The "real" input was a
    representative (i.e. mostly compacted) 40GB tablet:
    - NaiveMergeIterator, half-overlapping: 44.7854778767s Counter({'cmp': 25291510, 'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, half-overlapping: 11.020619154s Counter({'cmp': 10266988, 'peak_blocks_in_mem': 3})
    - DoubleHeapMergeIterator, half-overlapping: 3.72211503983s Counter({'cmp': 1178497, 'peak_blocks_in_mem': 3})
    - TripleHeapMergeIterator, half-overlapping: 3.52963089943s Counter({'cmp': 1071682, 'peak_blocks_in_mem': 3})
    - NaiveMergeIterator, non-overlapping: 44.3896560669s Counter({'cmp': 25958482, 'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, non-overlapping: 10.9636461735s Counter({'cmp': 10598336, 'peak_blocks_in_mem': 1})
    - DoubleHeapMergeIterator, non-overlapping: 2.80402898788s Counter({'cmp': 4021, 'peak_blocks_in_mem': 1})
    - TripleHeapMergeIterator, non-overlapping: 2.83524298668s Counter({'cmp': 4021, 'peak_blocks_in_mem': 1})
    - NaiveMergeIterator, overlapping: 80.1467709541s Counter({'cmp': 47662665, 'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, overlapping: 9.61102318764s Counter({'cmp': 8554237, 'peak_blocks_in_mem': 100})
    - DoubleHeapMergeIterator, overlapping: 9.68881893158s Counter({'cmp': 8553345, 'peak_blocks_in_mem': 100})
    - TripleHeapMergeIterator, overlapping: 9.55243206024s Counter({'cmp': 8563292, 'peak_blocks_in_mem': 100})
    - NaiveMergeIterator, real: 1099763.37405s Counter({'cmp': 578660759971, 'peak_blocks_in_mem': 1294})
    - SingleHeapMergeIterator, real: 30513.3831122s Counter({'cmp': 30785961774, 'peak_blocks_in_mem': 5})
    - DoubleHeapMergeIterator, real: 7987.11197996s Counter({'cmp': 4173739455, 'peak_blocks_in_mem': 15})
    - TripleHeapMergeIterator, real: 7155.59520698s Counter({'cmp': 2784969619, 'peak_blocks_in_mem': 5})
    
    Change-Id: I6ae1d2f9e4f41337f475146c648cbab122395f83
    Reviewed-on: http://gerrit.cloudera.org:8080/12587
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 src/kudu/experiments/merge-test.py | 519 +++++++++++++++++++++++++++++++++++++
 1 file changed, 519 insertions(+)

diff --git a/src/kudu/experiments/merge-test.py b/src/kudu/experiments/merge-test.py
new file mode 100755
index 0000000..feddee3
--- /dev/null
+++ b/src/kudu/experiments/merge-test.py
@@ -0,0 +1,519 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import Counter
+from functools import total_ordering
+import glob
+import gzip
+import heapq
+import io
+import logging
+import random
+import time
+import unittest
+
+# Adjustable experiment parameters.
+BLOCK_SIZE = 1000
+MAX_ITEM = 1000000
+NUM_ITERATORS = 100
+MIN_ITEMS_PER_ITERATOR = 1
+MAX_ITEMS_PER_ITERATOR = 10000
+
+class MyHeap(object):
+  """ Heap with custom key comparator. See https://stackoverflow.com/a/8875823. """
+  def __init__(self, initial=None, key=lambda x:x):
+    self.key = key
+    if initial:
+      self._data = [(key(item), item) for item in initial]
+      heapq.heapify(self._data)
+    else:
+      self._data = []
+
+  def push(self, item):
+    heapq.heappush(self._data, (self.key(item), item))
+
+  def pop(self):
+    return heapq.heappop(self._data)[1]
+
+  def top(self):
+    return self._data[0][1]
+
+  def __len__(self):
+    return len(self._data)
+
+  def __str__(self):
+    return ', '.join(str(e[0]) for e in self._data)
+
+class BlockIterator(object):
+  """ Iterator of generic items. Returns items on a block basis. """
+  next_block_idx = 0
+
+  def __init__(self, items):
+    assert len(items) > 0
+    self.items = items
+    self.lower_bound = self.items[0]
+    self.upper_bound = self.items[-1]
+
+  def next_block(self):
+    """ Return an array containing the next block of items """
+    block = self.items[self.next_block_idx:self.next_block_idx + BLOCK_SIZE]
+    assert len(block) > 0
+    self.next_block_idx += len(block)
+    return block
+
+  def has_next(self):
+    return len(self.items) > 0 and self.next_block_idx < len(self.items)
+
+  def __str__(self):
+    return ", ".join(str(i) for i in self.items[self.next_block_idx:])
+
+class FileBlockIterator(object):
+  """
+  Iterator of generic items loaded from an (optionally gzipped) file. Returns
+  items on a block basis.
+
+  When constructed for a file with name 'foo', expects to find a file with name
+  'foo.firstlast' containing two lines: the first and last items in 'foo'.
+  """
+
+  def __init__(self, filename, stats):
+    self.stats = stats
+    self.filename = filename
+
+    if self.filename.endswith(".gz"):
+      self.fh = gzip.open(self.filename, "rb")
+    else:
+      self.fh = open(self.filename, "rb")
+    self.br = io.BufferedReader(self.fh)
+
+    with open(self.filename + ".firstlast", "rb") as flfh:
+      l = flfh.readline()[:-1]
+      assert len(l) > 0
+      self.lower_bound = ComparisonCountingObject(l, self.stats)
+      l = flfh.readline()[:-1]
+      assert len(l) > 0
+      self.upper_bound = ComparisonCountingObject(l, self.stats)
+
+    self.next_line = self.br.readline()
+
+  def next_block(self):
+    """ Return an array containing the next block of items """
+    assert self.has_next()
+    block = []
+    while True:
+      l = self.next()
+      block.append(ComparisonCountingObject(l, self.stats))
+      if not self.has_next():
+        # EOF
+        break
+      if len(block) == BLOCK_SIZE:
+        break
+    return block
+
+  def next(self):
+    assert self.has_next()
+    l = self.next_line
+    self.next_line = self.br.readline()
+    return l[:-1]
+
+  def has_next(self):
+    return len(self.next_line) > 0
+
+  def __str__(self):
+    return self.filename
+
+class PagingBlockIterator(object):
+  """
+  Iterator-like object that pages an entire block of items into memory and
+  provides finer-grained access to the items.
+  """
+
+  def __init__(self, block_iter, stats):
+    """
+    block_iter: BlockIterator
+    stats: collections.Counter for keeping track of perf stats
+    """
+    self.block_iter = block_iter
+    self.stats = stats
+    self.paged_block = None
+    self.paged_block_idx = -1
+    self.paged_one_block = False
+
+  def page_next_block(self):
+    # Block must be fully consumed before paging the next one.
+    assert not self.paged_block or len(self.paged_block) == self.paged_block_idx + 1
+
+    if self.block_iter.has_next():
+      next_block = self.block_iter.next_block()
+      assert len(next_block) > 0
+
+      if self.paged_block is None:
+        self.stats['blocks_in_mem'] += 1
+        self.stats['peak_blocks_in_mem'] = max(self.stats['peak_blocks_in_mem'],
+                                               self.stats['blocks_in_mem'])
+
+      self.paged_block = next_block
+      self.paged_block_idx = 0
+      self.paged_one_block = True
+    else:
+      if self.paged_block is not None:
+        self.stats['blocks_in_mem'] -= 1
+      self.paged_block = None
+
+  def advance(self):
+    assert self.paged_block
+
+    item = self.paged_block[self.paged_block_idx]
+    if len(self.paged_block) == self.paged_block_idx + 1:
+      self.page_next_block()
+      return True
+    else:
+      self.paged_block_idx += 1
+      return False
+
+  def has_next(self):
+    return self.paged_block or self.block_iter.has_next()
+
+  def has_ever_paged(self):
+    return self.paged_one_block
+
+  def min(self):
+    return (self.paged_block[0]
+            if self.has_ever_paged() else self.block_iter.lower_bound)
+
+  def max(self):
+    return (self.paged_block[-1]
+            if self.has_ever_paged() else self.block_iter.upper_bound)
+
+  def cur(self):
+    return (self.paged_block[self.paged_block_idx]
+            if self.has_ever_paged() else self.block_iter.lower_bound)
+
+  def __str__(self):
+    s = ""
+    if self.paged_block:
+      s += ", ".join(str(i) for i in self.paged_block[self.paged_block_idx:])
+    suffix = str(self.block_iter)
+    if suffix:
+      s += ", "
+      s += suffix
+    return s
+
+def remove_dead_iters(iters):
+  """
+  Convenience method to filter out any fully-consumed iterators.
+  """
+  live_iters = []
+  for i in iters:
+    if i.has_next():
+      live_iters.append(i)
+  return live_iters
+
+class NaiveMergeIterator(object):
+  """
+  Simple merge iterator that uses no optimizations whatsoever. Every call to
+  next() iterates over all live iterators and returns the smallest item.
+  """
+  def __init__(self, iters):
+    self.iters = remove_dead_iters(iters)
+
+    # This iterator ignores bounds, so we must page in all blocks up front.
+    [i.page_next_block() for i in self.iters]
+
+  def next(self):
+    smallest_iter = None
+    for i in self.iters:
+      if not smallest_iter or i.cur() < smallest_iter.cur():
+        smallest_iter = i
+    assert smallest_iter
+    item = smallest_iter.cur()
+    smallest_iter.advance()
+    self.iters = remove_dead_iters(self.iters)
+    return item
+
+  def has_next(self):
+    return len(self.iters) > 0
+
+class SingleHeapMergeIterator(object):
+  """
+  More sophisticated merge iterator that uses a heap to optimize next() calls.
+
+  Initially, the underlying iterators' bounds are used to establish heap order.
+  When an iterator is next (i.e. when its first item is the global minimum), the
+  first block is paged in and the iterator is put back in the heap. In the
+  steady state, blocks are paged in as iterators are advanced and the heap is
+  reordered with every call to next().
+  """
+  def __init__(self, iters):
+    self.iters = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+
+  def next(self):
+    smallest_iter = None
+    while True:
+      smallest_iter = self.iters.pop()
+      if not smallest_iter.has_ever_paged():
+        # Page in the first block and retry.
+        smallest_iter.page_next_block()
+        self.iters.push(smallest_iter)
+        continue
+      break
+
+    item = smallest_iter.cur()
+    smallest_iter.advance()
+    if smallest_iter.has_next():
+      self.iters.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.iters) > 0
+
+class DoubleHeapMergeIterator(object):
+  """
+  Hot/cold heap-based merge iterator.
+
+  This variant assigns iterators to two heaps. The "hot" heap includes all
+  iterators currently needed to perform the merge, while the "cold" heap
+  contains the rest.
+
+  While algorithmically equivalent to the basic heap-based merge iterator, the
+  amount of heap reordering is typically less due to the reduced size of the
+  working set (i.e. the size of the hot heap). This is especially true when the
+  input iterators do not overlap, as that allows the algorithm to maximize the
+  size of the cold heap.
+  """
+  def __init__(self, iters):
+    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+    self.hot = MyHeap([], key=lambda x : x.cur())
+    self._refill_hot()
+
+  def _refill_hot(self):
+    while len(self.cold) > 0 and (len(self.hot) == 0 or
+                                  self.hot.top().max() >= self.cold.top().cur()):
+      warmest = self.cold.pop()
+      if not warmest.has_ever_paged():
+        # Page in the first block and retry.
+        warmest.page_next_block()
+        self.cold.push(warmest)
+        continue
+      self.hot.push(warmest)
+
+  def next(self):
+    smallest_iter = self.hot.pop()
+
+    item = smallest_iter.cur()
+    paged_new_block = smallest_iter.advance()
+    is_dead = not smallest_iter.has_next()
+
+    if is_dead:
+      self._refill_hot()
+    elif paged_new_block:
+      if len(self.hot) > 0 and self.hot.top().max() < smallest_iter.cur():
+        # 'smallest_iter' is no longer in the merge window.
+        self.cold.push(smallest_iter)
+      else:
+        self.hot.push(smallest_iter)
+      self._refill_hot()
+    else:
+      self.hot.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.hot) > 0 or len(self.cold) > 0
+
+class TripleHeapMergeIterator(object):
+  """
+  Advanced hot/cold heap-based merge iterator.
+
+  Like DoubleHeapMergeIterator but uses an additional heap (of the result of
+  max() in each iterator found in "hot") to more accurately track the top end
+  of the merge window. The result is an even smaller hot heap.
+  """
+  def __init__(self, iters):
+    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+    self.hot = MyHeap([], key=lambda x : x.cur())
+    self.hotmaxes = MyHeap([])
+    self._refill_hot()
+
+  def _refill_hot(self):
+    while len(self.cold) > 0 and (len(self.hotmaxes) == 0 or
+                                  self.hotmaxes.top() >= self.cold.top().cur()):
+      warmest = self.cold.pop()
+      if not warmest.has_ever_paged():
+        # Page in the first block and retry.
+        warmest.page_next_block()
+        self.cold.push(warmest)
+        continue
+      self.hot.push(warmest)
+      self.hotmaxes.push(warmest.max())
+
+  def next(self):
+    smallest_iter = self.hot.pop()
+    # Defer pop of hotmaxes; it only needs to happen if we've finished a block.
+
+    item = smallest_iter.cur()
+    paged_new_block = smallest_iter.advance()
+    is_dead = not smallest_iter.has_next()
+
+    if is_dead:
+      self.hotmaxes.pop()
+      self._refill_hot()
+    elif paged_new_block:
+      self.hotmaxes.pop()
+      if len(self.hotmaxes) > 0 and self.hotmaxes.top() < smallest_iter.cur():
+        # 'smallest_iter' is no longer in the merge window.
+        self.cold.push(smallest_iter)
+      else:
+        self.hot.push(smallest_iter)
+        self.hotmaxes.push(smallest_iter.max())
+      self._refill_hot()
+    else:
+      self.hot.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.hot) > 0 or len(self.cold) > 0
+
+@total_ordering
+class ComparisonCountingObject(object):
+  def __init__(self, val, stats):
+    self.val = val
+    self.stats = stats
+
+  def __eq__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingObject)
+    self.stats['cmp'] += 1
+    return self.val.__eq__(rhs.val)
+
+  def __lt__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingObject)
+    self.stats['cmp'] += 1
+    return self.val.__lt__(rhs.val)
+
+  def __str__(self):
+    return str(self.val)
+
+class ComparisonCountingInt(object):
+  def __init__(self, val, stats):
+    self.val = val
+    self.stats = stats
+
+  def __cmp__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingInt)
+    self.stats['cmp'] += 1
+    return self.val.__cmp__(rhs.val)
+
+  def __str__(self):
+    return str(self.val)
+
+class TestMerges(unittest.TestCase):
+  maxDiff = 1e9
+  def generate_input(self, pattern):
+    lists_of_items = []
+    expected_items = []
+    last_item = 0
+    for i in xrange(NUM_ITERATORS):
+      if pattern == 'overlapping':
+        min_item = 0
+      elif pattern == 'non-overlapping':
+        min_item = last_item
+      elif pattern == 'half-overlapping':
+        min_item = last_item - MAX_ITEM / 2
+      num_items = random.randint(MIN_ITEMS_PER_ITERATOR, MAX_ITEMS_PER_ITERATOR)
+      items = random.sample(xrange(min_item, MAX_ITEM + min_item), num_items)
+      items.sort()
+      lists_of_items.append(items)
+      expected_items.extend(items)
+      last_item = items[-1]
+    expected_items.sort()
+    return (lists_of_items, expected_items)
+
+  def run_merge(self, merge_type, pattern, lists_of_items, list_of_files, expected_results):
+    stats = Counter()
+    start = time.time()
+    if lists_of_items:
+      iters = [PagingBlockIterator(BlockIterator([ComparisonCountingInt(i, stats) for i in l]),
+                                   stats) for l in lists_of_items]
+    else:
+      assert list_of_files
+      iters = [PagingBlockIterator(FileBlockIterator(f, stats),
+                                   stats) for f in list_of_files]
+    logging.info("Starting merge with {}".format(merge_type.__name__))
+    merge_iter = merge_type(iters)
+    logging.info("Initialized iterator")
+    results = []
+    num_results = 0
+    t1 = start
+    while merge_iter.has_next():
+      n = merge_iter.next().val
+      num_results += 1
+      if expected_results:
+        results.append(n)
+
+      t2 = time.time()
+      if t2 - t1 > 10:
+        logging.info("Merged {} elements ({} eps) {}".format(
+          num_results,
+          num_results / (t2 - start),
+          repr(stats)))
+        t1 = t2
+    elapsed = time.time() - start
+    logging.info("Merged {} elements".format(num_results))
+    if expected_results:
+      self.assertEqual(expected_results, results)
+    logging.info("{} with {} input: {}s {}".format(
+      merge_type.__name__,
+      pattern,
+      elapsed,
+      repr(stats)))
+
+  def _do_test(self, pattern):
+    lists_of_items, expected_items = self.generate_input(pattern)
+
+    # Commented out because it's too slow with the current parameters.
+    #
+    # self.run_merge(NaiveMergeIterator, pattern, lists_of_items, None, expected_items)
+    self.run_merge(SingleHeapMergeIterator, pattern, lists_of_items, None, expected_items)
+    self.run_merge(DoubleHeapMergeIterator, pattern, lists_of_items, None, expected_items)
+    self.run_merge(TripleHeapMergeIterator, pattern, lists_of_items, None, expected_items)
+
+  def test_overlapping_input(self):
+    self._do_test('overlapping')
+
+  def test_nonoverlapping_input(self):
+    self._do_test('non-overlapping')
+
+  def test_half_overlapping_input(self):
+    self._do_test('half-overlapping')
+
+  def test_real_input(self):
+    list_of_files = glob.glob("rowset_keys_*.gz")
+    if len(list_of_files) == 0:
+      self.skipTest("No real input found")
+
+    # Commented out because it's too slow with the current parameters.
+    #
+    # self.run_merge(NaiveMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(SingleHeapMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(DoubleHeapMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(TripleHeapMergeIterator, "real", None, list_of_files, None)
+
+if __name__ == "__main__":
+  logging.basicConfig(level=logging.INFO,
+                      format='%(asctime)s %(levelname)s: %(message)s')
+  unittest.main()


[kudu] 01/02: tool: fixes for kudu local_replica dump rowset

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 84e1e8de3da32d3a4c20930272f237e0c35aa735
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Mar 12 16:33:10 2019 -0700

    tool: fixes for kudu local_replica dump rowset
    
    This patch makes several adjustments to 'kudu local_replica dump rowset':
    - The existing 'metadata_only' and 'nrows' controls were being ignored.
    - The existing 'rowset_index' control wasn't working properly.
    - I changed the "what to dump" contols to 'dump_all_columns' and
      'dump_metadata'. When 'dump_all_columns' is false, the row keys are dumped
      in a format that's comparable and ASCII-compatible (currently hex).
    
    This functionality helped me dump a tablet's keys (grouped by rowset), which
    I then used for a series of MergeIterator experiments.
    
    Change-Id: Ib50ab4e7b2aa0fec60ce0718d16823945a05cb7f
    Reviewed-on: http://gerrit.cloudera.org:8080/12976
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc            | 133 ++++++++++++++++++++--------
 src/kudu/tools/tool_action_local_replica.cc |  87 ++++++++++++++----
 2 files changed, 163 insertions(+), 57 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 50ff0d6..2c9a002 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -73,6 +73,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
@@ -1725,13 +1726,15 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
   ASSERT_OK(harness.Open());
   LocalTabletWriter writer(harness.tablet().get(), &kSchema);
   KuduPartialRow row(&kSchemaWithIds);
-  for (int i = 0; i< 10; i++) {
-    ASSERT_OK(row.SetInt32(0, i));
-    ASSERT_OK(row.SetInt32(1, i*10));
-    ASSERT_OK(row.SetStringCopy(2, "HelloWorld"));
-    writer.Insert(row);
+  for (int num_rowsets = 0; num_rowsets < 3; num_rowsets++) {
+    for (int i = 0; i < 10; i++) {
+      ASSERT_OK(row.SetInt32(0, num_rowsets * 10 + i));
+      ASSERT_OK(row.SetInt32(1, num_rowsets * 10 * 10 + i));
+      ASSERT_OK(row.SetStringCopy(2, "HelloWorld"));
+      writer.Insert(row);
+    }
+    harness.tablet()->Flush();
   }
-  harness.tablet()->Flush();
   harness.tablet()->Shutdown();
   string fs_paths = "--fs_wal_dir=" + kTestDir + " "
       "--fs_data_dirs=" + kTestDir;
@@ -1785,6 +1788,25 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
     string expected = "Could not find rowset " + SimpleItoa(kRowId) +
         " in tablet id " + kTestTablet;
     ASSERT_STR_CONTAINS(stderr, expected);
+
+    NO_FATALS(RunActionStdoutString(
+        Substitute("local_replica dump rowset --nodump_all_columns "
+                   "--nodump_metadata --nrows=15 $0 $1",
+                   kTestTablet, fs_paths), &stdout));
+
+    SCOPED_TRACE(stdout);
+    ASSERT_STR_CONTAINS(stdout, "Dumping rowset 0");
+    ASSERT_STR_CONTAINS(stdout, "Dumping rowset 1");
+    ASSERT_STR_CONTAINS(stdout, "Dumping rowset 2");
+    ASSERT_STR_NOT_CONTAINS(stdout, "RowSet metadata");
+    for (int row_idx = 0; row_idx < 30; row_idx++) {
+      string row_key = StringPrintf("800000%02x", row_idx);
+      if (row_idx < 15) {
+        ASSERT_STR_CONTAINS(stdout, row_key);
+      } else {
+        ASSERT_STR_NOT_CONTAINS(stdout, row_key);
+      }
+    }
   }
   {
     TabletMetadata* meta = harness.tablet()->metadata();
@@ -1834,22 +1856,38 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
  KuduTableTestId | ffffffffffffffffffffffffffffffff | 0         | BLOOM            | 4.1K
  KuduTableTestId | ffffffffffffffffffffffffffffffff | 0         | PK               | 0B
  KuduTableTestId | ffffffffffffffffffffffffffffffff | 0         | *                | 4.6K
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c10 (key)        | 164B
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c11 (int_val)    | 113B
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c12 (string_val) | 138B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | c10 (key)        | 184B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | c11 (int_val)    | 129B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | c12 (string_val) | 158B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | REDO             | 0B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | UNDO             | 181B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | BLOOM            | 4.1K
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | PK               | 0B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 1         | *                | 4.7K
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | c10 (key)        | 184B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | c11 (int_val)    | 129B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | c12 (string_val) | 158B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | REDO             | 0B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | UNDO             | 181B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | BLOOM            | 4.1K
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | PK               | 0B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | 2         | *                | 4.7K
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c10 (key)        | 543B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c11 (int_val)    | 364B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | c12 (string_val) | 472B
  KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | REDO             | 0B
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | UNDO             | 169B
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | BLOOM            | 4.1K
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | UNDO             | 492B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | BLOOM            | 12.2K
  KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | PK               | 0B
- KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | *                | 4.6K
- KuduTableTestId | *                                | *         | c10 (key)        | 164B
- KuduTableTestId | *                                | *         | c11 (int_val)    | 113B
- KuduTableTestId | *                                | *         | c12 (string_val) | 138B
+ KuduTableTestId | ffffffffffffffffffffffffffffffff | *         | *                | 14.1K
+ KuduTableTestId | *                                | *         | c10 (key)        | 543B
+ KuduTableTestId | *                                | *         | c11 (int_val)    | 364B
+ KuduTableTestId | *                                | *         | c12 (string_val) | 472B
  KuduTableTestId | *                                | *         | REDO             | 0B
- KuduTableTestId | *                                | *         | UNDO             | 169B
- KuduTableTestId | *                                | *         | BLOOM            | 4.1K
+ KuduTableTestId | *                                | *         | UNDO             | 492B
+ KuduTableTestId | *                                | *         | BLOOM            | 12.2K
  KuduTableTestId | *                                | *         | PK               | 0B
- KuduTableTestId | *                                | *         | *                | 4.6K
+ KuduTableTestId | *                                | *         | *                | 14.1K
 )";
     // Preprocess stdout and our expected table so that we are less
     // sensitive to small variations in encodings, id assignment, etc.
@@ -1891,14 +1929,19 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
 
   // Test 'kudu fs list' rowset group.
   {
-    string stdout;
-    NO_FATALS(RunActionStdoutString(
+    vector<string> stdout;
+    NO_FATALS(RunActionStdoutLines(
           Substitute("fs list $0 --columns=table,tablet-id,rowset-id --format=csv",
                      fs_paths),
           &stdout));
 
     SCOPED_TRACE(stdout);
-    EXPECT_EQ(stdout, "KuduTableTest,ffffffffffffffffffffffffffffffff,0");
+    ASSERT_EQ(3, stdout.size());
+    for (int rowset_idx = 0; rowset_idx < 3; rowset_idx++) {
+      EXPECT_EQ(stdout[rowset_idx],
+                Substitute("KuduTableTest,ffffffffffffffffffffffffffffffff,$0",
+                           rowset_idx));
+    }
   }
   // Test 'kudu fs list' block group.
   {
@@ -1911,12 +1954,19 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
           &stdout));
 
     SCOPED_TRACE(stdout);
-    ASSERT_EQ(5, stdout.size());
-    EXPECT_EQ(stdout[0], Substitute("KuduTableTest,$0,0,column,key", kTestTablet));
-    EXPECT_EQ(stdout[1], Substitute("KuduTableTest,$0,0,column,int_val", kTestTablet));
-    EXPECT_EQ(stdout[2], Substitute("KuduTableTest,$0,0,column,string_val", kTestTablet));
-    EXPECT_EQ(stdout[3], Substitute("KuduTableTest,$0,0,undo,", kTestTablet));
-    EXPECT_EQ(stdout[4], Substitute("KuduTableTest,$0,0,bloom,", kTestTablet));
+    ASSERT_EQ(15, stdout.size());
+    for (int rowset_idx = 0; rowset_idx < 3; rowset_idx++) {
+      EXPECT_EQ(stdout[rowset_idx * 5 + 0],
+                Substitute("KuduTableTest,$0,$1,column,key", kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 1],
+                Substitute("KuduTableTest,$0,$1,column,int_val", kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 2],
+                Substitute("KuduTableTest,$0,$1,column,string_val", kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 3],
+                Substitute("KuduTableTest,$0,$1,undo,", kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 4],
+                Substitute("KuduTableTest,$0,$1,bloom,", kTestTablet, rowset_idx));
+    }
   }
 
   // Test 'kudu fs list' cfile group.
@@ -1931,17 +1981,24 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
           &stdout));
 
     SCOPED_TRACE(stdout);
-    ASSERT_EQ(5, stdout.size());
-    EXPECT_EQ(stdout[0],
-              Substitute("KuduTableTest,$0,0,column,key,BIT_SHUFFLE,10", kTestTablet));
-    EXPECT_EQ(stdout[1],
-              Substitute("KuduTableTest,$0,0,column,int_val,BIT_SHUFFLE,10", kTestTablet));
-    EXPECT_EQ(stdout[2],
-              Substitute("KuduTableTest,$0,0,column,string_val,DICT_ENCODING,10", kTestTablet));
-    EXPECT_EQ(stdout[3],
-              Substitute("KuduTableTest,$0,0,undo,,PLAIN_ENCODING,10", kTestTablet));
-    EXPECT_EQ(stdout[4],
-              Substitute("KuduTableTest,$0,0,bloom,,PLAIN_ENCODING,0", kTestTablet));
+    ASSERT_EQ(15, stdout.size());
+    for (int rowset_idx = 0; rowset_idx < 3; rowset_idx++) {
+      EXPECT_EQ(stdout[rowset_idx * 5 + 0],
+                Substitute("KuduTableTest,$0,$1,column,key,BIT_SHUFFLE,10",
+                           kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 1],
+                Substitute("KuduTableTest,$0,$1,column,int_val,BIT_SHUFFLE,10",
+                           kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 2],
+                Substitute("KuduTableTest,$0,$1,column,string_val,DICT_ENCODING,10",
+                           kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 3],
+                Substitute("KuduTableTest,$0,$1,undo,,PLAIN_ENCODING,10",
+                           kTestTablet, rowset_idx));
+      EXPECT_EQ(stdout[rowset_idx * 5 + 4],
+                Substitute("KuduTableTest,$0,$1,bloom,,PLAIN_ENCODING,0",
+                           kTestTablet, rowset_idx));
+    }
   }
 }
 
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 2173acb..038baed 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <iostream>
@@ -31,7 +32,9 @@
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/iterator.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.pb.h"
@@ -48,8 +51,10 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
@@ -60,6 +65,7 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/rowset_metadata.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
 #include "kudu/tablet/tablet_metadata.h"
@@ -70,16 +76,20 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 
-DEFINE_bool(dump_data, false,
-            "Dump the data for each column in the rowset.");
-DEFINE_bool(metadata_only, false,
-            "Only dump the block metadata when printing blocks.");
-DEFINE_int64(nrows, 0, "Number of rows to dump");
+DEFINE_bool(dump_all_columns, true,
+            "If true, dumped rows include all of the columns in the rowset. If "
+            "false, dumped rows include just the key columns (in a comparable format).");
+DEFINE_bool(dump_metadata, true,
+            "If true, dumps rowset metadata before dumping data. If false, "
+            "only dumps the data.");
+DEFINE_int64(nrows, -1, "Number of rows to dump. If negative, dumps all rows.");
 DEFINE_bool(list_detail, false,
             "Print partition info for the local replicas");
 DEFINE_int64(rowset_index, -1,
@@ -99,6 +109,7 @@ using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
+using fs::IOContext;
 using fs::ReadableBlock;
 using log::LogEntryPB;
 using log::LogEntryReader;
@@ -118,6 +129,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 using tablet::DiskRowSet;
+using tablet::RowIteratorOptions;
 using tablet::RowSetMetadata;
 using tablet::TabletMetadata;
 using tablet::TabletDataState;
@@ -620,31 +632,64 @@ Status ListLocalReplicas(const RunnerContext& context) {
   return Status::OK();
 }
 
-Status DumpRowSetInternal(const shared_ptr<RowSetMetadata>& rs_meta,
-                          int indent) {
+Status DumpRowSetInternal(const IOContext& ctx,
+                          const shared_ptr<RowSetMetadata>& rs_meta,
+                          int indent,
+                          int64_t* rows_left) {
   tablet::RowSetDataPB pb;
   rs_meta->ToProtobuf(&pb);
 
-  cout << Indent(indent) << "RowSet metadata: " << pb_util::SecureDebugString(pb)
-       << endl << endl;
+  if (FLAGS_dump_metadata) {
+    cout << Indent(indent) << "RowSet metadata: " << pb_util::SecureDebugString(pb)
+         << endl << endl;
+  }
 
   scoped_refptr<log::LogAnchorRegistry> log_reg(new log::LogAnchorRegistry());
   shared_ptr<DiskRowSet> rs;
   RETURN_NOT_OK(DiskRowSet::Open(rs_meta,
                                  log_reg.get(),
                                  tablet::TabletMemTrackers(),
-                                 nullptr,
+                                 &ctx,
                                  &rs));
   vector<string> lines;
-  RETURN_NOT_OK(rs->DebugDump(&lines));
-  for (const auto& l : lines) {
-    cout << l << endl;
+  if (FLAGS_dump_all_columns) {
+    RETURN_NOT_OK(rs->DebugDump(&lines));
+  } else {
+    Schema key_proj = rs_meta->tablet_schema().CreateKeyProjection();
+    RowIteratorOptions opts;
+    opts.projection = &key_proj;
+    opts.io_context = &ctx;
+    unique_ptr<RowwiseIterator> it;
+    RETURN_NOT_OK(rs->NewRowIterator(opts, &it));
+    RETURN_NOT_OK(it->Init(nullptr));
+
+    Arena arena(1024);
+    RowBlock block(&key_proj, 100, &arena);
+    faststring key;
+    while (it->HasNext()) {
+      RETURN_NOT_OK(it->NextBlock(&block));
+      for (int i = 0; i < block.nrows(); i++) {
+        key_proj.EncodeComparableKey(block.row(i), &key);
+        lines.emplace_back(strings::b2a_hex(key.ToString()));
+      }
+    }
+  }
+
+  // Respect 'rows_left' when dumping the output.
+  int64_t limit = *rows_left >= 0 ?
+                  std::min<int64_t>(*rows_left, lines.size()) : lines.size();
+  for (int i = 0; i < limit; i++) {
+    cout << lines[i] << endl;
   }
 
+  if (*rows_left >= 0) {
+    *rows_left -= limit;
+  }
   return Status::OK();
 }
 
 Status DumpRowSet(const RunnerContext& context) {
+  const int kIndent = 2;
   unique_ptr<FsManager> fs_manager;
   RETURN_NOT_OK(FsInit(&fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
@@ -657,11 +702,15 @@ Status DumpRowSet(const RunnerContext& context) {
     return Status::OK();
   }
 
+  IOContext ctx;
+  ctx.tablet_id = meta->tablet_id();
+  int64_t rows_left = FLAGS_nrows;
+
   // If rowset index is provided, only dump that rowset.
   if (FLAGS_rowset_index != -1) {
-    for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets())  {
+    for (const auto& rs_meta : meta->rowsets())  {
       if (rs_meta->id() == FLAGS_rowset_index) {
-        return Status::OK();
+        return DumpRowSetInternal(ctx, rs_meta, kIndent, &rows_left);
       }
     }
     return Status::InvalidArgument(
@@ -671,9 +720,9 @@ Status DumpRowSet(const RunnerContext& context) {
 
   // Rowset index not provided, dump all rowsets
   size_t idx = 0;
-  for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets())  {
+  for (const auto& rs_meta : meta->rowsets())  {
     cout << endl << "Dumping rowset " << idx++ << endl << kSeparatorLine;
-    RETURN_NOT_OK(DumpRowSetInternal(rs_meta, 2));
+    RETURN_NOT_OK(DumpRowSetInternal(ctx, rs_meta, kIndent, &rows_left));
   }
   return Status::OK();
 }
@@ -734,11 +783,11 @@ unique_ptr<Mode> BuildDumpMode() {
       ActionBuilder("rowset", &DumpRowSet)
       .Description("Dump the rowset contents of a local replica")
       .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
-      .AddOptionalParameter("dump_data")
+      .AddOptionalParameter("dump_all_columns")
+      .AddOptionalParameter("dump_metadata")
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("fs_metadata_dir")
       .AddOptionalParameter("fs_wal_dir")
-      .AddOptionalParameter("metadata_only")
       .AddOptionalParameter("nrows")
       .AddOptionalParameter("rowset_index")
       .Build();