Posted to commits@spark.apache.org by ir...@apache.org on 2017/06/06 18:39:17 UTC

[1/2] spark git commit: [SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.

Repository: spark
Updated Branches:
  refs/heads/master b61a401da -> 0cba49512


http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
new file mode 100644
index 0000000..5e33606
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * A set of small benchmarks for the LevelDB implementation.
+ *
+ * The benchmarks are run over two different types (one with just a natural index, and one
+ * with a ref index), over a set of 1024 elements (see the COUNT constant), and the following tests are performed:
+ *
+ * - write (then update) elements in sequential natural key order
+ * - write (then update) elements in random natural key order
+ * - iterate over natural index, ascending and descending
+ * - iterate over ref index, ascending and descending
+ */
+@Ignore
+public class LevelDBBenchmark {
+
+  private static final int COUNT = 1024;
+  private static final AtomicInteger IDGEN = new AtomicInteger();
+  private static final MetricRegistry metrics = new MetricRegistry();
+  private static final Timer dbCreation = metrics.timer("dbCreation");
+  private static final Timer dbClose = metrics.timer("dbClose");
+
+  private LevelDB db;
+  private File dbpath;
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    try(Timer.Context ctx = dbCreation.time()) {
+      db = new LevelDB(dbpath);
+    }
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      try(Timer.Context ctx = dbClose.time()) {
+        db.close();
+      }
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @AfterClass
+  public static void report() {
+    if (metrics.getTimers().isEmpty()) {
+      return;
+    }
+
+    int headingPrefix = 0;
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      headingPrefix = Math.max(e.getKey().length(), headingPrefix);
+    }
+    headingPrefix += 4;
+
+    StringBuilder heading = new StringBuilder();
+    for (int i = 0; i < headingPrefix; i++) {
+      heading.append(" ");
+    }
+    heading.append("\tcount");
+    heading.append("\tmean");
+    heading.append("\tmin");
+    heading.append("\tmax");
+    heading.append("\t95th");
+    System.out.println(heading);
+
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      StringBuilder row = new StringBuilder();
+      row.append(e.getKey());
+      for (int i = 0; i < headingPrefix - e.getKey().length(); i++) {
+        row.append(" ");
+      }
+
+      Snapshot s = e.getValue().getSnapshot();
+      row.append("\t").append(e.getValue().getCount());
+      row.append("\t").append(toMs(s.getMean()));
+      row.append("\t").append(toMs(s.getMin()));
+      row.append("\t").append(toMs(s.getMax()));
+      row.append("\t").append(toMs(s.get95thPercentile()));
+
+      System.out.println(row);
+    }
+
+    Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class))
+      .build().report();
+  }
+
+  private static String toMs(double nanos) {
+    return String.format("%.3f", nanos / 1000 / 1000);
+  }
+
+  @Test
+  public void sequentialWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+    writeAll(entries, "sequentialWritesNoIndex");
+    writeAll(entries, "sequentialUpdatesNoIndex");
+    deleteNoIndex(entries, "sequentialDeleteNoIndex");
+  }
+
+  @Test
+  public void randomWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesNoIndex");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesNoIndex");
+
+    Collections.shuffle(entries);
+    deleteNoIndex(entries, "randomDeletesNoIndex");
+  }
+
+  @Test
+  public void sequentialWritesIndexedType() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+    writeAll(entries, "sequentialWritesIndexed");
+    writeAll(entries, "sequentialUpdatesIndexed");
+    deleteIndexed(entries, "sequentialDeleteIndexed");
+  }
+
+  @Test
+  public void randomWritesIndexedTypeAndIteration() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesIndexed");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesIndexed");
+
+    // Run iteration benchmarks here since we've gone through the trouble of writing all
+    // the data already.
+    KVStoreView<?> view = db.view(IndexedType.class);
+    iterate(view, "naturalIndex");
+    iterate(view.reverse(), "naturalIndexDescending");
+    iterate(view.index("name"), "refIndex");
+    iterate(view.index("name").reverse(), "refIndexDescending");
+
+    Collections.shuffle(entries);
+    deleteIndexed(entries, "randomDeleteIndexed");
+  }
+
+  private void iterate(KVStoreView<?> view, String name) throws Exception {
+    Timer create = metrics.timer(name + "CreateIterator");
+    Timer iter = metrics.timer(name + "Iteration");
+    KVStoreIterator<?> it = null;
+    {
+      // Create the iterator several times, just to have multiple data points.
+      for (int i = 0; i < 1024; i++) {
+        if (it != null) {
+          it.close();
+        }
+        try(Timer.Context ctx = create.time()) {
+          it = view.closeableIterator();
+        }
+      }
+    }
+
+    while (it.hasNext()) {
+      try(Timer.Context ctx = iter.time()) {
+        it.next();
+      }
+    }
+  }
+
+  private void writeAll(List<?> entries, String timerName) throws Exception {
+    Timer timer = newTimer(timerName);
+    for (Object o : entries) {
+      try(Timer.Context ctx = timer.time()) {
+        db.write(o);
+      }
+    }
+  }
+
+  private void deleteNoIndex(List<SimpleType> entries, String timerName) throws Exception {
+    Timer delete = newTimer(timerName);
+    for (SimpleType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private void deleteIndexed(List<IndexedType> entries, String timerName) throws Exception {
+    Timer delete = newTimer(timerName);
+    for (IndexedType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private List<SimpleType> createSimpleType() {
+    List<SimpleType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      SimpleType t = new SimpleType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private List<IndexedType> createIndexedType() {
+    List<IndexedType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      IndexedType t = new IndexedType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private Timer newTimer(String name) {
+    assertNull("Timer already exists: " + name, metrics.getTimers().get(name));
+    return metrics.timer(name);
+  }
+
+  public static class SimpleType {
+
+    @KVIndex
+    public int key;
+
+    public String name;
+
+  }
+
+  public static class IndexedType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("name")
+    public String name;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
new file mode 100644
index 0000000..9340971
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+
+public class LevelDBIteratorSuite extends DBIteratorSuite {
+
+  private static File dbpath;
+  private static LevelDB db;
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Override
+  protected KVStore createStore() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+    return db;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
new file mode 100644
index 0000000..ee1c397
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.commons.io.FileUtils;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBSuite {
+
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testReopenAndVersionCheckDb() throws Exception {
+    db.close();
+    db = null;
+    assertTrue(dbpath.exists());
+
+    db = new LevelDB(dbpath);
+    assertEquals(LevelDB.STORE_VERSION,
+      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
+    db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION + 1));
+    db.close();
+    db = null;
+
+    try {
+      db = new LevelDB(dbpath);
+      fail("Should have failed version check.");
+    } catch (UnsupportedStoreVersionException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    try {
+      db.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existant object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    db.write(t);
+    assertEquals(t, db.read(t.getClass(), t.key));
+    assertEquals(1L, db.count(t.getClass()));
+
+    db.delete(t.getClass(), t.key);
+    try {
+      db.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    // Look into the actual DB and make sure that all the keys related to the type have been
+    // removed.
+    assertEquals(0, countKeys(t.getClass()));
+  }
+
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+    t2.child = "child2";
+
+    db.write(t1);
+    db.write(t2);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(2L, db.count(t1.getClass()));
+
+    // There should be one "id" index entry with two values.
+    assertEquals(2, db.count(t1.getClass(), "id", t1.id));
+
+    // Delete the first entry; its index entries (including its "id" and "name" entries)
+    // should be removed along with it.
+    db.delete(t1.getClass(), t1.key);
+
+    // Make sure there's a single entry in the "id" index now.
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+
+    // Delete the remaining entry, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+  }
+
+  @Test
+  public void testMultipleTypesWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    IntKeyType t2 = new IntKeyType();
+    t2.key = 2;
+    t2.id = "2";
+    t2.values = Arrays.asList("value1", "value2");
+
+    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
+    t3.key = new int[] { 42, 84 };
+    t3.id = new String[] { "id1", "id2" };
+
+    db.write(t1);
+    db.write(t2);
+    db.write(t3);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(t3, db.read(t3.getClass(), t3.key));
+
+    // There should be one "id" index with a single entry for each type.
+    assertEquals(1, db.count(t1.getClass(), "id", t1.id));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the first entry; this should not affect the entries for the second type.
+    db.delete(t1.getClass(), t1.key);
+    assertEquals(0, countKeys(t1.getClass()));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the remaining entries, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+
+    db.delete(t3.getClass(), t3.key);
+    assertEquals(0, countKeys(t3.getClass()));
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    assertNull(db.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.setMetadata(t);
+    assertEquals(t, db.getMetadata(CustomType1.class));
+
+    db.setMetadata(null);
+    assertNull(db.getMetadata(CustomType1.class));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.write(t);
+
+    t.name = "anotherName";
+
+    db.write(t);
+
+    assertEquals(1, db.count(t.getClass()));
+    assertEquals(1, db.count(t.getClass(), "name", "anotherName"));
+    assertEquals(0, db.count(t.getClass(), "name", "name"));
+  }
+
+  @Test
+  public void testSkip() throws Exception {
+    for (int i = 0; i < 10; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + i;
+      t.child = "child" + i;
+
+      db.write(t);
+    }
+
+    KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
+    assertTrue(it.hasNext());
+    assertTrue(it.skip(5));
+    assertEquals("key5", it.next().key);
+    assertTrue(it.skip(3));
+    assertEquals("key9", it.next().key);
+    assertFalse(it.hasNext());
+  }
+
+  private int countKeys(Class<?> type) throws Exception {
+    byte[] prefix = db.getTypeInfo(type).keyPrefix();
+    int count = 0;
+
+    DBIterator it = db.db().iterator();
+    it.seek(prefix);
+
+    while (it.hasNext()) {
+      byte[] key = it.next().getKey();
+      if (LevelDBIterator.startsWith(key, prefix)) {
+        count++;
+      }
+    }
+
+    return count;
+  }
+
+  public static class IntKeyType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("id")
+    public String id;
+
+    public List<String> values;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof IntKeyType) {
+        IntKeyType other = (IntKeyType) o;
+        return key == other.key && id.equals(other.id) && values.equals(other.values);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return id.hashCode();
+    }
+
+  }
+
+  public static class ArrayKeyIndexType {
+
+    @KVIndex
+    public int[] key;
+
+    @KVIndex("id")
+    public String[] id;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof ArrayKeyIndexType) {
+        ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+        return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(key);  // consistent with the Arrays.equals-based equals above
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
new file mode 100644
index 0000000..8e61965
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBTypeInfoSuite {
+
+  @Test
+  public void testIndexAnnotation() throws Exception {
+    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
+    assertEquals(5, ti.indices().count());
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key";
+    t1.id = "id";
+    t1.name = "name";
+    t1.num = 42;
+    t1.child = "child";
+
+    assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
+    assertEquals(t1.id, ti.getIndexValue("id", t1));
+    assertEquals(t1.name, ti.getIndexValue("name", t1));
+    assertEquals(t1.num, ti.getIndexValue("int", t1));
+    assertEquals(t1.child, ti.getIndexValue("child", t1));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex() throws Exception {
+    newTypeInfo(NoNaturalIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex2() throws Exception {
+    newTypeInfo(NoNaturalIndex2.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicateIndex() throws Exception {
+    newTypeInfo(DuplicateIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyIndexName() throws Exception {
+    newTypeInfo(EmptyIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexName() throws Exception {
+    newTypeInfo(IllegalIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexMethod() throws Exception {
+    newTypeInfo(IllegalIndexMethod.class);
+  }
+
+  @Test
+  public void testKeyClashes() throws Exception {
+    LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.name = "a";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.name = "aa";
+
+    CustomType1 t3 = new CustomType1();
+    t3.key = "key3";
+    t3.name = "aaa";
+
+    // Make sure entries with conflicting names are sorted correctly.
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2));
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3));
+    assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3));
+  }
+
+  @Test
+  public void testNumEncoding() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertEquals("+=00000001", new String(idx.toKey(1), UTF_8));
+    assertEquals("+=00000010", new String(idx.toKey(16), UTF_8));
+    assertEquals("+=7fffffff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8));
+
+    assertBefore(idx.toKey(1), idx.toKey(2));
+    assertBefore(idx.toKey(-1), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(-1));
+    assertBefore(idx.toKey(1), idx.toKey(11));
+    assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE));
+
+    assertBefore(idx.toKey(1L), idx.toKey(2L));
+    assertBefore(idx.toKey(-1L), idx.toKey(2L));
+    assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
+
+    assertBefore(idx.toKey((short) 1), idx.toKey((short) 2));
+    assertBefore(idx.toKey((short) -1), idx.toKey((short) 2));
+    assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE));
+
+    assertBefore(idx.toKey((byte) 1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey((byte) -1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey(Byte.MIN_VALUE), idx.toKey(Byte.MAX_VALUE));
+
+    byte prefix = LevelDBTypeInfo.ENTRY_PREFIX;
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.FALSE }, idx.toKey(false));
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.TRUE }, idx.toKey(true));
+  }
+
+  @Test
+  public void testArrayIndices() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertBefore(idx.toKey(new String[] { "str1" }), idx.toKey(new String[] { "str2" }));
+    assertBefore(idx.toKey(new String[] { "str1", "str2" }),
+      idx.toKey(new String[] { "str1", "str3" }));
+
+    assertBefore(idx.toKey(new int[] { 1 }), idx.toKey(new int[] { 2 }));
+    assertBefore(idx.toKey(new int[] { 1, 2 }), idx.toKey(new int[] { 1, 3 }));
+  }
+
+  private LevelDBTypeInfo newTypeInfo(Class<?> type) throws Exception {
+    return new LevelDBTypeInfo(null, type, type.getName().getBytes(UTF_8));
+  }
+
+  private void assertBefore(byte[] key1, byte[] key2) {
+    assertBefore(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  private void assertBefore(String str1, String str2) {
+    assertTrue(String.format("%s < %s failed", str1, str2), str1.compareTo(str2) < 0);
+  }
+
+  private void assertSame(byte[] key1, byte[] key2) {
+    assertEquals(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  public static class NoNaturalIndex {
+
+    public String id;
+
+  }
+
+  public static class NoNaturalIndex2 {
+
+    @KVIndex("id")
+    public String id;
+
+  }
+
+  public static class DuplicateIndex {
+
+    @KVIndex
+    public String key;
+
+    @KVIndex("id")
+    public String id;
+
+    @KVIndex("id")
+    public String id2;
+
+  }
+
+  public static class EmptyIndexName {
+
+    @KVIndex("")
+    public String id;
+
+  }
+
+  public static class IllegalIndexName {
+
+    @KVIndex("__invalid")
+    public String id;
+
+  }
+
+  public static class IllegalIndexMethod {
+
+    @KVIndex("id")
+    public String id(boolean illegalParam) {
+      return null;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/resources/log4j.properties b/common/kvstore/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e8da774
--- /dev/null
+++ b/common/kvstore/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=DEBUG, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Silence verbose logs from 3rd-party libraries.
+log4j.logger.io.netty=INFO

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0533a8d..6835ea1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
 
   <modules>
     <module>common/sketch</module>
+    <module>common/kvstore</module>
     <module>common/network-common</module>
     <module>common/network-shuffle</module>
     <module>common/unsafe</module>
@@ -442,6 +443,11 @@
         <version>${commons.httpcore.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.fusesource.leveldbjni</groupId>
+        <artifactId>leveldbjni-all</artifactId>
+        <version>1.8</version>
+      </dependency>
+      <dependency>
         <groupId>org.seleniumhq.selenium</groupId>
         <artifactId>selenium-java</artifactId>
         <version>${selenium.version}</version>
@@ -590,6 +596,11 @@
       </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${fasterxml.jackson.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
         <version>${fasterxml.jackson.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b5362ec..89b0c7a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,10 +50,10 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
-    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, _*
+    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
   ) = Seq(
     "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
-    "tags", "sketch"
+    "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
   val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl,
@@ -310,7 +310,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
-      unsafe, tags, sqlKafka010
+      unsafe, tags, sqlKafka010, kvstore
     ).contains(x)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.

Posted by ir...@apache.org.
[SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.

This change adds an abstraction and LevelDB implementation for a key-value
store that will be used to store UI and SHS data.

The interface is described in KVStore.java (see javadoc). Specifics
of the LevelDB implementation are discussed in the javadocs of both
LevelDB.java and LevelDBTypeInfo.java.
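
As a rough sketch of the API those javadocs describe (the UIEntry type and the temp
path below are illustrative only, not part of the patch):

    import java.io.File;
    import org.apache.spark.kvstore.KVIndex;
    import org.apache.spark.kvstore.KVStore;
    import org.apache.spark.kvstore.LevelDB;

    public class KVStoreExample {

      // Hypothetical app type; the unnamed @KVIndex marks the natural (unique) index.
      public static class UIEntry {
        @KVIndex
        public String id;

        @KVIndex("name")
        public String name;
      }

      public static void main(String[] args) throws Exception {
        File path = File.createTempFile("example.", ".ldb");
        path.delete();  // LevelDB creates its own directory at this path, as the unit tests do
        try (KVStore store = new LevelDB(path)) {
          UIEntry e = new UIEntry();
          e.id = "app-1";
          e.name = "example";
          store.write(e);                                     // writes the object and its index entries
          UIEntry back = store.read(UIEntry.class, "app-1");  // look up by natural key
          System.out.println(back.name);
        }
      }
    }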

Included also are a few small benchmarks just to get some idea of
latency. Because they're too slow for regular unit test runs, they're
disabled by default.

Tested with the included unit tests, and also as part of the overall feature
implementation (including running SHS with hundreds of apps).

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #17902 from vanzin/shs-ng/M1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cba4951
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cba4951
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cba4951

Branch: refs/heads/master
Commit: 0cba495120bc5a889ceeb8d66713a053d7561be2
Parents: b61a401
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jun 6 13:39:10 2017 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue Jun 6 13:39:10 2017 -0500

----------------------------------------------------------------------
 common/kvstore/pom.xml                          | 101 ++++
 .../java/org/apache/spark/kvstore/KVIndex.java  |  82 +++
 .../java/org/apache/spark/kvstore/KVStore.java  | 129 +++++
 .../apache/spark/kvstore/KVStoreIterator.java   |  47 ++
 .../apache/spark/kvstore/KVStoreSerializer.java |  86 ++++
 .../org/apache/spark/kvstore/KVStoreView.java   | 126 +++++
 .../org/apache/spark/kvstore/KVTypeInfo.java    | 156 ++++++
 .../java/org/apache/spark/kvstore/LevelDB.java  | 308 +++++++++++
 .../apache/spark/kvstore/LevelDBIterator.java   | 278 ++++++++++
 .../apache/spark/kvstore/LevelDBTypeInfo.java   | 516 +++++++++++++++++++
 .../UnsupportedStoreVersionException.java       |  27 +
 .../org/apache/spark/kvstore/CustomType1.java   |  63 +++
 .../apache/spark/kvstore/DBIteratorSuite.java   | 506 ++++++++++++++++++
 .../apache/spark/kvstore/LevelDBBenchmark.java  | 280 ++++++++++
 .../spark/kvstore/LevelDBIteratorSuite.java     |  48 ++
 .../org/apache/spark/kvstore/LevelDBSuite.java  | 312 +++++++++++
 .../spark/kvstore/LevelDBTypeInfoSuite.java     | 207 ++++++++
 .../kvstore/src/test/resources/log4j.properties |  27 +
 pom.xml                                         |  11 +
 project/SparkBuild.scala                        |   6 +-
 20 files changed, 3313 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/pom.xml
----------------------------------------------------------------------
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
new file mode 100644
index 0000000..d00cf27
--- /dev/null
+++ b/common/kvstore/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kvstore_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Local DB</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>kvstore</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--
+      This spark-tags test dependency is needed even though it isn't used in this module; otherwise, testing
+      commands that exclude it will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
new file mode 100644
index 0000000..8b88990
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Tags a field to be indexed when storing an object.
+ *
+ * <p>
+ * Types are required to have a natural index that uniquely identifies instances in the store.
+ * The default value of the annotation identifies the natural index for the type.
+ * </p>
+ *
+ * <p>
+ * Indexes allow for more efficient sorting of data read from the store. By annotating a field or
+ * "getter" method with this annotation, an index will be created that will provide sorting based on
+ * the string value of that field.
+ * </p>
+ *
+ * <p>
+ * Note that creating indices means more space will be needed, and maintenance operations like
+ * updating or deleting a value will become more expensive.
+ * </p>
+ *
+ * <p>
+ * Indices are restricted to String, integral types (byte, short, int, long), booleans, and arrays
+ * of those values.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+public @interface KVIndex {
+
+  public static final String NATURAL_INDEX_NAME = "__main__";
+
+  /**
+   * The name of the index to be created for the annotated entity. Must be unique within
+   * the class. Index names are not allowed to start with an underscore (that's reserved for
+   * internal use). The default value is the natural index name (which is always a copy index
+   * regardless of the annotation's values).
+   */
+  String value() default NATURAL_INDEX_NAME;
+
+  /**
+   * The name of the parent index of this index. By default there is no parent index, so the
+   * generated data can be retrieved without having to provide a parent value.
+   *
+   * <p>
+   * If a parent index is defined, iterating over the data using the index will require providing
+   * a single value for the parent index. This serves as a rudimentary way to provide relationships
+   * between entities in the store.
+   * </p>
+   */
+  String parent() default "";
+
+  /**
+   * Whether to copy the instance's data to the index, instead of just storing a pointer to the
+   * data. The default behavior is to just store a reference; that saves disk space but is slower
+   * to read, since there's a level of indirection.
+   */
+  boolean copy() default false;
+
+}
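
To make the annotation's options above concrete, a hypothetical type might combine a
natural index, a plain reference index, and a child index like this (the name, fields,
and parent/copy choices are illustrative only):

    import org.apache.spark.kvstore.KVIndex;

    public class TaskData {

      @KVIndex                       // natural index; uniquely identifies the instance
      public long taskId;

      @KVIndex("stage")              // reference index; stores a pointer to the data
      public int stageId;

      // Child index: iterating "duration" requires supplying a "stage" value.
      // copy = true stores the data alongside the index entry, trading space for read speed.
      @KVIndex(value = "duration", parent = "stage", copy = true)
      public long duration;
    }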

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
new file mode 100644
index 0000000..3be4b82
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * <p>
+ * There are two main features provided by the implementations of this interface:
+ * </p>
+ *
+ * <h3>Serialization</h3>
+ *
+ * <p>
+ * If the underlying data store requires serialization, data will be serialized and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
+ * app-defined types.
+ * </p>
+ *
+ * <p>
+ * Data is also automatically compressed to save disk space.
+ * </p>
+ *
+ * <h3>Automatic Key Management</h3>
+ *
+ * <p>
+ * When using the built-in key management, the implementation will automatically create unique
+ * keys for each type written to the store. Keys are based on the type name, and always start
+ * with the "+" prefix character (so that it's easy to use both manual and automatic key
+ * management APIs without conflicts).
+ * </p>
+ *
+ * <p>
+ * Another feature of automatic key management is indexing; by annotating fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created to sort the data
+ * by the values of those properties. This makes it possible to provide sorting without having
+ * to load all instances of those types from the store.
+ * </p>
+ *
+ * <p>
+ * KVStore instances are thread-safe for both reads and writes.
+ * </p>
+ */
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not currently set.
+   *
+   * <p>
+   * The metadata type is application-specific. This is a convenience method so that applications
+   * don't need to define their own keys for this information.
+   * </p>
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value to the store's metadata key.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   *
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not exist.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
+
+  /**
+   * Writes the given object to the store, including indexed fields. Indices are updated based
+   * on the annotated fields of the object's class.
+   *
+   * <p>
+   * Writes may be slower when the object already exists in the store, since it will involve
+   * updating existing indices.
+   * </p>
+   *
+   * @param value The object to write.
+   */
+  void write(Object value) throws Exception;
+
+  /**
+   * Removes an object and all data related to it, like index entries, from the store.
+   *
+   * @param type The object's type.
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not exist.
+   */
+  void delete(Class<?> type, Object naturalKey) throws Exception;
+
+  /**
+   * Returns a configurable view for iterating over entities of the given type.
+   */
+  <T> KVStoreView<T> view(Class<T> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type currently in the store.
+   */
+  long count(Class<?> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type which match the given indexed value.
+   */
+  long count(Class<?> type, String index, Object indexedValue) throws Exception;
+
+}
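
A short sketch of the metadata and count methods declared above (AppMeta is hypothetical,
TaskData is the illustrative type from the earlier sketch and assumed to be in the same
package, and the store is assumed to be open already):

    import org.apache.spark.kvstore.KVStore;

    public class MetadataExample {

      // Hypothetical metadata holder; any Jackson-serializable type works.
      public static class AppMeta {
        public String version;
      }

      static void example(KVStore store) throws Exception {
        AppMeta meta = new AppMeta();
        meta.version = "1";
        store.setMetadata(meta);
        AppMeta current = store.getMetadata(AppMeta.class);       // null if never set

        long all = store.count(TaskData.class);                   // every TaskData in the store
        long inStage3 = store.count(TaskData.class, "stage", 3);  // only those indexed under stage 3
      }
    }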

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
new file mode 100644
index 0000000..3efdec9
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An iterator for KVStore.
+ *
+ * <p>
+ * Iterators may keep references to resources that need to be closed. It's recommended that users
+ * explicitly close iterators after they're used.
+ * </p>
+ */
+public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {
+
+  /**
+   * Retrieve multiple elements from the store.
+   *
+   * @param max Maximum number of elements to retrieve.
+   */
+  List<T> next(int max);
+
+  /**
+   * Skips the next {@code n} elements in the iteration.
+   *
+   * @return Whether there are items left after skipping.
+   */
+  boolean skip(long n);
+
+}
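
Because the javadoc above recommends closing iterators explicitly, here is a sketch of
batched reads using try-with-resources (TaskData is the hypothetical type from the earlier
sketch; the store is assumed to be open):

    import java.util.List;
    import org.apache.spark.kvstore.KVStore;
    import org.apache.spark.kvstore.KVStoreIterator;

    public class IteratorExample {

      static void readInBatches(KVStore store) throws Exception {
        try (KVStoreIterator<TaskData> it = store.view(TaskData.class).closeableIterator()) {
          it.skip(10);                            // jump past the first 10 elements
          while (it.hasNext()) {
            List<TaskData> batch = it.next(100);  // read up to 100 elements at a time
            // process the batch...
          }
        }
        // try-with-resources closes the iterator even if reading fails.
      }
    }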

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
new file mode 100644
index 0000000..b84ec91
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * <p>
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written directly; those values are stored as UTF-8 strings.
+ * </p>
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
+      return ((String) o).getBytes(UTF_8);
+    } else {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      GZIPOutputStream out = new GZIPOutputStream(bytes);
+      try {
+        mapper.writeValue(out, o);
+      } finally {
+        out.close();
+      }
+      return bytes.toByteArray();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
+    if (klass.equals(String.class)) {
+      return (T) new String(data, UTF_8);
+    } else {
+      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
+      try {
+        return mapper.readValue(in, klass);
+      } finally {
+        in.close();
+      }
+    }
+  }
+
+  final byte[] serialize(long value) {
+    return String.valueOf(value).getBytes(UTF_8);
+  }
+
+  final long deserializeLong(byte[] data) {
+    return Long.parseLong(new String(data, UTF_8));
+  }
+
+}
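
The javadoc above notes that applications can subclass the serializer to tweak the Jackson
mapper; a minimal sketch follows (the feature toggled here is only an example, and wiring a
custom serializer into a store implementation is outside the files shown in this message):

    import com.fasterxml.jackson.databind.SerializationFeature;
    import org.apache.spark.kvstore.KVStoreSerializer;

    public class MyAppSerializer extends KVStoreSerializer {

      public MyAppSerializer() {
        // "mapper" is the protected ObjectMapper created by the parent constructor.
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
      }
    }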

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
new file mode 100644
index 0000000..b761640
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * <p>
+ * The different methods can be used to configure the behavior of the iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * </p>
+ *
+ * <p>
+ * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
+ * when used in a for loop that exhausts their contents, but when used manually, they need
+ * to be closed explicitly unless all elements are read.
+ * </p>
+ */
+public abstract class KVStoreView<T> implements Iterable<T> {
+
+  final Class<T> type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending order.
+   */
+  public KVStoreView<T> reverse() {
+    ascending = !ascending;
+    return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+    this.index = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child index. Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * <p>
+   * Required for iterating over child indices, will generate an error if iterating over a
+   * parent-less index.
+   * </p>
+   */
+  public KVStoreView<T> parent(Object value) {
+    this.parent = value;
+    return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> first(Object value) {
+    this.first = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> last(Object value) {
+    this.last = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration after a number of elements has been retrieved.
+   */
+  public KVStoreView<T> max(long max) {
+    Preconditions.checkArgument(max > 0L, "max must be positive.");
+    this.max = max;
+    return this;
+  }
+
+  /**
+   * Skips a number of elements at the start of iteration. Skipped elements do not count
+   * towards the limit set by {@link #max(long)}.
+   */
+  public KVStoreView<T> skip(long n) {
+    this.skip = n;
+    return this;
+  }
+
+  /**
+   * Returns an iterator for the current configuration.
+   */
+  public KVStoreIterator<T> closeableIterator() throws Exception {
+    return (KVStoreIterator<T>) iterator();
+  }
+
+}
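
For illustration, a minimal sketch of how a view might be configured and consumed. The
"JobData" type, its "name" index and the "store" variable are hypothetical; only the
KVStoreView methods themselves come from this patch:

    // Iterate over at most 10 JobData instances, ordered by the "name" index, starting at
    // index value "a" (inclusive). The for-each form auto-closes the iterator once exhausted.
    for (JobData job : store.view(JobData.class).index("name").first("a").max(10)) {
      // ... use job ...
    }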

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
new file mode 100644
index 0000000..90f2ff0
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getDeclaredFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        f.setAccessible(true);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getDeclaredMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        m.setAccessible(true);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
+      }
+    }
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+    return index.parent().isEmpty() ? null : getAccessor(index.parent());
+  }
+
+  /**
+   * Abstracts the difference between invoking a Field and a Method.
+   */
+  interface Accessor {
+
+    Object get(Object instance) throws Exception;
+
+  }
+
+  private class FieldAccessor implements Accessor {
+
+    private final Field field;
+
+    FieldAccessor(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return field.get(instance);
+    }
+
+  }
+
+  private class MethodAccessor implements Accessor {
+
+    private final Method method;
+
+    MethodAccessor(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return method.invoke(instance);
+    }
+
+  }
+
+}
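
A short sketch of how KVTypeInfo exposes the indexed fields of an annotated type; CustomType1
is the test type added further down in this patch:

    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
    CustomType1 t = new CustomType1();
    t.key = "key1";
    t.id = "id1";
    // The plain @KVIndex annotation on "key" defines the natural index.
    Object naturalKey = ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t);   // "key1"
    Object id = ti.getIndexValue("id", t);                                 // "id1"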

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
new file mode 100644
index 0000000..08b22fd
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    LevelDBTypeInfo ti = getTypeInfo(value.getClass());
+
+    try (WriteBatch batch = db().createWriteBatch()) {
+      byte[] data = serializer.serialize(value);
+      synchronized (ti) {
+        Object existing;
+        try {
+          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
+        } catch (NoSuchElementException e) {
+          existing = null;
+        }
+
+        PrefixCache cache = new PrefixCache(value);
+        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
+        for (LevelDBTypeInfo.Index idx : ti.indices()) {
+          byte[] prefix = cache.getPrefix(idx);
+          idx.add(batch, value, existing, data, naturalKey, prefix);
+        }
+        db().write(batch);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch batch = db().createWriteBatch()) {
+      LevelDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (LevelDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(batch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>(type) {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          return new LevelDBIterator<>(LevelDB.this, this);
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    DB _db = this._db.getAndSet(null);
+    if (_db == null) {
+      return;
+    }
+
+    try {
+      _db.close();
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /** Returns metadata about indices for the given type. */
+  LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    LevelDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after-close, since that tends to crash the JVM. This doesn't prevent
+   * methods that already retrieved the instance from using it after close, but it hopefully
+   * catches most cases; otherwise, some kind of locking will be needed.
+   */
+  DB db() {
+    DB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {
+
+    private final Object entity;
+    private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
+
+    PrefixCache(Object entity) {
+      this.entity = entity;
+      this.prefixes = new HashMap<>();
+    }
+
+    byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
+      byte[] prefix = null;
+      if (idx.isChild()) {
+        prefix = prefixes.get(idx.parent());
+        if (prefix == null) {
+          prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
+          prefixes.put(idx.parent(), prefix);
+        }
+      }
+      return prefix;
+    }
+
+  }
+
+}
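
A minimal sketch of the basic store operations implemented above; the path is illustrative,
CustomType1 is the test type added further down in this patch, and error handling is omitted
(the KVStore methods throw Exception):

    KVStore store = new LevelDB(new File("/tmp/kvstore-example"));

    CustomType1 t = new CustomType1();
    t.key = "key1";
    t.id = "id1";
    t.name = "name1";
    t.child = "child1";
    store.write(t);                                    // indexed under every @KVIndex field
    CustomType1 read = store.read(CustomType1.class, "key1");
    long total = store.count(CustomType1.class);                       // 1
    long byName = store.count(CustomType1.class, "name", "name1");     // 1
    store.delete(CustomType1.class, "key1");
    store.close();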

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
new file mode 100644
index 0000000..a5d0f9f
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.DBIterator;
+
+class LevelDBIterator<T> implements KVStoreIterator<T> {
+
+  private final LevelDB db;
+  private final boolean ascending;
+  private final DBIterator it;
+  private final Class<T> type;
+  private final LevelDBTypeInfo ti;
+  private final LevelDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().iterator();
+    this.type = params.type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.hasNext()) {
+        // When descending, the caller may have set up the start of iteration at a non-existent
+        // entry that is guaranteed to be after the desired entry. For example, if you have a
+        // compound key (a, b) where b is an integer, you may seek to the end of the elements
+        // that have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value
+        // may not exist in the database. So we need to check here whether the next value
+        // actually belongs to the set being returned by the iterator before advancing.
+        byte[] nextKey = it.peekNext().getKey();
+        if (compare(nextKey, indexKeyPrefix) <= 0) {
+          it.next();
+        }
+      }
+    }
+    this.end = end;
+
+    if (params.skip > 0) {
+      skip(params.skip);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+
+    try {
+      T ret;
+      if (index == null || index.isCopy()) {
+        ret = db.serializer.deserialize(next, type);
+      } else {
+        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
+        ret = db.get(key, type);
+      }
+      next = null;
+      return ret;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<T> next(int max) {
+    List<T> list = new ArrayList<>(max);
+    while (hasNext() && list.size() < max) {
+      list.add(next());
+    }
+    return list;
+  }
+
+  @Override
+  public boolean skip(long n) {
+    long skipped = 0;
+    while (skipped < n) {
+      if (next != null) {
+        checkedNext = false;
+        next = null;
+        skipped++;
+        continue;
+      }
+
+      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+      if (!hasNext) {
+        checkedNext = true;
+        return false;
+      }
+
+      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
+      if (!isEndMarker(e.getKey())) {
+        skipped++;
+      }
+    }
+
+    return hasNext();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+    }
+  }
+
+  private byte[] loadNext() {
+    if (count >= max) {
+      return null;
+    }
+
+    try {
+      while (true) {
+        boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+        if (!hasNext) {
+          return null;
+        }
+
+        Map.Entry<byte[], byte[]> nextEntry;
+        try {
+          // Avoid races if another thread is updating the DB.
+          nextEntry = ascending ? it.next() : it.prev();
+        } catch (NoSuchElementException e) {
+          return null;
+        }
+
+        byte[] nextKey = nextEntry.getKey();
+        // Next key is not part of the index, stop.
+        if (!startsWith(nextKey, indexKeyPrefix)) {
+          return null;
+        }
+
+        // If the next key is an end marker, then skip it.
+        if (isEndMarker(nextKey)) {
+          continue;
+        }
+
+        // If there's a known end key and iteration has gone past it, stop.
+        if (end != null) {
+          int comp = compare(nextKey, end) * (ascending ? 1 : -1);
+          if (comp > 0) {
+            return null;
+          }
+        }
+
+        count++;
+
+        // Next element is part of the iteration, return it.
+        return nextEntry.getValue();
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean startsWith(byte[] key, byte[] prefix) {
+    if (key.length < prefix.length) {
+      return false;
+    }
+
+    for (int i = 0; i < prefix.length; i++) {
+      if (key[i] != prefix[i]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean isEndMarker(byte[] key) {
+    return (key.length > 2 &&
+        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
+        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
+  }
+
+  static int compare(byte[] a, byte[] b) {
+    int minLen = Math.min(a.length, b.length);
+    for (int i = 0; i < minLen; i++) {
+      int diff = a[i] - b[i];
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return a.length - b.length;
+  }
+
+}
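
With a store and CustomType1 entries as in the earlier sketch, manual iteration goes through
closeableIterator(); since the iterator holds native LevelDB resources, it should be closed
explicitly when not fully consumed:

    KVStoreIterator<CustomType1> it =
        store.view(CustomType1.class).index("name").closeableIterator();
    try {
      it.skip(5);                              // skipped elements do not count towards max()
      List<CustomType1> batch = it.next(10);   // read up to 10 elements in one call
    } finally {
      it.close();
    }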

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
new file mode 100644
index 0000000..3ab17db
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * <p>
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices trade extra disk space (one value per key) for cheaper updates,
+ * instead of keeping lists of pointers, which would be more expensive to modify at runtime.
+ * </p>
+ *
+ * <p>
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * </p>
+ *
+ * <pre>
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * </pre>
+ *
+ * <p>
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+<something>". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * </p>
+ *
+ * <p>
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * </p>
+ *
+ * <pre>
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * </pre>
+ *
+ * <p>
+ * Note that all indexed values are prepended with "+", even if the index itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of the entity.
+ * </p>
+ *
+ * <p>
+ * Child indices are stored after their parent index. In the example above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
+ * the data in the store would look something like the following:
+ * </p>
+ *
+ * <pre>
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
+ * ...
+ * </pre>
+ */
+class LevelDBTypeInfo {
+
+  static final byte[] END_MARKER = new byte[] { '-' };
+  static final byte ENTRY_PREFIX = (byte) '+';
+  static final byte KEY_SEPARATOR = 0x0;
+  static final byte TRUE = (byte) '1';
+  static final byte FALSE = (byte) '0';
+
+  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
+  private static final byte POSITIVE_MARKER = (byte) '=';
+  private static final byte NEGATIVE_MARKER = (byte) '*';
+  private static final byte[] HEX_BYTES = new byte[] {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  private final LevelDB db;
+  private final Class<?> type;
+  private final Map<String, Index> indices;
+  private final byte[] typePrefix;
+
+  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
+    this.db = db;
+    this.type = type;
+    this.indices = new HashMap<>();
+
+    KVTypeInfo ti = new KVTypeInfo(type);
+
+    // First create the parent indices, then the child indices.
+    ti.indices().forEach(idx -> {
+      if (idx.parent().isEmpty()) {
+        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
+      }
+    });
+    ti.indices().forEach(idx -> {
+      if (!idx.parent().isEmpty()) {
+        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
+          indices.get(idx.parent())));
+      }
+    });
+
+    this.typePrefix = alias;
+  }
+
+  Class<?> type() {
+    return type;
+  }
+
+  byte[] keyPrefix() {
+    return typePrefix;
+  }
+
+  Index naturalIndex() {
+    return index(KVIndex.NATURAL_INDEX_NAME);
+  }
+
+  Index index(String name) {
+    Index i = indices.get(name);
+    Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name,
+      type.getName());
+    return i;
+  }
+
+  Collection<Index> indices() {
+    return indices.values();
+  }
+
+  byte[] buildKey(byte[]... components) {
+    return buildKey(true, components);
+  }
+
+  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
+    int len = 0;
+    if (addTypePrefix) {
+      len += typePrefix.length + 1;
+    }
+    for (byte[] comp : components) {
+      len += comp.length;
+    }
+    len += components.length - 1;
+
+    byte[] dest = new byte[len];
+    int written = 0;
+
+    if (addTypePrefix) {
+      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
+      dest[typePrefix.length] = KEY_SEPARATOR;
+      written += typePrefix.length + 1;
+    }
+
+    for (byte[] comp : components) {
+      System.arraycopy(comp, 0, dest, written, comp.length);
+      written += comp.length;
+      if (written < dest.length) {
+        dest[written] = KEY_SEPARATOR;
+        written++;
+      }
+    }
+
+    return dest;
+  }
+
+  /**
+   * Models a single index in LevelDB. See top-level class's javadoc for a description of how the
+   * keys are generated.
+   */
+  class Index {
+
+    private final boolean copy;
+    private final boolean isNatural;
+    private final byte[] name;
+    private final KVTypeInfo.Accessor accessor;
+    private final Index parent;
+
+    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
+      byte[] name = self.value().getBytes(UTF_8);
+      if (parent != null) {
+        // Child index names get the SECONDARY_IDX_PREFIX, matching the key layout described
+        // in the class javadoc.
+        byte[] child = new byte[name.length + 1];
+        child[0] = SECONDARY_IDX_PREFIX;
+        System.arraycopy(name, 0, child, 1, name.length);
+        name = child;
+      }
+
+      this.name = name;
+      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
+      this.copy = isNatural || self.copy();
+      this.accessor = accessor;
+      this.parent = parent;
+    }
+
+    boolean isCopy() {
+      return copy;
+    }
+
+    boolean isChild() {
+      return parent != null;
+    }
+
+    Index parent() {
+      return parent;
+    }
+
+    /**
+     * Creates a key prefix for child indices of this index. This allows the prefix to be
+     * calculated only once, avoiding redundant work when multiple child indices of the
+     * same parent index exist.
+     */
+    byte[] childPrefix(Object value) throws Exception {
+      Preconditions.checkState(parent == null, "Not a parent index.");
+      return buildKey(name, toParentKey(value));
+    }
+
+    /**
+     * Gets the index value for a particular entity (which is the value of the field or method
+     * tagged with the index annotation). This is used as part of the LevelDB key where the
+     * entity (or its id) is stored.
+     */
+    Object getValue(Object entity) throws Exception {
+      return accessor.get(entity);
+    }
+
+    private void checkParent(byte[] prefix) {
+      if (prefix != null) {
+        Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
+      } else {
+        Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
+      }
+    }
+
+    /** The prefix for all keys that belong to this index. */
+    byte[] keyPrefix(byte[] prefix) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
+    }
+
+    /**
+     * The key at which to start ascending iteration over entities whose value for the indexed
+     * field matches the given value.
+     */
+    byte[] start(byte[] prefix, Object value) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, toKey(value))
+        : buildKey(name, toKey(value));
+    }
+
+    /** The key for the index's end marker. */
+    byte[] end(byte[] prefix) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
+        : buildKey(name, END_MARKER);
+    }
+
+    /** The key for the end marker for entries with the given value. */
+    byte[] end(byte[] prefix, Object value) throws Exception {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
+        : buildKey(name, toKey(value), END_MARKER);
+    }
+
+    /** The full key in the index that identifies the given entity. */
+    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
+      Object indexValue = getValue(entity);
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        name, type.getName());
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
+      }
+      return entityKey;
+    }
+
+    private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception {
+      long updated = getCount(key) + delta;
+      if (updated > 0) {
+        batch.put(key, db.serializer.serialize(updated));
+      } else {
+        batch.delete(key);
+      }
+    }
+
+    private void addOrRemove(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      Object indexValue = getValue(entity);
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        name, type.getName());
+
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, naturalKey);
+      }
+
+      boolean needCountUpdate = (existing == null);
+
+      // Check whether there's a need to update the index. The index needs to be updated in two
+      // cases:
+      //
+      // - There is no existing value for the entity, so a new index value will be added.
+      // - If there is a previously stored value for the entity, and the index value for the
+      //   current index does not match the new value, the old entry needs to be deleted and
+      //   the new one added.
+      //
+      // Natural indices don't need to be checked, because by definition both old and new entities
+      // will have the same key. The put() call is all that's needed in that case.
+      //
+      // Also check whether we need to update the counts. If the indexed value is changing, we
+      // need to decrement the count at the old index value, and the new indexed value count needs
+      // to be incremented.
+      if (existing != null && !isNatural) {
+        byte[] oldPrefix = null;
+        Object oldIndexedValue = getValue(existing);
+        boolean removeExisting = !indexValue.equals(oldIndexedValue);
+        if (!removeExisting && isChild()) {
+          oldPrefix = parent().childPrefix(parent().getValue(existing));
+          removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
+        }
+
+        if (removeExisting) {
+          if (oldPrefix == null && isChild()) {
+            oldPrefix = parent().childPrefix(parent().getValue(existing));
+          }
+
+          byte[] oldKey = entityKey(oldPrefix, existing);
+          batch.delete(oldKey);
+
+          // If the indexed value has changed, we need to update the counts at the old and new
+          // end markers for the indexed value.
+          if (!isChild()) {
+            byte[] oldCountKey = end(null, oldIndexedValue);
+            updateCount(batch, oldCountKey, -1L);
+            needCountUpdate = true;
+          }
+        }
+      }
+
+      if (data != null) {
+        byte[] stored = copy ? data : naturalKey;
+        batch.put(entityKey, stored);
+      } else {
+        batch.delete(entityKey);
+      }
+
+      if (needCountUpdate && !isChild()) {
+        long delta = data != null ? 1L : -1L;
+        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
+        updateCount(batch, countKey, delta);
+      }
+    }
+
+    /**
+     * Add an entry to the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being added to the index.
+     * @param existing The entity being replaced in the index, or null.
+     * @param data Serialized entity to store (when storing the entity, not a reference).
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void add(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
+    }
+
+    /**
+     * Remove a value from the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being removed, to identify the index entry to modify.
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void remove(
+        WriteBatch batch,
+        Object entity,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, null, null, naturalKey, prefix);
+    }
+
+    long getCount(byte[] key) throws Exception {
+      byte[] data = db.db().get(key);
+      return data != null ? db.serializer.deserializeLong(data) : 0;
+    }
+
+    byte[] toParentKey(Object value) {
+      return toKey(value, SECONDARY_IDX_PREFIX);
+    }
+
+    byte[] toKey(Object value) {
+      return toKey(value, ENTRY_PREFIX);
+    }
+
+    /**
+     * Translates a value to be used as part of the store key.
+     *
+     * Integral numbers are encoded as a string in a way that preserves lexicographical
+     * ordering. The string is prepended with a marker telling whether the number is negative
+     * or non-negative ("*" for negative and "=" for non-negative are used since "-" and "+"
+     * have the opposite of the desired order), and then the number is encoded as a hex string
+     * (so it occupies twice as many bytes as the original type).
+     *
+     * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
+     */
+    byte[] toKey(Object value, byte prefix) {
+      final byte[] result;
+
+      if (value instanceof String) {
+        byte[] str = ((String) value).getBytes(UTF_8);
+        result = new byte[str.length + 1];
+        result[0] = prefix;
+        System.arraycopy(str, 0, result, 1, str.length);
+      } else if (value instanceof Boolean) {
+        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
+      } else if (value.getClass().isArray()) {
+        int length = Array.getLength(value);
+        byte[][] components = new byte[length][];
+        for (int i = 0; i < length; i++) {
+          components[i] = toKey(Array.get(value, i));
+        }
+        result = buildKey(false, components);
+      } else {
+        int bytes;
+
+        if (value instanceof Integer) {
+          bytes = Integer.SIZE;
+        } else if (value instanceof Long) {
+          bytes = Long.SIZE;
+        } else if (value instanceof Short) {
+          bytes = Short.SIZE;
+        } else if (value instanceof Byte) {
+          bytes = Byte.SIZE;
+        } else {
+          throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
+            value.getClass().getName()));
+        }
+
+        bytes = bytes / Byte.SIZE;
+
+        byte[] key = new byte[bytes * 2 + 2];
+        long longValue = ((Number) value).longValue();
+        key[0] = prefix;
+        key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+
+        for (int i = 0; i < key.length - 2; i++) {
+          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
+          key[key.length - i - 1] = HEX_BYTES[masked];
+        }
+
+        result = key;
+      }
+
+      return result;
+    }
+
+  }
+
+}
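
As a worked example of the integral encoding above: for an int-valued index, toKey(42) yields
the bytes "+=0000002a" (entry prefix, non-negative marker, 8 hex nibbles), while toKey(-1)
yields "+*ffffffff". Since '*' sorts before '=', all negative values order before all
non-negative ones, and within each group the fixed-width hex string preserves numeric order
under byte-wise comparison.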

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
new file mode 100644
index 0000000..2ed246e
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when the store implementation is not compatible with the underlying data.
+ */
+public class UnsupportedStoreVersionException extends IOException {
+
+}
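
For context, a sketch of how a caller might handle this exception when opening a store whose
on-disk data was written by an incompatible version; the "path" variable and the
discard-and-rebuild policy are illustrative (FileUtils is org.apache.commons.io.FileUtils):

    LevelDB store;
    try {
      store = new LevelDB(path);
    } catch (UnsupportedStoreVersionException e) {
      // The existing data cannot be read by this version of the store; discard and rebuild.
      FileUtils.deleteDirectory(path);
      store = new LevelDB(path);
    }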

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
new file mode 100644
index 0000000..afb72b8
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import com.google.common.base.Objects;
+
+public class CustomType1 {
+
+  @KVIndex
+  public String key;
+
+  @KVIndex("id")
+  public String id;
+
+  @KVIndex(value = "name", copy = true)
+  public String name;
+
+  @KVIndex("int")
+  public int num;
+
+  @KVIndex(value = "child", parent = "id")
+  public String child;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof CustomType1) {
+      CustomType1 other = (CustomType1) o;
+      return id.equals(other.id) && name.equals(other.name);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("key", key)
+      .add("id", id)
+      .add("name", name)
+      .add("num", num)
+      .toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
new file mode 100644
index 0000000..8549712
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /** Reverses the order of this comparator. */
+    default BaseComparator reverse() {
+      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in an @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @BeforeClass
+  public static void setupClass() {
+    long seed = RND.nextLong();
+    LOG.info("Random seed: {}", seed);
+    RND.setSeed(seed);
+  }
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + RND.nextInt(MAX_ENTRIES);
+      t.num = RND.nextInt(MAX_ENTRIES);
+      t.child = "child" + (i % MIN_ENTRIES);
+      allEntries.add(t);
+    }
+
+    // Shuffle the entries to avoid the insertion order matching the natural ordering. Just in case.
+    Collections.shuffle(allEntries, RND);
+    for (CustomType1 e : allEntries) {
+      db.write(e);
+    }
+
+    // Pick the first generated value, and forcefully create a few entries that will clash
+    // with the indexed values (id and name), to make sure the index behaves correctly when
+    // multiple entities are indexed by the same value.
+    //
+    // This also serves as a test for the test code itself, to make sure it's sorting indices
+    // the same way the store is expected to.
+    CustomType1 first = allEntries.get(0);
+    clashingEntries = new ArrayList<>();
+
+    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
+    for (int i = 0; i < clashCount; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "n-key" + (count + i);
+      t.id = first.id;
+      t.name = first.name;
+      t.num = first.num;
+      t.child = first.child;
+      allEntries.add(t);
+      clashingEntries.add(t);
+      db.write(t);
+    }
+
+    // Create another entry that could cause problems: take the first entry, and make its indexed
+    // name be an extension of the existing ones, to make sure the implementation sorts these
+    // correctly even considering the separator character (shorter strings first).
+    CustomType1 t = new CustomType1();
+    t.key = "extended-key-0";
+    t.id = first.id;
+    t.name = first.name + "a";
+    t.num = first.num;
+    t.child = first.child;
+    allEntries.add(t);
+    db.write(t);
+  }
+
+  @Test
+  public void naturalIndex() throws Exception {
+    testIteration(NATURAL_ORDER, view(), null, null);
+  }
+
+  @Test
+  public void refIndex() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
+  }
+
+  @Test
+  public void copyIndex() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
+  }
+
+  @Test
+  public void numericIndex() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
+  }
+
+  @Test
+  public void childIndex() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id), null, null);
+  }
+
+  @Test
+  public void naturalIndexDescending() throws Exception {
+    testIteration(NATURAL_ORDER, view().reverse(), null, null);
+  }
+
+  @Test
+  public void refIndexDescending() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
+  }
+
+  @Test
+  public void copyIndexDescending() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
+  }
+
+  @Test
+  public void numericIndexDescending() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
+  }
+
+  @Test
+  public void childIndexDescending() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).reverse(), null, null);
+  }
+
+  @Test
+  public void naturalIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
+  }
+
+  @Test
+  public void copyIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
+  }
+
+  @Test
+  public void numericIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
+  }
+
+  @Test
+  public void childIndexWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).first(any.child), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first, null);
+  }
+
+  @Test
+  public void copyIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name), first, null);
+  }
+
+  @Test
+  public void numericIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num), first, null);
+  }
+
+  @Test
+  public void childIndexDescendingWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER,
+      view().index("child").parent(any.id).first(any.child).reverse(), null, null);
+  }
+
+  @Test
+  public void naturalIndexWithSkip() throws Exception {
+    testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void refIndexWithSkip() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithSkip() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithSkip() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).skip(pickCount()),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexWithMax() throws Exception {
+    testIteration(NATURAL_ORDER, view().max(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithMax() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").max(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithMax() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).max(pickCount()), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
+  }
+
+  @Test
+  public void numericIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
+  }
+
+  @Test
+  public void childIndexWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).last(any.child), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
+      null, last);
+  }
+
+  @Test
+  public void numericIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
+      null, last);
+  }
+
+  @Test
+  public void childIndexDescendingWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).last(any.child).reverse(),
+      null, null);
+  }
+
+  @Test
+  public void testRefWithIntNaturalKey() throws Exception {
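+    // An entry whose natural key is an int (rather than a String) should round-trip through the
+    // store and be returned intact by an iterator.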
+    LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
+    i.key = 1;
+    i.id = "1";
+    i.values = Arrays.asList("1");
+
+    db.write(i);
+
+    try(KVStoreIterator<?> it = db.view(i.getClass()).closeableIterator()) {
+      Object read = it.next();
+      assertEquals(i, read);
+    }
+  }
+
+  private CustomType1 pickLimit() {
+    // Picks an element that clashes with other elements in the indices being tested.
+    return clashingEntries.get(RND.nextInt(clashingEntries.size()));
+  }
+
+  private int pickCount() {
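+    // Picks a skip / limit count between 1 and half the number of entries.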
+    int count = RND.nextInt(allEntries.size() / 2);
+    return Math.max(count, 1);
+  }
+
+  /**
+   * Compares the two values and falls back to comparing the natural key of CustomType1
+   * if they're the same, to mimic the behavior of the indexing code.
+   */
+  private <T extends Comparable<T>> int compareWithFallback(
+      T v1,
+      T v2,
+      CustomType1 ct1,
+      CustomType1 ct2) {
+    int result = v1.compareTo(v2);
+    if (result != 0) {
+      return result;
+    }
+
+    return ct1.key.compareTo(ct2.key);
+  }
+
+  private void testIteration(
+      final BaseComparator order,
+      final KVStoreView<CustomType1> params,
+      final CustomType1 first,
+      final CustomType1 last) throws Exception {
+    List<CustomType1> indexOrder = sortBy(order.fallback());
+    if (!params.ascending) {
+      indexOrder = Lists.reverse(indexOrder);
+    }
+
+    Iterable<CustomType1> expected = indexOrder;
+    BaseComparator expectedOrder = params.ascending ? order : order.reverse();
+
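+    // Narrow the expected list with the same constraints the view applies: parent, first / last
+    // boundaries (compared using the expected ordering), then skip and max.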
+    if (params.parent != null) {
+      expected = Iterables.filter(expected, v -> params.parent.equals(v.id));
+    }
+
+    if (first != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
+    }
+
+    if (last != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
+    }
+
+    if (params.skip > 0) {
+      expected = Iterables.skip(expected, (int) params.skip);
+    }
+
+    if (params.max != Long.MAX_VALUE) {
+      expected = Iterables.limit(expected, (int) params.max);
+    }
+
+    List<CustomType1> actual = collect(params);
+    compareLists(expected, actual);
+  }
+
+  /** Could use assertEquals(), but that creates hard-to-read errors for large lists. */
+  private void compareLists(Iterable<?> expected, List<?> actual) {
+    Iterator<?> expectedIt = expected.iterator();
+    Iterator<?> actualIt = actual.iterator();
+
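+    // Walk both iterators in lockstep, comparing element by element, and stop as soon as either
+    // side runs out.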
+    int count = 0;
+    while (expectedIt.hasNext()) {
+      if (!actualIt.hasNext()) {
+        break;
+      }
+      count++;
+      assertEquals(expectedIt.next(), actualIt.next());
+    }
+
+    String message;
+    Object[] remaining;
+    int expectedCount = count;
+    int actualCount = count;
+
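+    // Whichever side still has elements determines how the mismatch is reported: "missing" when
+    // the expected list had more, "stray" when the actual list had more.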
+    if (expectedIt.hasNext()) {
+      remaining = Iterators.toArray(expectedIt, Object.class);
+      expectedCount += remaining.length;
+      message = "missing";
+    } else {
+      remaining = Iterators.toArray(actualIt, Object.class);
+      actualCount += remaining.length;
+      message = "stray";
+    }
+
+    assertEquals(String.format("Found %s elements: %s", message, Arrays.asList(remaining)),
+      expectedCount, actualCount);
+  }
+
+  private KVStoreView<CustomType1> view() throws Exception {
+    return db.view(CustomType1.class);
+  }
+
+  private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception {
+    return Arrays.asList(Iterables.toArray(view, CustomType1.class));
+  }
+
+  private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
+    List<CustomType1> copy = new ArrayList<>(allEntries);
+    Collections.sort(copy, comp);
+    return copy;
+  }
+
+}

