Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2016/01/05 20:52:19 UTC

[19/50] [abbrv] hadoop git commit: [partial-ns] Import HDFSDB.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/filter_block_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/filter_block_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/filter_block_test.cc
new file mode 100644
index 0000000..8c4a474
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/filter_block_test.cc
@@ -0,0 +1,128 @@
+// Copyright (c) 2012 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/filter_block.h"
+
+#include "leveldb/filter_policy.h"
+#include "util/coding.h"
+#include "util/hash.h"
+#include "util/logging.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+namespace leveldb {
+
+// For testing: emit an array with one hash value per key
+class TestHashFilter : public FilterPolicy {
+ public:
+  virtual const char* Name() const {
+    return "TestHashFilter";
+  }
+
+  virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const {
+    for (int i = 0; i < n; i++) {
+      uint32_t h = Hash(keys[i].data(), keys[i].size(), 1);
+      PutFixed32(dst, h);
+    }
+  }
+
+  virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const {
+    uint32_t h = Hash(key.data(), key.size(), 1);
+    for (size_t i = 0; i + 4 <= filter.size(); i += 4) {
+      if (h == DecodeFixed32(filter.data() + i)) {
+        return true;
+      }
+    }
+    return false;
+  }
+};
+
+class FilterBlockTest {
+ public:
+  TestHashFilter policy_;
+};
+
+TEST(FilterBlockTest, EmptyBuilder) {
+  FilterBlockBuilder builder(&policy_);
+  Slice block = builder.Finish();
+  ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block));
+  FilterBlockReader reader(&policy_, block);
+  ASSERT_TRUE(reader.KeyMayMatch(0, "foo"));
+  ASSERT_TRUE(reader.KeyMayMatch(100000, "foo"));
+}
+
+TEST(FilterBlockTest, SingleChunk) {
+  FilterBlockBuilder builder(&policy_);
+  builder.StartBlock(100);
+  builder.AddKey("foo");
+  builder.AddKey("bar");
+  builder.AddKey("box");
+  builder.StartBlock(200);
+  builder.AddKey("box");
+  builder.StartBlock(300);
+  builder.AddKey("hello");
+  Slice block = builder.Finish();
+  FilterBlockReader reader(&policy_, block);
+  ASSERT_TRUE(reader.KeyMayMatch(100, "foo"));
+  ASSERT_TRUE(reader.KeyMayMatch(100, "bar"));
+  ASSERT_TRUE(reader.KeyMayMatch(100, "box"));
+  ASSERT_TRUE(reader.KeyMayMatch(100, "hello"));
+  ASSERT_TRUE(reader.KeyMayMatch(100, "foo"));
+  ASSERT_TRUE(! reader.KeyMayMatch(100, "missing"));
+  ASSERT_TRUE(! reader.KeyMayMatch(100, "other"));
+}
+
+TEST(FilterBlockTest, MultiChunk) {
+  FilterBlockBuilder builder(&policy_);
+
+  // First filter
+  builder.StartBlock(0);
+  builder.AddKey("foo");
+  builder.StartBlock(2000);
+  builder.AddKey("bar");
+
+  // Second filter
+  builder.StartBlock(3100);
+  builder.AddKey("box");
+
+  // Third filter is empty
+
+  // Last filter
+  builder.StartBlock(9000);
+  builder.AddKey("box");
+  builder.AddKey("hello");
+
+  Slice block = builder.Finish();
+  FilterBlockReader reader(&policy_, block);
+
+  // Check first filter
+  ASSERT_TRUE(reader.KeyMayMatch(0, "foo"));
+  ASSERT_TRUE(reader.KeyMayMatch(2000, "bar"));
+  ASSERT_TRUE(! reader.KeyMayMatch(0, "box"));
+  ASSERT_TRUE(! reader.KeyMayMatch(0, "hello"));
+
+  // Check second filter
+  ASSERT_TRUE(reader.KeyMayMatch(3100, "box"));
+  ASSERT_TRUE(! reader.KeyMayMatch(3100, "foo"));
+  ASSERT_TRUE(! reader.KeyMayMatch(3100, "bar"));
+  ASSERT_TRUE(! reader.KeyMayMatch(3100, "hello"));
+
+  // Check third filter (empty)
+  ASSERT_TRUE(! reader.KeyMayMatch(4100, "foo"));
+  ASSERT_TRUE(! reader.KeyMayMatch(4100, "bar"));
+  ASSERT_TRUE(! reader.KeyMayMatch(4100, "box"));
+  ASSERT_TRUE(! reader.KeyMayMatch(4100, "hello"));
+
+  // Check last filter
+  ASSERT_TRUE(reader.KeyMayMatch(9000, "box"));
+  ASSERT_TRUE(reader.KeyMayMatch(9000, "hello"));
+  ASSERT_TRUE(! reader.KeyMayMatch(9000, "foo"));
+  ASSERT_TRUE(! reader.KeyMayMatch(9000, "bar"));
+}
+
+}  // namespace leveldb
+
+int main(int argc, char** argv) {
+  return leveldb::test::RunAllTests();
+}
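
For readers unfamiliar with the FilterPolicy contract exercised by this test: CreateFilter() serializes a set of keys into an opaque filter string, and KeyMayMatch() may return false positives but must never return false negatives. A minimal self-contained sketch of the same hash-per-key scheme, using a stand-in hash in place of leveldb's util/hash.h (names here are illustrative, not the leveldb API):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Stand-in for util/hash.h: any deterministic 32-bit hash works here.
static uint32_t ToyHash(const char* data, size_t n, uint32_t seed) {
  uint32_t h = seed;
  for (size_t i = 0; i < n; i++) h = h * 31 + static_cast<uint8_t>(data[i]);
  return h;
}

// CreateFilter: emit one little-endian 32-bit hash per key.
static void CreateFilter(const std::vector<std::string>& keys, std::string* dst) {
  for (size_t i = 0; i < keys.size(); i++) {
    uint32_t h = ToyHash(keys[i].data(), keys[i].size(), 1);
    char buf[4];
    memcpy(buf, &h, 4);  // assumes a little-endian host, like PutFixed32
    dst->append(buf, 4);
  }
}

// KeyMayMatch: scan the stored hashes. False positives are allowed
// (two keys may hash alike); false negatives are not.
static bool KeyMayMatch(const std::string& key, const std::string& filter) {
  uint32_t h = ToyHash(key.data(), key.size(), 1);
  for (size_t i = 0; i + 4 <= filter.size(); i += 4) {
    uint32_t stored;
    memcpy(&stored, filter.data() + i, 4);
    if (stored == h) return true;
  }
  return false;
}

int main() {
  std::vector<std::string> keys;
  keys.push_back("foo");
  keys.push_back("bar");
  std::string filter;
  CreateFilter(keys, &filter);
  assert(KeyMayMatch("foo", filter));
  // No collision with this toy hash; in general a filter only
  // guarantees the absence of false negatives.
  assert(!KeyMayMatch("box", filter));
  return 0;
}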

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.cc
new file mode 100644
index 0000000..cda1dec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.cc
@@ -0,0 +1,145 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/format.h"
+
+#include "leveldb/env.h"
+#include "port/port.h"
+#include "table/block.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+
+namespace leveldb {
+
+void BlockHandle::EncodeTo(std::string* dst) const {
+  // Sanity check that all fields have been set
+  assert(offset_ != ~static_cast<uint64_t>(0));
+  assert(size_ != ~static_cast<uint64_t>(0));
+  PutVarint64(dst, offset_);
+  PutVarint64(dst, size_);
+}
+
+Status BlockHandle::DecodeFrom(Slice* input) {
+  if (GetVarint64(input, &offset_) &&
+      GetVarint64(input, &size_)) {
+    return Status::OK();
+  } else {
+    return Status::Corruption("bad block handle");
+  }
+}
+
+void Footer::EncodeTo(std::string* dst) const {
+#ifndef NDEBUG
+  const size_t original_size = dst->size();
+#endif
+  metaindex_handle_.EncodeTo(dst);
+  index_handle_.EncodeTo(dst);
+  dst->resize(2 * BlockHandle::kMaxEncodedLength);  // Padding
+  PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber & 0xffffffffu));
+  PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber >> 32));
+  assert(dst->size() == original_size + kEncodedLength);
+}
+
+Status Footer::DecodeFrom(Slice* input) {
+  const char* magic_ptr = input->data() + kEncodedLength - 8;
+  const uint32_t magic_lo = DecodeFixed32(magic_ptr);
+  const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
+  const uint64_t magic = ((static_cast<uint64_t>(magic_hi) << 32) |
+                          (static_cast<uint64_t>(magic_lo)));
+  if (magic != kTableMagicNumber) {
+    return Status::InvalidArgument("not an sstable (bad magic number)");
+  }
+
+  Status result = metaindex_handle_.DecodeFrom(input);
+  if (result.ok()) {
+    result = index_handle_.DecodeFrom(input);
+  }
+  if (result.ok()) {
+    // We skip over any leftover data (just padding for now) in "input"
+    const char* end = magic_ptr + 8;
+    *input = Slice(end, input->data() + input->size() - end);
+  }
+  return result;
+}
+
+Status ReadBlock(RandomAccessFile* file,
+                 const ReadOptions& options,
+                 const BlockHandle& handle,
+                 BlockContents* result) {
+  result->data = Slice();
+  result->cachable = false;
+  result->heap_allocated = false;
+
+  // Read the block contents as well as the type/crc footer.
+  // See table_builder.cc for the code that built this structure.
+  size_t n = static_cast<size_t>(handle.size());
+  char* buf = new char[n + kBlockTrailerSize];
+  Slice contents;
+  Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
+  if (!s.ok()) {
+    delete[] buf;
+    return s;
+  }
+  if (contents.size() != n + kBlockTrailerSize) {
+    delete[] buf;
+    return Status::Corruption("truncated block read");
+  }
+
+  // Check the crc of the type and the block contents
+  const char* data = contents.data();    // Pointer to where Read put the data
+  if (options.verify_checksums) {
+    const uint32_t crc = crc32c::Unmask(DecodeFixed32(data + n + 1));
+    const uint32_t actual = crc32c::Value(data, n + 1);
+    if (actual != crc) {
+      delete[] buf;
+      s = Status::Corruption("block checksum mismatch");
+      return s;
+    }
+  }
+
+  switch (data[n]) {
+    case kNoCompression:
+      if (data != buf) {
+        // File implementation gave us pointer to some other data.
+        // Use it directly under the assumption that it will be live
+        // while the file is open.
+        delete[] buf;
+        result->data = Slice(data, n);
+        result->heap_allocated = false;
+        result->cachable = false;  // Do not double-cache
+      } else {
+        result->data = Slice(buf, n);
+        result->heap_allocated = true;
+        result->cachable = true;
+      }
+
+      // Ok
+      break;
+    case kSnappyCompression: {
+      size_t ulength = 0;
+      if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
+        delete[] buf;
+        return Status::Corruption("corrupted compressed block contents");
+      }
+      char* ubuf = new char[ulength];
+      if (!port::Snappy_Uncompress(data, n, ubuf)) {
+        delete[] buf;
+        delete[] ubuf;
+        return Status::Corruption("corrupted compressed block contents");
+      }
+      delete[] buf;
+      result->data = Slice(ubuf, ulength);
+      result->heap_allocated = true;
+      result->cachable = true;
+      break;
+    }
+    default:
+      delete[] buf;
+      return Status::Corruption("bad block type");
+  }
+
+  return Status::OK();
+}
+
+}  // namespace leveldb
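
ReadBlock() above verifies crc32c::Unmask(stored) against a freshly computed CRC over the block data plus the one-byte type. LevelDB stores CRCs in masked form so that computing a CRC over data that itself embeds CRCs stays well-behaved. A small sketch of the mask/unmask pair (this matches leveldb's util/crc32c.h; the crc32c computation itself is elided):

#include <cassert>
#include <cstdint>

static const uint32_t kMaskDelta = 0xa282ead8ul;

// Rotate right by 15 bits and add a constant, so a CRC stored alongside
// the data it covers does not itself look like valid CRC'd data.
uint32_t Mask(uint32_t crc) { return ((crc >> 15) | (crc << 17)) + kMaskDelta; }

uint32_t Unmask(uint32_t masked) {
  uint32_t rot = masked - kMaskDelta;
  return ((rot >> 17) | (rot << 15));
}

int main() {
  uint32_t crc = 0xdeadbeefu;        // stand-in for crc32c::Value(data, n + 1)
  assert(Unmask(Mask(crc)) == crc);  // round-trips for any 32-bit value
  return 0;
}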

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.h
new file mode 100644
index 0000000..6c0b80c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/format.h
@@ -0,0 +1,108 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_FORMAT_H_
+#define STORAGE_LEVELDB_TABLE_FORMAT_H_
+
+#include <string>
+#include <stdint.h>
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "leveldb/table_builder.h"
+
+namespace leveldb {
+
+class Block;
+class RandomAccessFile;
+struct ReadOptions;
+
+// BlockHandle is a pointer to the extent of a file that stores a data
+// block or a meta block.
+class BlockHandle {
+ public:
+  BlockHandle();
+
+  // The offset of the block in the file.
+  uint64_t offset() const { return offset_; }
+  void set_offset(uint64_t offset) { offset_ = offset; }
+
+  // The size of the stored block
+  uint64_t size() const { return size_; }
+  void set_size(uint64_t size) { size_ = size; }
+
+  void EncodeTo(std::string* dst) const;
+  Status DecodeFrom(Slice* input);
+
+  // Maximum encoding length of a BlockHandle
+  enum { kMaxEncodedLength = 10 + 10 };
+
+ private:
+  uint64_t offset_;
+  uint64_t size_;
+};
+
+// Footer encapsulates the fixed information stored at the tail
+// end of every table file.
+class Footer {
+ public:
+  Footer() { }
+
+  // The block handle for the metaindex block of the table
+  const BlockHandle& metaindex_handle() const { return metaindex_handle_; }
+  void set_metaindex_handle(const BlockHandle& h) { metaindex_handle_ = h; }
+
+  // The block handle for the index block of the table
+  const BlockHandle& index_handle() const {
+    return index_handle_;
+  }
+  void set_index_handle(const BlockHandle& h) {
+    index_handle_ = h;
+  }
+
+  void EncodeTo(std::string* dst) const;
+  Status DecodeFrom(Slice* input);
+
+  // Encoded length of a Footer.  Note that the serialization of a
+  // Footer will always occupy exactly this many bytes.  It consists
+  // of two block handles and a magic number.
+  enum {
+    kEncodedLength = 2*BlockHandle::kMaxEncodedLength + 8
+  };
+
+ private:
+  BlockHandle metaindex_handle_;
+  BlockHandle index_handle_;
+};
+
+// kTableMagicNumber was picked by running
+//    echo http://code.google.com/p/leveldb/ | sha1sum
+// and taking the leading 64 bits.
+static const uint64_t kTableMagicNumber = 0xdb4775248b80fb57ull;
+
+// 1-byte type + 32-bit crc
+static const size_t kBlockTrailerSize = 5;
+
+struct BlockContents {
+  Slice data;           // Actual contents of data
+  bool cachable;        // True iff data can be cached
+  bool heap_allocated;  // True iff caller should delete[] data.data()
+};
+
+// Read the block identified by "handle" from "file".  On failure
+// return non-OK.  On success fill *result and return OK.
+extern Status ReadBlock(RandomAccessFile* file,
+                        const ReadOptions& options,
+                        const BlockHandle& handle,
+                        BlockContents* result);
+
+// Implementation details follow.  Clients should ignore,
+
+inline BlockHandle::BlockHandle()
+    : offset_(~static_cast<uint64_t>(0)),
+      size_(~static_cast<uint64_t>(0)) {
+}
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_TABLE_FORMAT_H_
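
The kMaxEncodedLength = 10 + 10 bound above comes from the varint64 encoding used by BlockHandle::EncodeTo: 7 payload bits per byte means a 64-bit value needs at most ceil(64/7) = 10 bytes, and a handle is two such values. A minimal sketch of that encoding and the resulting 48-byte footer arithmetic (PutVarint64 here is a re-derivation, not the util/coding.h source):

#include <cassert>
#include <cstdint>
#include <string>

void PutVarint64(std::string* dst, uint64_t v) {
  // Emit 7 bits per byte, high bit set on all but the last byte.
  while (v >= 128) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

int main() {
  std::string enc;
  PutVarint64(&enc, 300);        // encodes as 0xAC 0x02
  assert(enc.size() == 2);
  enc.clear();
  PutVarint64(&enc, ~0ull);      // worst case: 10 bytes
  assert(enc.size() == 10);
  // Footer: two handles padded to kMaxEncodedLength, plus 8-byte magic.
  assert(2 * (10 + 10) + 8 == 48);
  return 0;
}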

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator.cc
new file mode 100644
index 0000000..3d1c87f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator.cc
@@ -0,0 +1,67 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/iterator.h"
+
+namespace leveldb {
+
+Iterator::Iterator() {
+  cleanup_.function = NULL;
+  cleanup_.next = NULL;
+}
+
+Iterator::~Iterator() {
+  if (cleanup_.function != NULL) {
+    (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
+    for (Cleanup* c = cleanup_.next; c != NULL; ) {
+      (*c->function)(c->arg1, c->arg2);
+      Cleanup* next = c->next;
+      delete c;
+      c = next;
+    }
+  }
+}
+
+void Iterator::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) {
+  assert(func != NULL);
+  Cleanup* c;
+  if (cleanup_.function == NULL) {
+    c = &cleanup_;
+  } else {
+    c = new Cleanup;
+    c->next = cleanup_.next;
+    cleanup_.next = c;
+  }
+  c->function = func;
+  c->arg1 = arg1;
+  c->arg2 = arg2;
+}
+
+namespace {
+class EmptyIterator : public Iterator {
+ public:
+  EmptyIterator(const Status& s) : status_(s) { }
+  virtual bool Valid() const { return false; }
+  virtual void Seek(const Slice& target) { }
+  virtual void SeekToFirst() { }
+  virtual void SeekToLast() { }
+  virtual void Next() { assert(false); }
+  virtual void Prev() { assert(false); }
+  Slice key() const { assert(false); return Slice(); }
+  Slice value() const { assert(false); return Slice(); }
+  virtual Status status() const { return status_; }
+ private:
+  Status status_;
+};
+}  // namespace
+
+Iterator* NewEmptyIterator() {
+  return new EmptyIterator(Status::OK());
+}
+
+Iterator* NewErrorIterator(const Status& status) {
+  return new EmptyIterator(status);
+}
+
+}  // namespace leveldb
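
The cleanup list above is laid out so the common case of a single cleanup needs no heap allocation: the first entry lives inline in the iterator, and only additional entries are heap-chained. A self-contained sketch of the same pattern (names are illustrative, not the leveldb types):

#include <cassert>
#include <cstddef>

typedef void (*CleanupFunction)(void* arg1, void* arg2);

struct Cleanup {
  CleanupFunction function;
  void* arg1;
  void* arg2;
  Cleanup* next;
};

class Owner {
 public:
  Owner() { head_.function = NULL; head_.next = NULL; }
  ~Owner() {
    if (head_.function != NULL) {
      (*head_.function)(head_.arg1, head_.arg2);
      for (Cleanup* c = head_.next; c != NULL; ) {
        (*c->function)(c->arg1, c->arg2);
        Cleanup* next = c->next;
        delete c;
        c = next;
      }
    }
  }
  void RegisterCleanup(CleanupFunction f, void* a1, void* a2) {
    Cleanup* c;
    if (head_.function == NULL) {
      c = &head_;                // inline slot: zero allocations
    } else {
      c = new Cleanup;           // extras go on the heap
      c->next = head_.next;
      head_.next = c;
    }
    c->function = f; c->arg1 = a1; c->arg2 = a2;
  }
 private:
  Cleanup head_;
};

static int g_runs = 0;
static void Bump(void*, void*) { g_runs++; }

int main() {
  {
    Owner o;
    o.RegisterCleanup(&Bump, NULL, NULL);
    o.RegisterCleanup(&Bump, NULL, NULL);
  }
  assert(g_runs == 2);  // both cleanups ran at destruction
  return 0;
}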

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator_wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator_wrapper.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator_wrapper.h
new file mode 100644
index 0000000..9e16b3d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/iterator_wrapper.h
@@ -0,0 +1,63 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_
+#define STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_
+
+namespace leveldb {
+
+// An internal wrapper class with an interface similar to Iterator that
+// caches the valid() and key() results for an underlying iterator.
+// This can help avoid virtual function calls and also gives better
+// cache locality.
+class IteratorWrapper {
+ public:
+  IteratorWrapper(): iter_(NULL), valid_(false) { }
+  explicit IteratorWrapper(Iterator* iter): iter_(NULL) {
+    Set(iter);
+  }
+  ~IteratorWrapper() { delete iter_; }
+  Iterator* iter() const { return iter_; }
+
+  // Takes ownership of "iter" and will delete it when destroyed, or
+  // when Set() is invoked again.
+  void Set(Iterator* iter) {
+    delete iter_;
+    iter_ = iter;
+    if (iter_ == NULL) {
+      valid_ = false;
+    } else {
+      Update();
+    }
+  }
+
+
+  // Iterator interface methods
+  bool Valid() const        { return valid_; }
+  Slice key() const         { assert(Valid()); return key_; }
+  Slice value() const       { assert(Valid()); return iter_->value(); }
+  // Methods below require iter() != NULL
+  Status status() const     { assert(iter_); return iter_->status(); }
+  void Next()               { assert(iter_); iter_->Next();        Update(); }
+  void Prev()               { assert(iter_); iter_->Prev();        Update(); }
+  void Seek(const Slice& k) { assert(iter_); iter_->Seek(k);       Update(); }
+  void SeekToFirst()        { assert(iter_); iter_->SeekToFirst(); Update(); }
+  void SeekToLast()         { assert(iter_); iter_->SeekToLast();  Update(); }
+
+ private:
+  void Update() {
+    valid_ = iter_->Valid();
+    if (valid_) {
+      key_ = iter_->key();
+    }
+  }
+
+  Iterator* iter_;
+  bool valid_;
+  Slice key_;
+};
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_
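
The point of the wrapper is that hot loops such as MergingIterator::FindSmallest() call Valid() and key() once per child per step; caching them turns repeated virtual calls into plain field reads. A rough self-contained sketch of the idea (stand-in types, not the leveldb classes):

#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct Iter {                        // stand-in for leveldb::Iterator
  virtual ~Iter() {}
  virtual bool Valid() const = 0;
  virtual std::string key() const = 0;
  virtual void Next() = 0;
};

struct VecIter : public Iter {       // trivial concrete iterator
  const std::vector<std::string>* v;
  size_t i;
  explicit VecIter(const std::vector<std::string>* vec) : v(vec), i(0) {}
  virtual bool Valid() const { return i < v->size(); }
  virtual std::string key() const { return (*v)[i]; }
  virtual void Next() { ++i; }
};

class Wrapper {
 public:
  explicit Wrapper(Iter* it) : iter_(it) { Update(); }
  ~Wrapper() { delete iter_; }
  bool Valid() const { return valid_; }                 // plain field read
  const std::string& key() const { assert(valid_); return key_; }
  void Next() { iter_->Next(); Update(); }              // re-cache
 private:
  void Update() {
    valid_ = iter_->Valid();
    if (valid_) key_ = iter_->key();
  }
  Iter* iter_;
  bool valid_;
  std::string key_;
};

int main() {
  std::vector<std::string> keys;
  keys.push_back("a");
  keys.push_back("b");
  Wrapper w(new VecIter(&keys));
  assert(w.Valid() && w.key() == "a");
  w.Next();
  assert(w.Valid() && w.key() == "b");
  w.Next();
  assert(!w.Valid());
  return 0;
}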

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.cc
new file mode 100644
index 0000000..2dde4dc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.cc
@@ -0,0 +1,197 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/merger.h"
+
+#include "leveldb/comparator.h"
+#include "leveldb/iterator.h"
+#include "table/iterator_wrapper.h"
+
+namespace leveldb {
+
+namespace {
+class MergingIterator : public Iterator {
+ public:
+  MergingIterator(const Comparator* comparator, Iterator** children, int n)
+      : comparator_(comparator),
+        children_(new IteratorWrapper[n]),
+        n_(n),
+        current_(NULL),
+        direction_(kForward) {
+    for (int i = 0; i < n; i++) {
+      children_[i].Set(children[i]);
+    }
+  }
+
+  virtual ~MergingIterator() {
+    delete[] children_;
+  }
+
+  virtual bool Valid() const {
+    return (current_ != NULL);
+  }
+
+  virtual void SeekToFirst() {
+    for (int i = 0; i < n_; i++) {
+      children_[i].SeekToFirst();
+    }
+    FindSmallest();
+    direction_ = kForward;
+  }
+
+  virtual void SeekToLast() {
+    for (int i = 0; i < n_; i++) {
+      children_[i].SeekToLast();
+    }
+    FindLargest();
+    direction_ = kReverse;
+  }
+
+  virtual void Seek(const Slice& target) {
+    for (int i = 0; i < n_; i++) {
+      children_[i].Seek(target);
+    }
+    FindSmallest();
+    direction_ = kForward;
+  }
+
+  virtual void Next() {
+    assert(Valid());
+
+    // Ensure that all children are positioned after key().
+    // If we are moving in the forward direction, it is already
+    // true for all of the non-current_ children since current_ is
+    // the smallest child and key() == current_->key().  Otherwise,
+    // we explicitly position the non-current_ children.
+    if (direction_ != kForward) {
+      for (int i = 0; i < n_; i++) {
+        IteratorWrapper* child = &children_[i];
+        if (child != current_) {
+          child->Seek(key());
+          if (child->Valid() &&
+              comparator_->Compare(key(), child->key()) == 0) {
+            child->Next();
+          }
+        }
+      }
+      direction_ = kForward;
+    }
+
+    current_->Next();
+    FindSmallest();
+  }
+
+  virtual void Prev() {
+    assert(Valid());
+
+    // Ensure that all children are positioned before key().
+    // If we are moving in the reverse direction, it is already
+    // true for all of the non-current_ children since current_ is
+    // the largest child and key() == current_->key().  Otherwise,
+    // we explicitly position the non-current_ children.
+    if (direction_ != kReverse) {
+      for (int i = 0; i < n_; i++) {
+        IteratorWrapper* child = &children_[i];
+        if (child != current_) {
+          child->Seek(key());
+          if (child->Valid()) {
+            // Child is at first entry >= key().  Step back one to be < key()
+            child->Prev();
+          } else {
+            // Child has no entries >= key().  Position at last entry.
+            child->SeekToLast();
+          }
+        }
+      }
+      direction_ = kReverse;
+    }
+
+    current_->Prev();
+    FindLargest();
+  }
+
+  virtual Slice key() const {
+    assert(Valid());
+    return current_->key();
+  }
+
+  virtual Slice value() const {
+    assert(Valid());
+    return current_->value();
+  }
+
+  virtual Status status() const {
+    Status status;
+    for (int i = 0; i < n_; i++) {
+      status = children_[i].status();
+      if (!status.ok()) {
+        break;
+      }
+    }
+    return status;
+  }
+
+ private:
+  void FindSmallest();
+  void FindLargest();
+
+  // We might want to use a heap in case there are lots of children.
+  // For now we use a simple array since we expect a very small number
+  // of children in leveldb.
+  const Comparator* comparator_;
+  IteratorWrapper* children_;
+  int n_;
+  IteratorWrapper* current_;
+
+  // Which direction is the iterator moving?
+  enum Direction {
+    kForward,
+    kReverse
+  };
+  Direction direction_;
+};
+
+void MergingIterator::FindSmallest() {
+  IteratorWrapper* smallest = NULL;
+  for (int i = 0; i < n_; i++) {
+    IteratorWrapper* child = &children_[i];
+    if (child->Valid()) {
+      if (smallest == NULL) {
+        smallest = child;
+      } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
+        smallest = child;
+      }
+    }
+  }
+  current_ = smallest;
+}
+
+void MergingIterator::FindLargest() {
+  IteratorWrapper* largest = NULL;
+  for (int i = n_-1; i >= 0; i--) {
+    IteratorWrapper* child = &children_[i];
+    if (child->Valid()) {
+      if (largest == NULL) {
+        largest = child;
+      } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
+        largest = child;
+      }
+    }
+  }
+  current_ = largest;
+}
+}  // namespace
+
+Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
+  assert(n >= 0);
+  if (n == 0) {
+    return NewEmptyIterator();
+  } else if (n == 1) {
+    return list[0];
+  } else {
+    return new MergingIterator(cmp, list, n);
+  }
+}
+
+}  // namespace leveldb
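
FindSmallest()/FindLargest() implement an O(n)-per-step k-way merge with no duplicate suppression: a key present in K children is yielded K times. A self-contained sketch of the forward-merge policy over plain sorted vectors (illustrative only):

#include <assert.h>
#include <string>
#include <vector>

struct Child {
  std::vector<std::string> keys;  // already sorted
  size_t pos;
};

// Returns the index of the child whose current key is smallest, or -1
// if every child is exhausted (the analogue of current_ == NULL).
static int FindSmallest(const std::vector<Child>& kids) {
  int best = -1;
  for (size_t i = 0; i < kids.size(); i++) {
    if (kids[i].pos >= kids[i].keys.size()) continue;   // exhausted
    if (best < 0 ||
        kids[i].keys[kids[i].pos] < kids[best].keys[kids[best].pos]) {
      best = static_cast<int>(i);
    }
  }
  return best;
}

int main() {
  std::vector<Child> kids(2);
  kids[0].keys.push_back("a"); kids[0].keys.push_back("c"); kids[0].pos = 0;
  kids[1].keys.push_back("a"); kids[1].keys.push_back("b"); kids[1].pos = 0;
  std::string merged;
  int i;
  while ((i = FindSmallest(kids)) >= 0) {
    merged += kids[i].keys[kids[i].pos++];
  }
  assert(merged == "aabc");  // "a" appears twice: no duplicate suppression
  return 0;
}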

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.h
new file mode 100644
index 0000000..91ddd80
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/merger.h
@@ -0,0 +1,26 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_MERGER_H_
+#define STORAGE_LEVELDB_TABLE_MERGER_H_
+
+namespace leveldb {
+
+class Comparator;
+class Iterator;
+
+// Return an iterator that provides the union of the data in
+// children[0,n-1].  Takes ownership of the child iterators and
+// will delete them when the result iterator is deleted.
+//
+// The result does no duplicate suppression.  I.e., if a particular
+// key is present in K child iterators, it will be yielded K times.
+//
+// REQUIRES: n >= 0
+extern Iterator* NewMergingIterator(
+    const Comparator* comparator, Iterator** children, int n);
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_TABLE_MERGER_H_
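
A hypothetical usage sketch, assuming the headers imported by this commit are on the include path and that iter1/iter2 are valid, heap-allocated iterators (the merging iterator takes ownership of its children):

#include "leveldb/comparator.h"
#include "leveldb/iterator.h"
#include "table/merger.h"

void ScanUnion(leveldb::Iterator* iter1, leveldb::Iterator* iter2) {
  leveldb::Iterator* children[2] = { iter1, iter2 };
  leveldb::Iterator* merged =
      leveldb::NewMergingIterator(leveldb::BytewiseComparator(), children, 2);
  for (merged->SeekToFirst(); merged->Valid(); merged->Next()) {
    // merged->key() / merged->value() visit the union in sorted order.
  }
  delete merged;  // also deletes iter1 and iter2
}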

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table.cc
new file mode 100644
index 0000000..71c1756
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table.cc
@@ -0,0 +1,275 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/table.h"
+
+#include "leveldb/cache.h"
+#include "leveldb/comparator.h"
+#include "leveldb/env.h"
+#include "leveldb/filter_policy.h"
+#include "leveldb/options.h"
+#include "table/block.h"
+#include "table/filter_block.h"
+#include "table/format.h"
+#include "table/two_level_iterator.h"
+#include "util/coding.h"
+
+namespace leveldb {
+
+struct Table::Rep {
+  ~Rep() {
+    delete filter;
+    delete [] filter_data;
+    delete index_block;
+  }
+
+  Options options;
+  Status status;
+  RandomAccessFile* file;
+  uint64_t cache_id;
+  FilterBlockReader* filter;
+  const char* filter_data;
+
+  BlockHandle metaindex_handle;  // Handle to metaindex_block: saved from footer
+  Block* index_block;
+};
+
+Status Table::Open(const Options& options,
+                   RandomAccessFile* file,
+                   uint64_t size,
+                   Table** table) {
+  *table = NULL;
+  if (size < Footer::kEncodedLength) {
+    return Status::InvalidArgument("file is too short to be an sstable");
+  }
+
+  char footer_space[Footer::kEncodedLength];
+  Slice footer_input;
+  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
+                        &footer_input, footer_space);
+  if (!s.ok()) return s;
+
+  Footer footer;
+  s = footer.DecodeFrom(&footer_input);
+  if (!s.ok()) return s;
+
+  // Read the index block
+  BlockContents contents;
+  Block* index_block = NULL;
+  if (s.ok()) {
+    s = ReadBlock(file, ReadOptions(), footer.index_handle(), &contents);
+    if (s.ok()) {
+      index_block = new Block(contents);
+    }
+  }
+
+  if (s.ok()) {
+    // We've successfully read the footer and the index block: we're
+    // ready to serve requests.
+    Rep* rep = new Table::Rep;
+    rep->options = options;
+    rep->file = file;
+    rep->metaindex_handle = footer.metaindex_handle();
+    rep->index_block = index_block;
+    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
+    rep->filter_data = NULL;
+    rep->filter = NULL;
+    *table = new Table(rep);
+    (*table)->ReadMeta(footer);
+  } else {
+    if (index_block) delete index_block;
+  }
+
+  return s;
+}
+
+void Table::ReadMeta(const Footer& footer) {
+  if (rep_->options.filter_policy == NULL) {
+    return;  // Do not need any metadata
+  }
+
+  // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
+  // it is an empty block.
+  ReadOptions opt;
+  BlockContents contents;
+  if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
+    // Do not propagate errors since meta info is not needed for operation
+    return;
+  }
+  Block* meta = new Block(contents);
+
+  Iterator* iter = meta->NewIterator(BytewiseComparator());
+  std::string key = "filter.";
+  key.append(rep_->options.filter_policy->Name());
+  iter->Seek(key);
+  if (iter->Valid() && iter->key() == Slice(key)) {
+    ReadFilter(iter->value());
+  }
+  delete iter;
+  delete meta;
+}
+
+void Table::ReadFilter(const Slice& filter_handle_value) {
+  Slice v = filter_handle_value;
+  BlockHandle filter_handle;
+  if (!filter_handle.DecodeFrom(&v).ok()) {
+    return;
+  }
+
+  // We might want to unify with ReadBlock() if we start
+  // requiring checksum verification in Table::Open.
+  ReadOptions opt;
+  BlockContents block;
+  if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
+    return;
+  }
+  if (block.heap_allocated) {
+    rep_->filter_data = block.data.data();     // Will need to delete later
+  }
+  rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
+}
+
+Table::~Table() {
+  delete rep_;
+}
+
+static void DeleteBlock(void* arg, void* ignored) {
+  delete reinterpret_cast<Block*>(arg);
+}
+
+static void DeleteCachedBlock(const Slice& key, void* value) {
+  Block* block = reinterpret_cast<Block*>(value);
+  delete block;
+}
+
+static void ReleaseBlock(void* arg, void* h) {
+  Cache* cache = reinterpret_cast<Cache*>(arg);
+  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
+  cache->Release(handle);
+}
+
+// Convert an index iterator value (i.e., an encoded BlockHandle)
+// into an iterator over the contents of the corresponding block.
+Iterator* Table::BlockReader(void* arg,
+                             const ReadOptions& options,
+                             const Slice& index_value) {
+  Table* table = reinterpret_cast<Table*>(arg);
+  Cache* block_cache = table->rep_->options.block_cache;
+  Block* block = NULL;
+  Cache::Handle* cache_handle = NULL;
+
+  BlockHandle handle;
+  Slice input = index_value;
+  Status s = handle.DecodeFrom(&input);
+  // We intentionally allow extra stuff in index_value so that we
+  // can add more features in the future.
+
+  if (s.ok()) {
+    BlockContents contents;
+    if (block_cache != NULL) {
+      char cache_key_buffer[16];
+      EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
+      EncodeFixed64(cache_key_buffer+8, handle.offset());
+      Slice key(cache_key_buffer, sizeof(cache_key_buffer));
+      cache_handle = block_cache->Lookup(key);
+      if (cache_handle != NULL) {
+        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
+      } else {
+        s = ReadBlock(table->rep_->file, options, handle, &contents);
+        if (s.ok()) {
+          block = new Block(contents);
+          if (contents.cachable && options.fill_cache) {
+            cache_handle = block_cache->Insert(
+                key, block, block->size(), &DeleteCachedBlock);
+          }
+        }
+      }
+    } else {
+      s = ReadBlock(table->rep_->file, options, handle, &contents);
+      if (s.ok()) {
+        block = new Block(contents);
+      }
+    }
+  }
+
+  Iterator* iter;
+  if (block != NULL) {
+    iter = block->NewIterator(table->rep_->options.comparator);
+    if (cache_handle == NULL) {
+      iter->RegisterCleanup(&DeleteBlock, block, NULL);
+    } else {
+      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
+    }
+  } else {
+    iter = NewErrorIterator(s);
+  }
+  return iter;
+}
+
+Iterator* Table::NewIterator(const ReadOptions& options) const {
+  return NewTwoLevelIterator(
+      rep_->index_block->NewIterator(rep_->options.comparator),
+      &Table::BlockReader, const_cast<Table*>(this), options);
+}
+
+Status Table::InternalGet(const ReadOptions& options, const Slice& k,
+                          void* arg,
+                          void (*saver)(void*, const Slice&, const Slice&)) {
+  Status s;
+  Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
+  iiter->Seek(k);
+  if (iiter->Valid()) {
+    Slice handle_value = iiter->value();
+    FilterBlockReader* filter = rep_->filter;
+    BlockHandle handle;
+    if (filter != NULL &&
+        handle.DecodeFrom(&handle_value).ok() &&
+        !filter->KeyMayMatch(handle.offset(), k)) {
+      // Not found
+    } else {
+      Iterator* block_iter = BlockReader(this, options, iiter->value());
+      block_iter->Seek(k);
+      if (block_iter->Valid()) {
+        (*saver)(arg, block_iter->key(), block_iter->value());
+      }
+      s = block_iter->status();
+      delete block_iter;
+    }
+  }
+  if (s.ok()) {
+    s = iiter->status();
+  }
+  delete iiter;
+  return s;
+}
+
+
+uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
+  Iterator* index_iter =
+      rep_->index_block->NewIterator(rep_->options.comparator);
+  index_iter->Seek(key);
+  uint64_t result;
+  if (index_iter->Valid()) {
+    BlockHandle handle;
+    Slice input = index_iter->value();
+    Status s = handle.DecodeFrom(&input);
+    if (s.ok()) {
+      result = handle.offset();
+    } else {
+      // Strange: we can't decode the block handle in the index block.
+      // We'll just return the offset of the metaindex block, which is
+      // close to the whole file size for this case.
+      result = rep_->metaindex_handle.offset();
+    }
+  } else {
+    // key is past the last key in the file.  Approximate the offset
+    // by returning the offset of the metaindex block (which is
+    // right near the end of the file).
+    result = rep_->metaindex_handle.offset();
+  }
+  delete index_iter;
+  return result;
+}
+
+}  // namespace leveldb
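
BlockReader() above keys the block cache with a 16-byte string: fixed64(cache_id) followed by fixed64(block offset), so each open table gets its own key space and each block a distinct key within it. A sketch of that construction (EncodeFixed64 here is a little-endian stand-in, not the util/coding.h source):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Stand-in for EncodeFixed64 (assumes a little-endian host).
static void EncodeFixed64(char* dst, uint64_t v) { memcpy(dst, &v, 8); }

static std::string BlockCacheKey(uint64_t cache_id, uint64_t offset) {
  char buf[16];
  EncodeFixed64(buf, cache_id);
  EncodeFixed64(buf + 8, offset);
  return std::string(buf, sizeof(buf));
}

int main() {
  assert(BlockCacheKey(1, 0) != BlockCacheKey(2, 0));   // per-table id
  assert(BlockCacheKey(1, 0) != BlockCacheKey(1, 64));  // per-block offset
  return 0;
}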

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_builder.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_builder.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_builder.cc
new file mode 100644
index 0000000..62002c8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_builder.cc
@@ -0,0 +1,270 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/table_builder.h"
+
+#include <assert.h>
+#include "leveldb/comparator.h"
+#include "leveldb/env.h"
+#include "leveldb/filter_policy.h"
+#include "leveldb/options.h"
+#include "table/block_builder.h"
+#include "table/filter_block.h"
+#include "table/format.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+
+namespace leveldb {
+
+struct TableBuilder::Rep {
+  Options options;
+  Options index_block_options;
+  WritableFile* file;
+  uint64_t offset;
+  Status status;
+  BlockBuilder data_block;
+  BlockBuilder index_block;
+  std::string last_key;
+  int64_t num_entries;
+  bool closed;          // Either Finish() or Abandon() has been called.
+  FilterBlockBuilder* filter_block;
+
+  // We do not emit the index entry for a block until we have seen the
+  // first key for the next data block.  This allows us to use shorter
+  // keys in the index block.  For example, consider a block boundary
+  // between the keys "the quick brown fox" and "the who".  We can use
+  // "the r" as the key for the index block entry since it is >= all
+  // entries in the first block and < all entries in subsequent
+  // blocks.
+  //
+  // Invariant: r->pending_index_entry is true only if data_block is empty.
+  bool pending_index_entry;
+  BlockHandle pending_handle;  // Handle to add to index block
+
+  std::string compressed_output;
+
+  Rep(const Options& opt, WritableFile* f)
+      : options(opt),
+        index_block_options(opt),
+        file(f),
+        offset(0),
+        data_block(&options),
+        index_block(&index_block_options),
+        num_entries(0),
+        closed(false),
+        filter_block(opt.filter_policy == NULL ? NULL
+                     : new FilterBlockBuilder(opt.filter_policy)),
+        pending_index_entry(false) {
+    index_block_options.block_restart_interval = 1;
+  }
+};
+
+TableBuilder::TableBuilder(const Options& options, WritableFile* file)
+    : rep_(new Rep(options, file)) {
+  if (rep_->filter_block != NULL) {
+    rep_->filter_block->StartBlock(0);
+  }
+}
+
+TableBuilder::~TableBuilder() {
+  assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
+  delete rep_->filter_block;
+  delete rep_;
+}
+
+Status TableBuilder::ChangeOptions(const Options& options) {
+  // Note: if more fields are added to Options, update
+  // this function to catch changes that should not be allowed to
+  // change in the middle of building a Table.
+  if (options.comparator != rep_->options.comparator) {
+    return Status::InvalidArgument("changing comparator while building table");
+  }
+
+  // Note that any live BlockBuilders point to rep_->options and therefore
+  // will automatically pick up the updated options.
+  rep_->options = options;
+  rep_->index_block_options = options;
+  rep_->index_block_options.block_restart_interval = 1;
+  return Status::OK();
+}
+
+void TableBuilder::Add(const Slice& key, const Slice& value) {
+  Rep* r = rep_;
+  assert(!r->closed);
+  if (!ok()) return;
+  if (r->num_entries > 0) {
+    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
+  }
+
+  if (r->pending_index_entry) {
+    assert(r->data_block.empty());
+    r->options.comparator->FindShortestSeparator(&r->last_key, key);
+    std::string handle_encoding;
+    r->pending_handle.EncodeTo(&handle_encoding);
+    r->index_block.Add(r->last_key, Slice(handle_encoding));
+    r->pending_index_entry = false;
+  }
+
+  if (r->filter_block != NULL) {
+    r->filter_block->AddKey(key);
+  }
+
+  r->last_key.assign(key.data(), key.size());
+  r->num_entries++;
+  r->data_block.Add(key, value);
+
+  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
+  if (estimated_block_size >= r->options.block_size) {
+    Flush();
+  }
+}
+
+void TableBuilder::Flush() {
+  Rep* r = rep_;
+  assert(!r->closed);
+  if (!ok()) return;
+  if (r->data_block.empty()) return;
+  assert(!r->pending_index_entry);
+  WriteBlock(&r->data_block, &r->pending_handle);
+  if (ok()) {
+    r->pending_index_entry = true;
+    r->status = r->file->Flush();
+  }
+  if (r->filter_block != NULL) {
+    r->filter_block->StartBlock(r->offset);
+  }
+}
+
+void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
+  // File format contains a sequence of blocks where each block has:
+  //    block_data: uint8[n]
+  //    type: uint8
+  //    crc: uint32
+  assert(ok());
+  Rep* r = rep_;
+  Slice raw = block->Finish();
+
+  Slice block_contents;
+  CompressionType type = r->options.compression;
+  // TODO(postrelease): Support more compression options: zlib?
+  switch (type) {
+    case kNoCompression:
+      block_contents = raw;
+      break;
+
+    case kSnappyCompression: {
+      std::string* compressed = &r->compressed_output;
+      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
+          compressed->size() < raw.size() - (raw.size() / 8u)) {
+        block_contents = *compressed;
+      } else {
+        // Snappy not supported, or compressed less than 12.5%, so just
+        // store uncompressed form
+        block_contents = raw;
+        type = kNoCompression;
+      }
+      break;
+    }
+  }
+  WriteRawBlock(block_contents, type, handle);
+  r->compressed_output.clear();
+  block->Reset();
+}
+
+void TableBuilder::WriteRawBlock(const Slice& block_contents,
+                                 CompressionType type,
+                                 BlockHandle* handle) {
+  Rep* r = rep_;
+  handle->set_offset(r->offset);
+  handle->set_size(block_contents.size());
+  r->status = r->file->Append(block_contents);
+  if (r->status.ok()) {
+    char trailer[kBlockTrailerSize];
+    trailer[0] = type;
+    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
+    crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
+    EncodeFixed32(trailer+1, crc32c::Mask(crc));
+    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
+    if (r->status.ok()) {
+      r->offset += block_contents.size() + kBlockTrailerSize;
+    }
+  }
+}
+
+Status TableBuilder::status() const {
+  return rep_->status;
+}
+
+Status TableBuilder::Finish() {
+  Rep* r = rep_;
+  Flush();
+  assert(!r->closed);
+  r->closed = true;
+
+  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
+
+  // Write filter block
+  if (ok() && r->filter_block != NULL) {
+    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
+                  &filter_block_handle);
+  }
+
+  // Write metaindex block
+  if (ok()) {
+    BlockBuilder meta_index_block(&r->options);
+    if (r->filter_block != NULL) {
+      // Add mapping from "filter.Name" to location of filter data
+      std::string key = "filter.";
+      key.append(r->options.filter_policy->Name());
+      std::string handle_encoding;
+      filter_block_handle.EncodeTo(&handle_encoding);
+      meta_index_block.Add(key, handle_encoding);
+    }
+
+    // TODO(postrelease): Add stats and other meta blocks
+    WriteBlock(&meta_index_block, &metaindex_block_handle);
+  }
+
+  // Write index block
+  if (ok()) {
+    if (r->pending_index_entry) {
+      r->options.comparator->FindShortSuccessor(&r->last_key);
+      std::string handle_encoding;
+      r->pending_handle.EncodeTo(&handle_encoding);
+      r->index_block.Add(r->last_key, Slice(handle_encoding));
+      r->pending_index_entry = false;
+    }
+    WriteBlock(&r->index_block, &index_block_handle);
+  }
+
+  // Write footer
+  if (ok()) {
+    Footer footer;
+    footer.set_metaindex_handle(metaindex_block_handle);
+    footer.set_index_handle(index_block_handle);
+    std::string footer_encoding;
+    footer.EncodeTo(&footer_encoding);
+    r->status = r->file->Append(footer_encoding);
+    if (r->status.ok()) {
+      r->offset += footer_encoding.size();
+    }
+  }
+  return r->status;
+}
+
+void TableBuilder::Abandon() {
+  Rep* r = rep_;
+  assert(!r->closed);
+  r->closed = true;
+}
+
+uint64_t TableBuilder::NumEntries() const {
+  return rep_->num_entries;
+}
+
+uint64_t TableBuilder::FileSize() const {
+  return rep_->offset;
+}
+
+}  // namespace leveldb
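
The pending_index_entry comment above relies on the comparator shortening index keys: between a block ending at "the quick brown fox" and one starting at "the who", the index can store just "the r". A simplified sketch of a bytewise shortest-separator, modeled on (but not copied from) leveldb's BytewiseComparator:

#include <cassert>
#include <string>

static void ShortestSeparator(std::string* start, const std::string& limit) {
  size_t min_len = start->size() < limit.size() ? start->size() : limit.size();
  size_t i = 0;
  while (i < min_len && (*start)[i] == limit[i]) i++;   // common prefix
  if (i >= min_len) return;  // one is a prefix of the other: leave as-is
  unsigned char byte = static_cast<unsigned char>((*start)[i]);
  if (byte < 0xff && byte + 1 < static_cast<unsigned char>(limit[i])) {
    (*start)[i] = static_cast<char>(byte + 1);
    start->resize(i + 1);    // truncated result is >= start and < limit
  }
}

int main() {
  std::string key = "the quick brown fox";
  ShortestSeparator(&key, "the who");
  assert(key == "the r");
  return 0;
}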

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_test.cc
new file mode 100644
index 0000000..c723bf8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/table_test.cc
@@ -0,0 +1,868 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/table.h"
+
+#include <map>
+#include <string>
+#include "db/dbformat.h"
+#include "db/memtable.h"
+#include "db/write_batch_internal.h"
+#include "leveldb/db.h"
+#include "leveldb/env.h"
+#include "leveldb/iterator.h"
+#include "leveldb/table_builder.h"
+#include "table/block.h"
+#include "table/block_builder.h"
+#include "table/format.h"
+#include "util/random.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+namespace leveldb {
+
+// Return reverse of "key".
+// Used to test non-lexicographic comparators.
+static std::string Reverse(const Slice& key) {
+  std::string str(key.ToString());
+  std::string rev("");
+  for (std::string::reverse_iterator rit = str.rbegin();
+       rit != str.rend(); ++rit) {
+    rev.push_back(*rit);
+  }
+  return rev;
+}
+
+namespace {
+class ReverseKeyComparator : public Comparator {
+ public:
+  virtual const char* Name() const {
+    return "leveldb.ReverseBytewiseComparator";
+  }
+
+  virtual int Compare(const Slice& a, const Slice& b) const {
+    return BytewiseComparator()->Compare(Reverse(a), Reverse(b));
+  }
+
+  virtual void FindShortestSeparator(
+      std::string* start,
+      const Slice& limit) const {
+    std::string s = Reverse(*start);
+    std::string l = Reverse(limit);
+    BytewiseComparator()->FindShortestSeparator(&s, l);
+    *start = Reverse(s);
+  }
+
+  virtual void FindShortSuccessor(std::string* key) const {
+    std::string s = Reverse(*key);
+    BytewiseComparator()->FindShortSuccessor(&s);
+    *key = Reverse(s);
+  }
+};
+}  // namespace
+static ReverseKeyComparator reverse_key_comparator;
+
+static void Increment(const Comparator* cmp, std::string* key) {
+  if (cmp == BytewiseComparator()) {
+    key->push_back('\0');
+  } else {
+    assert(cmp == &reverse_key_comparator);
+    std::string rev = Reverse(*key);
+    rev.push_back('\0');
+    *key = Reverse(rev);
+  }
+}
+
+// An STL comparator that uses a Comparator
+namespace {
+struct STLLessThan {
+  const Comparator* cmp;
+
+  STLLessThan() : cmp(BytewiseComparator()) { }
+  STLLessThan(const Comparator* c) : cmp(c) { }
+  bool operator()(const std::string& a, const std::string& b) const {
+    return cmp->Compare(Slice(a), Slice(b)) < 0;
+  }
+};
+}  // namespace
+
+class StringSink: public WritableFile {
+ public:
+  ~StringSink() { }
+
+  const std::string& contents() const { return contents_; }
+
+  virtual Status Close() { return Status::OK(); }
+  virtual Status Flush() { return Status::OK(); }
+  virtual Status Sync() { return Status::OK(); }
+
+  virtual Status Append(const Slice& data) {
+    contents_.append(data.data(), data.size());
+    return Status::OK();
+  }
+
+ private:
+  std::string contents_;
+};
+
+
+class StringSource: public RandomAccessFile {
+ public:
+  StringSource(const Slice& contents)
+      : contents_(contents.data(), contents.size()) {
+  }
+
+  virtual ~StringSource() { }
+
+  uint64_t Size() const { return contents_.size(); }
+
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                       char* scratch) const {
+    if (offset > contents_.size()) {
+      return Status::InvalidArgument("invalid Read offset");
+    }
+    if (offset + n > contents_.size()) {
+      n = contents_.size() - offset;
+    }
+    memcpy(scratch, &contents_[offset], n);
+    *result = Slice(scratch, n);
+    return Status::OK();
+  }
+
+ private:
+  std::string contents_;
+};
+
+typedef std::map<std::string, std::string, STLLessThan> KVMap;
+
+// Helper class for tests to unify the interface between
+// BlockBuilder/TableBuilder and Block/Table.
+class Constructor {
+ public:
+  explicit Constructor(const Comparator* cmp) : data_(STLLessThan(cmp)) { }
+  virtual ~Constructor() { }
+
+  void Add(const std::string& key, const Slice& value) {
+    data_[key] = value.ToString();
+  }
+
+  // Finish constructing the data structure with all the keys that have
+  // been added so far.  Returns the keys in sorted order in "*keys"
+  // and stores the key/value pairs in "*kvmap"
+  void Finish(const Options& options,
+              std::vector<std::string>* keys,
+              KVMap* kvmap) {
+    *kvmap = data_;
+    keys->clear();
+    for (KVMap::const_iterator it = data_.begin();
+         it != data_.end();
+         ++it) {
+      keys->push_back(it->first);
+    }
+    data_.clear();
+    Status s = FinishImpl(options, *kvmap);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+  }
+
+  // Construct the data structure from the data in "data"
+  virtual Status FinishImpl(const Options& options, const KVMap& data) = 0;
+
+  virtual Iterator* NewIterator() const = 0;
+
+  virtual const KVMap& data() { return data_; }
+
+  virtual DB* db() const { return NULL; }  // Overridden in DBConstructor
+
+ private:
+  KVMap data_;
+};
+
+class BlockConstructor: public Constructor {
+ public:
+  explicit BlockConstructor(const Comparator* cmp)
+      : Constructor(cmp),
+        comparator_(cmp),
+        block_(NULL) { }
+  ~BlockConstructor() {
+    delete block_;
+  }
+  virtual Status FinishImpl(const Options& options, const KVMap& data) {
+    delete block_;
+    block_ = NULL;
+    BlockBuilder builder(&options);
+
+    for (KVMap::const_iterator it = data.begin();
+         it != data.end();
+         ++it) {
+      builder.Add(it->first, it->second);
+    }
+    // Open the block
+    data_ = builder.Finish().ToString();
+    BlockContents contents;
+    contents.data = data_;
+    contents.cachable = false;
+    contents.heap_allocated = false;
+    block_ = new Block(contents);
+    return Status::OK();
+  }
+  virtual Iterator* NewIterator() const {
+    return block_->NewIterator(comparator_);
+  }
+
+ private:
+  const Comparator* comparator_;
+  std::string data_;
+  Block* block_;
+
+  BlockConstructor();
+};
+
+class TableConstructor: public Constructor {
+ public:
+  TableConstructor(const Comparator* cmp)
+      : Constructor(cmp),
+        source_(NULL), table_(NULL) {
+  }
+  ~TableConstructor() {
+    Reset();
+  }
+  virtual Status FinishImpl(const Options& options, const KVMap& data) {
+    Reset();
+    StringSink sink;
+    TableBuilder builder(options, &sink);
+
+    for (KVMap::const_iterator it = data.begin();
+         it != data.end();
+         ++it) {
+      builder.Add(it->first, it->second);
+      ASSERT_TRUE(builder.status().ok());
+    }
+    Status s = builder.Finish();
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    ASSERT_EQ(sink.contents().size(), builder.FileSize());
+
+    // Open the table
+    source_ = new StringSource(sink.contents());
+    Options table_options;
+    table_options.comparator = options.comparator;
+    return Table::Open(table_options, source_, sink.contents().size(), &table_);
+  }
+
+  virtual Iterator* NewIterator() const {
+    return table_->NewIterator(ReadOptions());
+  }
+
+  uint64_t ApproximateOffsetOf(const Slice& key) const {
+    return table_->ApproximateOffsetOf(key);
+  }
+
+ private:
+  void Reset() {
+    delete table_;
+    delete source_;
+    table_ = NULL;
+    source_ = NULL;
+  }
+
+  StringSource* source_;
+  Table* table_;
+
+  TableConstructor();
+};
+
+// A helper class that converts internal format keys into user keys
+class KeyConvertingIterator: public Iterator {
+ public:
+  explicit KeyConvertingIterator(Iterator* iter) : iter_(iter) { }
+  virtual ~KeyConvertingIterator() { delete iter_; }
+  virtual bool Valid() const { return iter_->Valid(); }
+  virtual void Seek(const Slice& target) {
+    ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
+    std::string encoded;
+    AppendInternalKey(&encoded, ikey);
+    iter_->Seek(encoded);
+  }
+  virtual void SeekToFirst() { iter_->SeekToFirst(); }
+  virtual void SeekToLast() { iter_->SeekToLast(); }
+  virtual void Next() { iter_->Next(); }
+  virtual void Prev() { iter_->Prev(); }
+
+  virtual Slice key() const {
+    assert(Valid());
+    ParsedInternalKey key;
+    if (!ParseInternalKey(iter_->key(), &key)) {
+      status_ = Status::Corruption("malformed internal key");
+      return Slice("corrupted key");
+    }
+    return key.user_key;
+  }
+
+  virtual Slice value() const { return iter_->value(); }
+  virtual Status status() const {
+    return status_.ok() ? iter_->status() : status_;
+  }
+
+ private:
+  mutable Status status_;
+  Iterator* iter_;
+
+  // No copying allowed
+  KeyConvertingIterator(const KeyConvertingIterator&);
+  void operator=(const KeyConvertingIterator&);
+};
+
+class MemTableConstructor: public Constructor {
+ public:
+  explicit MemTableConstructor(const Comparator* cmp)
+      : Constructor(cmp),
+        internal_comparator_(cmp) {
+    memtable_ = new MemTable(internal_comparator_);
+    memtable_->Ref();
+  }
+  ~MemTableConstructor() {
+    memtable_->Unref();
+  }
+  virtual Status FinishImpl(const Options& options, const KVMap& data) {
+    memtable_->Unref();
+    memtable_ = new MemTable(internal_comparator_);
+    memtable_->Ref();
+    int seq = 1;
+    for (KVMap::const_iterator it = data.begin();
+         it != data.end();
+         ++it) {
+      memtable_->Add(seq, kTypeValue, it->first, it->second);
+      seq++;
+    }
+    return Status::OK();
+  }
+  virtual Iterator* NewIterator() const {
+    return new KeyConvertingIterator(memtable_->NewIterator());
+  }
+
+ private:
+  InternalKeyComparator internal_comparator_;
+  MemTable* memtable_;
+};
+
+class DBConstructor: public Constructor {
+ public:
+  explicit DBConstructor(const Comparator* cmp)
+      : Constructor(cmp),
+        comparator_(cmp) {
+    db_ = NULL;
+    NewDB();
+  }
+  ~DBConstructor() {
+    delete db_;
+  }
+  virtual Status FinishImpl(const Options& options, const KVMap& data) {
+    delete db_;
+    db_ = NULL;
+    NewDB();
+    for (KVMap::const_iterator it = data.begin();
+         it != data.end();
+         ++it) {
+      WriteBatch batch;
+      batch.Put(it->first, it->second);
+      ASSERT_TRUE(db_->Write(WriteOptions(), &batch).ok());
+    }
+    return Status::OK();
+  }
+  virtual Iterator* NewIterator() const {
+    return db_->NewIterator(ReadOptions());
+  }
+
+  virtual DB* db() const { return db_; }
+
+ private:
+  void NewDB() {
+    std::string name = test::TmpDir() + "/table_testdb";
+
+    Options options;
+    options.comparator = comparator_;
+    Status status = DestroyDB(name, options);
+    ASSERT_TRUE(status.ok()) << status.ToString();
+
+    options.create_if_missing = true;
+    options.error_if_exists = true;
+    options.write_buffer_size = 10000;  // Something small to force merging
+    status = DB::Open(options, name, &db_);
+    ASSERT_TRUE(status.ok()) << status.ToString();
+  }
+
+  const Comparator* comparator_;
+  DB* db_;
+};
+
+enum TestType {
+  TABLE_TEST,
+  BLOCK_TEST,
+  MEMTABLE_TEST,
+  DB_TEST
+};
+
+struct TestArgs {
+  TestType type;
+  bool reverse_compare;
+  int restart_interval;
+};
+
+static const TestArgs kTestArgList[] = {
+  { TABLE_TEST, false, 16 },
+  { TABLE_TEST, false, 1 },
+  { TABLE_TEST, false, 1024 },
+  { TABLE_TEST, true, 16 },
+  { TABLE_TEST, true, 1 },
+  { TABLE_TEST, true, 1024 },
+
+  { BLOCK_TEST, false, 16 },
+  { BLOCK_TEST, false, 1 },
+  { BLOCK_TEST, false, 1024 },
+  { BLOCK_TEST, true, 16 },
+  { BLOCK_TEST, true, 1 },
+  { BLOCK_TEST, true, 1024 },
+
+  // Restart interval does not matter for memtables
+  { MEMTABLE_TEST, false, 16 },
+  { MEMTABLE_TEST, true, 16 },
+
+  // Do not bother with restart interval variations for DB
+  { DB_TEST, false, 16 },
+  { DB_TEST, true, 16 },
+};
+static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]);
+
+class Harness {
+ public:
+  Harness() : constructor_(NULL) { }
+
+  void Init(const TestArgs& args) {
+    delete constructor_;
+    constructor_ = NULL;
+    options_ = Options();
+
+    options_.block_restart_interval = args.restart_interval;
+    // Use shorter block size for tests to exercise block boundary
+    // conditions more.
+    options_.block_size = 256;
+    if (args.reverse_compare) {
+      options_.comparator = &reverse_key_comparator;
+    }
+    switch (args.type) {
+      case TABLE_TEST:
+        constructor_ = new TableConstructor(options_.comparator);
+        break;
+      case BLOCK_TEST:
+        constructor_ = new BlockConstructor(options_.comparator);
+        break;
+      case MEMTABLE_TEST:
+        constructor_ = new MemTableConstructor(options_.comparator);
+        break;
+      case DB_TEST:
+        constructor_ = new DBConstructor(options_.comparator);
+        break;
+    }
+  }
+
+  ~Harness() {
+    delete constructor_;
+  }
+
+  void Add(const std::string& key, const std::string& value) {
+    constructor_->Add(key, value);
+  }
+
+  void Test(Random* rnd) {
+    std::vector<std::string> keys;
+    KVMap data;
+    constructor_->Finish(options_, &keys, &data);
+
+    TestForwardScan(keys, data);
+    TestBackwardScan(keys, data);
+    TestRandomAccess(rnd, keys, data);
+  }
+
+  void TestForwardScan(const std::vector<std::string>& keys,
+                       const KVMap& data) {
+    Iterator* iter = constructor_->NewIterator();
+    ASSERT_TRUE(!iter->Valid());
+    iter->SeekToFirst();
+    for (KVMap::const_iterator model_iter = data.begin();
+         model_iter != data.end();
+         ++model_iter) {
+      ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+      iter->Next();
+    }
+    ASSERT_TRUE(!iter->Valid());
+    delete iter;
+  }
+
+  void TestBackwardScan(const std::vector<std::string>& keys,
+                        const KVMap& data) {
+    Iterator* iter = constructor_->NewIterator();
+    ASSERT_TRUE(!iter->Valid());
+    iter->SeekToLast();
+    for (KVMap::const_reverse_iterator model_iter = data.rbegin();
+         model_iter != data.rend();
+         ++model_iter) {
+      ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+      iter->Prev();
+    }
+    ASSERT_TRUE(!iter->Valid());
+    delete iter;
+  }
+
+  void TestRandomAccess(Random* rnd,
+                        const std::vector<std::string>& keys,
+                        const KVMap& data) {
+    static const bool kVerbose = false;
+    Iterator* iter = constructor_->NewIterator();
+    ASSERT_TRUE(!iter->Valid());
+    KVMap::const_iterator model_iter = data.begin();
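+    // model_iter tracks the same logical position in the in-memory map, so
+    // each random operation below is applied to both the real iterator and
+    // the model and the results compared.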
+    if (kVerbose) fprintf(stderr, "---\n");
+    for (int i = 0; i < 200; i++) {
+      const int toss = rnd->Uniform(5);
+      switch (toss) {
+        case 0: {
+          if (iter->Valid()) {
+            if (kVerbose) fprintf(stderr, "Next\n");
+            iter->Next();
+            ++model_iter;
+            ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+          }
+          break;
+        }
+
+        case 1: {
+          if (kVerbose) fprintf(stderr, "SeekToFirst\n");
+          iter->SeekToFirst();
+          model_iter = data.begin();
+          ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+          break;
+        }
+
+        case 2: {
+          std::string key = PickRandomKey(rnd, keys);
+          model_iter = data.lower_bound(key);
+          if (kVerbose) fprintf(stderr, "Seek '%s'\n",
+                                EscapeString(key).c_str());
+          iter->Seek(Slice(key));
+          ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+          break;
+        }
+
+        case 3: {
+          if (iter->Valid()) {
+            if (kVerbose) fprintf(stderr, "Prev\n");
+            iter->Prev();
+            if (model_iter == data.begin()) {
+              model_iter = data.end();   // Wrap around to invalid value
+            } else {
+              --model_iter;
+            }
+            ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+          }
+          break;
+        }
+
+        case 4: {
+          if (kVerbose) fprintf(stderr, "SeekToLast\n");
+          iter->SeekToLast();
+          if (keys.empty()) {
+            model_iter = data.end();
+          } else {
+            std::string last = data.rbegin()->first;
+            model_iter = data.lower_bound(last);
+          }
+          ASSERT_EQ(ToString(data, model_iter), ToString(iter));
+          break;
+        }
+      }
+    }
+    delete iter;
+  }
+
+  std::string ToString(const KVMap& data, const KVMap::const_iterator& it) {
+    if (it == data.end()) {
+      return "END";
+    } else {
+      return "'" + it->first + "->" + it->second + "'";
+    }
+  }
+
+  std::string ToString(const KVMap& data,
+                       const KVMap::const_reverse_iterator& it) {
+    if (it == data.rend()) {
+      return "END";
+    } else {
+      return "'" + it->first + "->" + it->second + "'";
+    }
+  }
+
+  std::string ToString(const Iterator* it) {
+    if (!it->Valid()) {
+      return "END";
+    } else {
+      return "'" + it->key().ToString() + "->" + it->value().ToString() + "'";
+    }
+  }
+
+  std::string PickRandomKey(Random* rnd, const std::vector<std::string>& keys) {
+    if (keys.empty()) {
+      return "foo";
+    } else {
+      const int index = rnd->Uniform(keys.size());
+      std::string result = keys[index];
+      switch (rnd->Uniform(3)) {
+        case 0:
+          // Return an existing key
+          break;
+        case 1: {
+          // Attempt to return something smaller than an existing key
+          if (result.size() > 0 && result[result.size()-1] > '\0') {
+            result[result.size()-1]--;
+          }
+          break;
+        }
+        case 2: {
+          // Return something larger than an existing key
+          Increment(options_.comparator, &result);
+          break;
+        }
+      }
+      return result;
+    }
+  }
+
+  // Returns NULL if not running against a DB
+  DB* db() const { return constructor_->db(); }
+
+ private:
+  Options options_;
+  Constructor* constructor_;
+};
+
+// Test empty table/block.
+TEST(Harness, Empty) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 1);
+    Test(&rnd);
+  }
+}
+
+// Special test for a block with no restart entries.  The C++ leveldb
+// code never generates such blocks, but the Java version of leveldb
+// seems to.
+TEST(Harness, ZeroRestartPointsInBlock) {
+  char data[sizeof(uint32_t)];
+  memset(data, 0, sizeof(data));
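+  // Four zero bytes decode as a block whose trailer says num_restarts == 0,
+  // i.e. a block with no restart array and no entries.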
+  BlockContents contents;
+  contents.data = Slice(data, sizeof(data));
+  contents.cachable = false;
+  contents.heap_allocated = false;
+  Block block(contents);
+  Iterator* iter = block.NewIterator(BytewiseComparator());
+  iter->SeekToFirst();
+  ASSERT_TRUE(!iter->Valid());
+  iter->SeekToLast();
+  ASSERT_TRUE(!iter->Valid());
+  iter->Seek("foo");
+  ASSERT_TRUE(!iter->Valid());
+  delete iter;
+}
+
+// Test the empty key
+TEST(Harness, SimpleEmptyKey) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 1);
+    Add("", "v");
+    Test(&rnd);
+  }
+}
+
+TEST(Harness, SimpleSingle) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 2);
+    Add("abc", "v");
+    Test(&rnd);
+  }
+}
+
+TEST(Harness, SimpleMulti) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 3);
+    Add("abc", "v");
+    Add("abcd", "v");
+    Add("ac", "v2");
+    Test(&rnd);
+  }
+}
+
+TEST(Harness, SimpleSpecialKey) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 4);
+    Add("\xff\xff", "v3");
+    Test(&rnd);
+  }
+}
+
+TEST(Harness, Randomized) {
+  for (int i = 0; i < kNumTestArgs; i++) {
+    Init(kTestArgList[i]);
+    Random rnd(test::RandomSeed() + 5);
+    for (int num_entries = 0; num_entries < 2000;
+         num_entries += (num_entries < 50 ? 1 : 200)) {
+      if ((num_entries % 10) == 0) {
+        fprintf(stderr, "case %d of %d: num_entries = %d\n",
+                (i + 1), int(kNumTestArgs), num_entries);
+      }
+      for (int e = 0; e < num_entries; e++) {
+        std::string v;
+        Add(test::RandomKey(&rnd, rnd.Skewed(4)),
+            test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
+      }
+      Test(&rnd);
+    }
+  }
+}
+
+TEST(Harness, RandomizedLongDB) {
+  Random rnd(test::RandomSeed());
+  TestArgs args = { DB_TEST, false, 16 };
+  Init(args);
+  int num_entries = 100000;
+  for (int e = 0; e < num_entries; e++) {
+    std::string v;
+    Add(test::RandomKey(&rnd, rnd.Skewed(4)),
+        test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
+  }
+  Test(&rnd);
+
+  // We must have created enough data to force merging
+  int files = 0;
+  for (int level = 0; level < config::kNumLevels; level++) {
+    std::string value;
+    char name[100];
+    snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
+    ASSERT_TRUE(db()->GetProperty(name, &value));
+    files += atoi(value.c_str());
+  }
+  ASSERT_GT(files, 0);
+}
+
+class MemTableTest { };
+
+TEST(MemTableTest, Simple) {
+  InternalKeyComparator cmp(BytewiseComparator());
+  MemTable* memtable = new MemTable(cmp);
+  memtable->Ref();
+  WriteBatch batch;
+  WriteBatchInternal::SetSequence(&batch, 100);
+  batch.Put(std::string("k1"), std::string("v1"));
+  batch.Put(std::string("k2"), std::string("v2"));
+  batch.Put(std::string("k3"), std::string("v3"));
+  batch.Put(std::string("largekey"), std::string("vlarge"));
+  ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok());
+
+  Iterator* iter = memtable->NewIterator();
+  iter->SeekToFirst();
+  while (iter->Valid()) {
+    fprintf(stderr, "key: '%s' -> '%s'\n",
+            iter->key().ToString().c_str(),
+            iter->value().ToString().c_str());
+    iter->Next();
+  }
+
+  delete iter;
+  memtable->Unref();
+}
+
+static bool Between(uint64_t val, uint64_t low, uint64_t high) {
+  bool result = (val >= low) && (val <= high);
+  if (!result) {
+    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
+            (unsigned long long)(val),
+            (unsigned long long)(low),
+            (unsigned long long)(high));
+  }
+  return result;
+}
+
+class TableTest { };
+
+TEST(TableTest, ApproximateOffsetOfPlain) {
+  TableConstructor c(BytewiseComparator());
+  c.Add("k01", "hello");
+  c.Add("k02", "hello2");
+  c.Add("k03", std::string(10000, 'x'));
+  c.Add("k04", std::string(200000, 'x'));
+  c.Add("k05", std::string(300000, 'x'));
+  c.Add("k06", "hello3");
+  c.Add("k07", std::string(100000, 'x'));
+  std::vector<std::string> keys;
+  KVMap kvmap;
+  Options options;
+  options.block_size = 1024;
+  options.compression = kNoCompression;
+  c.Finish(options, &keys, &kvmap);
+
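+  // With compression disabled, each key's approximate offset is roughly the
+  // cumulative size of the values written before it: k04's block starts
+  // after k03's 10000-byte value, k05's after k04's 200000 bytes, and so
+  // on; per-block overhead accounts for the slack in the ranges below.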
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01a"),      0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"),   10000,  11000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04a"), 210000, 211000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k05"),  210000, 211000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"),  510000, 511000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"),  510000, 511000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"),  610000, 612000));
+}
+
+static bool SnappyCompressionSupported() {
+  std::string out;
+  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+  return port::Snappy_Compress(in.data(), in.size(), &out);
+}
+
+TEST(TableTest, ApproximateOffsetOfCompressed) {
+  if (!SnappyCompressionSupported()) {
+    fprintf(stderr, "skipping compression tests\n");
+    return;
+  }
+
+  Random rnd(301);
+  TableConstructor c(BytewiseComparator());
+  std::string tmp;
+  c.Add("k01", "hello");
+  c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
+  c.Add("k03", "hello3");
+  c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
+  std::vector<std::string> keys;
+  KVMap kvmap;
+  Options options;
+  options.block_size = 1024;
+  options.compression = kSnappyCompression;
+  c.Finish(options, &keys, &kvmap);
+
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"),       0,      0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"),    2000,   3000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"),    2000,   3000));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"),    4000,   6000));
+}
+
+}  // namespace leveldb
+
+int main(int argc, char** argv) {
+  return leveldb::test::RunAllTests();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.cc
new file mode 100644
index 0000000..7822eba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.cc
@@ -0,0 +1,182 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/two_level_iterator.h"
+
+#include "leveldb/table.h"
+#include "table/block.h"
+#include "table/format.h"
+#include "table/iterator_wrapper.h"
+
+namespace leveldb {
+
+namespace {
+
+typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&);
+
+class TwoLevelIterator: public Iterator {
+ public:
+  TwoLevelIterator(
+    Iterator* index_iter,
+    BlockFunction block_function,
+    void* arg,
+    const ReadOptions& options);
+
+  virtual ~TwoLevelIterator();
+
+  virtual void Seek(const Slice& target);
+  virtual void SeekToFirst();
+  virtual void SeekToLast();
+  virtual void Next();
+  virtual void Prev();
+
+  virtual bool Valid() const {
+    return data_iter_.Valid();
+  }
+  virtual Slice key() const {
+    assert(Valid());
+    return data_iter_.key();
+  }
+  virtual Slice value() const {
+    assert(Valid());
+    return data_iter_.value();
+  }
+  virtual Status status() const {
+    // It'd be nice if status() returned a const Status& instead of a Status
+    if (!index_iter_.status().ok()) {
+      return index_iter_.status();
+    } else if (data_iter_.iter() != NULL && !data_iter_.status().ok()) {
+      return data_iter_.status();
+    } else {
+      return status_;
+    }
+  }
+
+ private:
+  void SaveError(const Status& s) {
+    if (status_.ok() && !s.ok()) status_ = s;
+  }
+  void SkipEmptyDataBlocksForward();
+  void SkipEmptyDataBlocksBackward();
+  void SetDataIterator(Iterator* data_iter);
+  void InitDataBlock();
+
+  BlockFunction block_function_;
+  void* arg_;
+  const ReadOptions options_;
+  Status status_;
+  IteratorWrapper index_iter_;
+  IteratorWrapper data_iter_; // May be NULL
+  // If data_iter_ is non-NULL, then "data_block_handle_" holds the
+  // "index_value" passed to block_function_ to create the data_iter_.
+  std::string data_block_handle_;
+};
+
+TwoLevelIterator::TwoLevelIterator(
+    Iterator* index_iter,
+    BlockFunction block_function,
+    void* arg,
+    const ReadOptions& options)
+    : block_function_(block_function),
+      arg_(arg),
+      options_(options),
+      index_iter_(index_iter),
+      data_iter_(NULL) {
+}
+
+TwoLevelIterator::~TwoLevelIterator() {
+}
+
+void TwoLevelIterator::Seek(const Slice& target) {
+  index_iter_.Seek(target);
+  InitDataBlock();
+  if (data_iter_.iter() != NULL) data_iter_.Seek(target);
+  SkipEmptyDataBlocksForward();
+}
+
+void TwoLevelIterator::SeekToFirst() {
+  index_iter_.SeekToFirst();
+  InitDataBlock();
+  if (data_iter_.iter() != NULL) data_iter_.SeekToFirst();
+  SkipEmptyDataBlocksForward();
+}
+
+void TwoLevelIterator::SeekToLast() {
+  index_iter_.SeekToLast();
+  InitDataBlock();
+  if (data_iter_.iter() != NULL) data_iter_.SeekToLast();
+  SkipEmptyDataBlocksBackward();
+}
+
+void TwoLevelIterator::Next() {
+  assert(Valid());
+  data_iter_.Next();
+  SkipEmptyDataBlocksForward();
+}
+
+void TwoLevelIterator::Prev() {
+  assert(Valid());
+  data_iter_.Prev();
+  SkipEmptyDataBlocksBackward();
+}
+
+void TwoLevelIterator::SkipEmptyDataBlocksForward() {
+  while (data_iter_.iter() == NULL || !data_iter_.Valid()) {
+    // Move to next block
+    if (!index_iter_.Valid()) {
+      SetDataIterator(NULL);
+      return;
+    }
+    index_iter_.Next();
+    InitDataBlock();
+    if (data_iter_.iter() != NULL) data_iter_.SeekToFirst();
+  }
+}
+
+void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
+  while (data_iter_.iter() == NULL || !data_iter_.Valid()) {
+    // Move to previous block
+    if (!index_iter_.Valid()) {
+      SetDataIterator(NULL);
+      return;
+    }
+    index_iter_.Prev();
+    InitDataBlock();
+    if (data_iter_.iter() != NULL) data_iter_.SeekToLast();
+  }
+}
+
+void TwoLevelIterator::SetDataIterator(Iterator* data_iter) {
+  if (data_iter_.iter() != NULL) SaveError(data_iter_.status());
+  data_iter_.Set(data_iter);
+}
+
+void TwoLevelIterator::InitDataBlock() {
+  if (!index_iter_.Valid()) {
+    SetDataIterator(NULL);
+  } else {
+    Slice handle = index_iter_.value();
+    if (data_iter_.iter() != NULL && handle.compare(data_block_handle_) == 0) {
+      // data_iter_ is already constructed with this iterator, so
+      // no need to change anything
+    } else {
+      Iterator* iter = (*block_function_)(arg_, options_, handle);
+      data_block_handle_.assign(handle.data(), handle.size());
+      SetDataIterator(iter);
+    }
+  }
+}
+
+}  // namespace
+
+Iterator* NewTwoLevelIterator(
+    Iterator* index_iter,
+    BlockFunction block_function,
+    void* arg,
+    const ReadOptions& options) {
+  return new TwoLevelIterator(index_iter, block_function, arg, options);
+}
+
+}  // namespace leveldb

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.h
new file mode 100644
index 0000000..629ca34
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/table/two_level_iterator.h
@@ -0,0 +1,34 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_
+#define STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_
+
+#include "leveldb/iterator.h"
+
+namespace leveldb {
+
+struct ReadOptions;
+
+// Return a new two level iterator.  A two-level iterator contains an
+// index iterator whose values point to a sequence of blocks where
+// each block is itself a sequence of key,value pairs.  The returned
+// two-level iterator yields the concatenation of all key/value pairs
+// in the sequence of blocks.  Takes ownership of "index_iter" and
+// will delete it when no longer needed.
+//
+// Uses a supplied function to convert an index_iter value into
+// an iterator over the contents of the corresponding block.
+extern Iterator* NewTwoLevelIterator(
+    Iterator* index_iter,
+    Iterator* (*block_function)(
+        void* arg,
+        const ReadOptions& options,
+        const Slice& index_value),
+    void* arg,
+    const ReadOptions& options);
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_
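
For context, upstream LevelDB wires this factory up in Table::NewIterator
(table/table.cc); the following is a sketch for orientation, assuming
table.cc is imported alongside this header:

  Iterator* Table::NewIterator(const ReadOptions& options) const {
    // The index iterator walks the index block; Table::BlockReader turns an
    // index value (an encoded BlockHandle) into an iterator over the
    // corresponding data block.
    return NewTwoLevelIterator(
        rep_->index_block->NewIterator(rep_->options.comparator),
        &Table::BlockReader, const_cast<Table*>(this), options);
  }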

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.cc
new file mode 100644
index 0000000..9367f71
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.cc
@@ -0,0 +1,68 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/arena.h"
+#include <assert.h>
+
+namespace leveldb {
+
+static const int kBlockSize = 4096;
+
+Arena::Arena() {
+  blocks_memory_ = 0;
+  alloc_ptr_ = NULL;  // First allocation will allocate a block
+  alloc_bytes_remaining_ = 0;
+}
+
+Arena::~Arena() {
+  for (size_t i = 0; i < blocks_.size(); i++) {
+    delete[] blocks_[i];
+  }
+}
+
+char* Arena::AllocateFallback(size_t bytes) {
+  if (bytes > kBlockSize / 4) {
+    // Object is more than a quarter of our block size.  Allocate it separately
+    // to avoid wasting too much space in leftover bytes.
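+    // (Here kBlockSize/4 is 1024 bytes; the dedicated block leaves the
+    // current block's remaining space usable for later small requests.)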
+    char* result = AllocateNewBlock(bytes);
+    return result;
+  }
+
+  // We waste the remaining space in the current block.
+  alloc_ptr_ = AllocateNewBlock(kBlockSize);
+  alloc_bytes_remaining_ = kBlockSize;
+
+  char* result = alloc_ptr_;
+  alloc_ptr_ += bytes;
+  alloc_bytes_remaining_ -= bytes;
+  return result;
+}
+
+char* Arena::AllocateAligned(size_t bytes) {
+  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
+  assert((align & (align-1)) == 0);   // align must be a power of 2
+  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
+  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
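+  // Example, with align == 8: an alloc_ptr_ ending in 0x0d gives
+  // current_mod == 5 and slop == 3, so the result is bumped to 0x10,
+  // the next multiple of 8.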
+  size_t needed = bytes + slop;
+  char* result;
+  if (needed <= alloc_bytes_remaining_) {
+    result = alloc_ptr_ + slop;
+    alloc_ptr_ += needed;
+    alloc_bytes_remaining_ -= needed;
+  } else {
+    // AllocateFallback always returns aligned memory
+    result = AllocateFallback(bytes);
+  }
+  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
+  return result;
+}
+
+char* Arena::AllocateNewBlock(size_t block_bytes) {
+  char* result = new char[block_bytes];
+  blocks_memory_ += block_bytes;
+  blocks_.push_back(result);
+  return result;
+}
+
+}  // namespace leveldb

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.h
new file mode 100644
index 0000000..73bbf1c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena.h
@@ -0,0 +1,68 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_
+#define STORAGE_LEVELDB_UTIL_ARENA_H_
+
+#include <vector>
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+
+namespace leveldb {
+
+class Arena {
+ public:
+  Arena();
+  ~Arena();
+
+  // Return a pointer to a newly allocated memory block of "bytes" bytes.
+  char* Allocate(size_t bytes);
+
+  // Allocate memory with the normal alignment guarantees provided by malloc
+  char* AllocateAligned(size_t bytes);
+
+  // Returns an estimate of the total memory usage of data allocated
+  // by the arena (including space allocated but not yet used for user
+  // allocations).
+  size_t MemoryUsage() const {
+    return blocks_memory_ + blocks_.capacity() * sizeof(char*);
+  }
+
+ private:
+  char* AllocateFallback(size_t bytes);
+  char* AllocateNewBlock(size_t block_bytes);
+
+  // Allocation state
+  char* alloc_ptr_;
+  size_t alloc_bytes_remaining_;
+
+  // Array of new[] allocated memory blocks
+  std::vector<char*> blocks_;
+
+  // Bytes of memory in blocks allocated so far
+  size_t blocks_memory_;
+
+  // No copying allowed
+  Arena(const Arena&);
+  void operator=(const Arena&);
+};
+
+inline char* Arena::Allocate(size_t bytes) {
+  // The semantics of what to return are a bit messy if we allow
+  // 0-byte allocations, so we disallow them here (we don't need
+  // them for our internal use).
+  assert(bytes > 0);
+  if (bytes <= alloc_bytes_remaining_) {
+    char* result = alloc_ptr_;
+    alloc_ptr_ += bytes;
+    alloc_bytes_remaining_ -= bytes;
+    return result;
+  }
+  return AllocateFallback(bytes);
+}
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_UTIL_ARENA_H_
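
For reference, a minimal usage sketch of the arena (Arena is an internal
utility class, so this is illustrative only):

  #include "util/arena.h"

  void ArenaExample() {
    leveldb::Arena arena;
    char* raw = arena.Allocate(16);             // bump-pointer allocation
    char* aligned = arena.AllocateAligned(24);  // aligned to max(sizeof(void*), 8)
    (void) raw;
    (void) aligned;
    // All blocks are released together when `arena` goes out of scope.
  }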

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena_test.cc
new file mode 100644
index 0000000..58e870e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/arena_test.cc
@@ -0,0 +1,68 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/arena.h"
+
+#include "util/random.h"
+#include "util/testharness.h"
+
+namespace leveldb {
+
+class ArenaTest { };
+
+TEST(ArenaTest, Empty) {
+  Arena arena;
+}
+
+TEST(ArenaTest, Simple) {
+  std::vector<std::pair<size_t, char*> > allocated;
+  Arena arena;
+  const int N = 100000;
+  size_t bytes = 0;
+  Random rnd(301);
+  for (int i = 0; i < N; i++) {
+    size_t s;
+    if (i % (N / 10) == 0) {
+      s = i;
+    } else {
+      s = rnd.OneIn(4000) ? rnd.Uniform(6000) :
+          (rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20));
+    }
+    if (s == 0) {
+      // Our arena disallows size 0 allocations.
+      s = 1;
+    }
+    char* r;
+    if (rnd.OneIn(10)) {
+      r = arena.AllocateAligned(s);
+    } else {
+      r = arena.Allocate(s);
+    }
+
+    for (size_t b = 0; b < s; b++) {
+      // Fill the "i"th allocation with a known bit pattern
+      r[b] = i % 256;
+    }
+    bytes += s;
+    allocated.push_back(std::make_pair(s, r));
+    ASSERT_GE(arena.MemoryUsage(), bytes);
+    if (i > N/10) {
+      ASSERT_LE(arena.MemoryUsage(), bytes * 1.10);
+    }
+  }
+  for (size_t i = 0; i < allocated.size(); i++) {
+    size_t num_bytes = allocated[i].first;
+    const char* p = allocated[i].second;
+    for (size_t b = 0; b < num_bytes; b++) {
+      // Check the "i"th allocation for the known bit pattern
+      ASSERT_EQ(int(p[b]) & 0xff, i % 256);
+    }
+  }
+}
+
+}  // namespace leveldb
+
+int main(int argc, char** argv) {
+  return leveldb::test::RunAllTests();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/bloom.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/bloom.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/bloom.cc
new file mode 100644
index 0000000..d7941cd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/util/bloom.cc
@@ -0,0 +1,95 @@
+// Copyright (c) 2012 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/filter_policy.h"
+
+#include "leveldb/slice.h"
+#include "util/hash.h"
+
+namespace leveldb {
+
+namespace {
+static uint32_t BloomHash(const Slice& key) {
+  return Hash(key.data(), key.size(), 0xbc9f1d34);
+}
+
+class BloomFilterPolicy : public FilterPolicy {
+ private:
+  size_t bits_per_key_;
+  size_t k_;
+
+ public:
+  explicit BloomFilterPolicy(int bits_per_key)
+      : bits_per_key_(bits_per_key) {
+    // We intentionally round down to reduce probing cost a little bit
+    k_ = static_cast<size_t>(bits_per_key * 0.69);  // 0.69 =~ ln(2)
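+    // e.g. bits_per_key == 10 yields k_ == 6 probes, close to the
+    // theoretical optimum k = bits_per_key * ln(2).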
+    if (k_ < 1) k_ = 1;
+    if (k_ > 30) k_ = 30;
+  }
+
+  virtual const char* Name() const {
+    return "leveldb.BuiltinBloomFilter";
+  }
+
+  virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const {
+    // Compute bloom filter size (in both bits and bytes)
+    size_t bits = n * bits_per_key_;
+
+    // For small n, we can see a very high false positive rate.  Fix it
+    // by enforcing a minimum bloom filter length.
+    if (bits < 64) bits = 64;
+
+    size_t bytes = (bits + 7) / 8;
+    bits = bytes * 8;
+
+    const size_t init_size = dst->size();
+    dst->resize(init_size + bytes, 0);
+    dst->push_back(static_cast<char>(k_));  // Remember # of probes in filter
+    char* array = &(*dst)[init_size];
+    for (int i = 0; i < n; i++) {
+      // Use double-hashing to generate a sequence of hash values.
+      // See analysis in [Kirsch,Mitzenmacher 2006].
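+      // The probe positions are h, h+delta, h+2*delta, ... (mod bits),
+      // which approximates k_ independent hash functions.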
+      uint32_t h = BloomHash(keys[i]);
+      const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
+      for (size_t j = 0; j < k_; j++) {
+        const uint32_t bitpos = h % bits;
+        array[bitpos/8] |= (1 << (bitpos % 8));
+        h += delta;
+      }
+    }
+  }
+
+  virtual bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const {
+    const size_t len = bloom_filter.size();
+    if (len < 2) return false;
+
+    const char* array = bloom_filter.data();
+    const size_t bits = (len - 1) * 8;
+
+    // Use the encoded k so that we can read filters written with
+    // different parameters.
+    const size_t k = array[len-1];
+    if (k > 30) {
+      // Reserved for potentially new encodings for short bloom filters.
+      // Consider it a match.
+      return true;
+    }
+
+    uint32_t h = BloomHash(key);
+    const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
+    for (size_t j = 0; j < k; j++) {
+      const uint32_t bitpos = h % bits;
+      if ((array[bitpos/8] & (1 << (bitpos % 8))) == 0) return false;
+      h += delta;
+    }
+    return true;
+  }
+};
+}  // namespace
+
+const FilterPolicy* NewBloomFilterPolicy(int bits_per_key) {
+  return new BloomFilterPolicy(bits_per_key);
+}
+
+}  // namespace leveldb
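
A minimal usage sketch of the bloom filter policy (this is the standard
LevelDB API; the header and option names are assumed to match the
include/leveldb tree imported by this commit):

  #include "leveldb/db.h"
  #include "leveldb/filter_policy.h"

  void BloomExample() {
    leveldb::Options options;
    options.create_if_missing = true;
    // Roughly 10 bits per key yields about a 1% false-positive rate.
    options.filter_policy = leveldb::NewBloomFilterPolicy(10);
    leveldb::DB* db = NULL;
    leveldb::Status s = leveldb::DB::Open(options, "/tmp/bloomdb", &db);
    // Reads now consult the per-block filters before touching block data.
    delete db;
    delete options.filter_policy;  // The caller owns the policy object.
  }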