You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/05/07 16:51:39 UTC

samza git commit: SAMZA-1691: Support get iterable from KeyValueStore

Repository: samza
Updated Branches:
  refs/heads/master 2e461a880 -> fa49e7228


SAMZA-1691: Support get iterable from KeyValueStore

Right now KeyValueStore has a range query that returns an iterator. For use in BEAM, we need an iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function to KeyValueStore to support this. It is implemented as follows:

1) for RocksDB, iterate() creates the iterator when it is called, and that iterator holds a snapshot of the elements. Then, every time an iterator is needed, we seek that iterator back to the beginning;

2) for inMemoryDb, iterate() creates the snapshot submap when it is called. The submap is iterable, so it can return a new iterator whenever one is needed.

Author: xinyuiscool <xi...@linkedin.com>

Reviewers: Boris S <sb...@apache.org>

Closes #492 from xinyuiscool/SAMZA-1691


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa49e722
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa49e722
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa49e722

Branch: refs/heads/master
Commit: fa49e7228319e845bb81336b97af8e28e04524c9
Parents: 2e461a8
Author: xinyuiscool <xi...@linkedin.com>
Authored: Mon May 7 09:51:28 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Mon May 7 09:51:28 2018 -0700

----------------------------------------------------------------------
 .../samza/storage/kv/KeyValueIterable.java      | 24 ++++++
 .../apache/samza/storage/kv/KeyValueStore.java  | 14 +++
 .../operators/util/InternalInMemoryStore.java   |  6 ++
 .../operators/impl/store/TestInMemoryStore.java | 12 +++
 .../kv/inmemory/InMemoryKeyValueStore.scala     | 12 ++-
 .../kv/inmemory/TestInMemoryKeyValueStore.java  | 81 ++++++++++++++++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 33 +++++++
 .../kv/TestRocksDbKeyValueStoreJava.java        | 90 ++++++++++++++++++++
 .../samza/storage/kv/AccessLoggedStore.scala    |  9 +-
 .../apache/samza/storage/kv/CachedStore.scala   |  4 +
 .../storage/kv/KeyValueStorageEngine.scala      |  7 ++
 .../kv/KeyValueStorageEngineMetrics.scala       |  2 +
 .../apache/samza/storage/kv/LoggedStore.scala   |  3 +
 .../storage/kv/NullSafeKeyValueStore.scala      |  6 ++
 .../storage/kv/SerializedKeyValueStore.scala    | 11 +++
 .../samza/storage/kv/MockKeyValueStore.scala    |  4 +
 16 files changed, 316 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
new file mode 100644
index 0000000..8fd00ed
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> {
+  KeyValueIterator<K, V> iterator();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 18a89ec..3f216bd 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -111,6 +111,20 @@ public interface KeyValueStore<K, V> {
   KeyValueIterator<K, V> range(K from, K to);
 
   /**
+   * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}).
+   * Note that we snapshot the iterator when the iterable is created from this function, and
+   * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.
+   *
+   * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
+   * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
+   * @return an iterable for the specified key range.
+   * @throws NullPointerException if null is used for {@code from} or {@code to}.
+   */
+  default KeyValueIterable<K, V> iterate(K from, K to) {
+    return () -> range(from, to);
+  }
+
+  /**
    * Returns an iterator for all entries in this key-value store.
    *
    * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index b8672c6..7360474 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -20,6 +20,7 @@
 package org.apache.samza.operators.util;
 
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
@@ -94,6 +95,11 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
   @Override
+  public KeyValueIterable<K, V> iterate(K from, K to) {
+    throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName());
+  }
+
+  @Override
   public KeyValueIterator<K, V> all() {
     final Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
     return new KeyValueIterator<K, V>() {

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index d16954d..e331703 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store;
 import com.google.common.primitives.UnsignedBytes;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
@@ -99,6 +100,17 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
   @Override
+  public KeyValueIterable<K, V> iterate(K from, K to) {
+    final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to));
+    return new KeyValueIterable<K, V>() {
+      @Override
+      public KeyValueIterator<K, V> iterator() {
+        return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde);
+      }
+    };
+  }
+
+  @Override
   public KeyValueIterator<K, V> all() {
     return new InMemoryIterator(map.entrySet().iterator(), keySerde, valSerde);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 7b83163..decaee0 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -20,7 +20,7 @@ package org.apache.samza.storage.kv.inmemory
 
 import com.google.common.primitives.UnsignedBytes
 import org.apache.samza.util.Logging
-import org.apache.samza.storage.kv.{KeyValueStoreMetrics, KeyValueIterator, Entry, KeyValueStore}
+import org.apache.samza.storage.kv._
 import java.util
 
 /**
@@ -112,4 +112,14 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
     found
   }
+
+  override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
+    // snapshot the iterable
+    val entries = underlying.subMap(from, to).entrySet()
+    new KeyValueIterable[Array[Byte], Array[Byte]] {
+      override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+        new InMemoryIterator(entries.iterator())
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
new file mode 100644
index 0000000..0fa5807
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestInMemoryKeyValueStore {
+  @Test
+  public void testIterate() throws Exception {
+    InMemoryKeyValueStore store = new InMemoryKeyValueStore(
+        new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap()));
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    String prefix = "prefix";
+    for(int i = 0; i < 100; i++) {
+      store.put(genKey(outputStream, prefix, i), genValue());
+    }
+
+    byte[] firstKey = genKey(outputStream, prefix, 0);
+    byte[] lastKey = genKey(outputStream, prefix, 100);
+    KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+    // Make sure the cached Iterable won't change when new elements are added
+    store.put(genKey(outputStream, prefix, 200), genValue());
+    assertTrue(Iterators.size(iterable.iterator()) == 100);
+
+    List<Integer> keys = new ArrayList<>();
+    for (Entry<byte[], byte[]> entry : iterable) {
+      int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
+      keys.add(key);
+    }
+    assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
+
+    outputStream.close();
+    store.close();
+  }
+
+  private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
+    outputStream.reset();
+    outputStream.write(prefix.getBytes());
+    outputStream.write(Ints.toByteArray(i));
+    return outputStream.toByteArray();
+  }
+
+  private byte[] genValue() {
+    int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+    return Ints.toByteArray(randomVal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 856cc4e..06fb584 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.util
 import java.util.Comparator
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -203,6 +204,30 @@ class RocksDbKeyValueStore(
     new RocksDbIterator(iter)
   }
 
+  def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
+    //snapshot the iterator
+    val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator]
+    new KeyValueIterable[Array[Byte], Array[Byte]] {
+      var iter:RocksDbRangeIterator = null
+
+      def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+        this.synchronized {
+          if (iter == null) {
+            iter = snapshotIter
+            iter
+          } else if(iter.isOpen() && !iter.hasNext()) {
+            // use the cached iterator and reset the position to the beginning
+            iter.seek(from)
+            iter
+          } else {
+            // we need to create a new iterator since the cached one is still in use or already closed
+            range(from, to)
+          }
+        }
+      }
+    }
+  }
+
   def flush(): Unit = ifOpen {
     metrics.flushes.inc
     trace("Flushing store: %s" format storeName)
@@ -248,6 +273,10 @@ class RocksDbKeyValueStore(
       iter.close()
     }
 
+    def isOpen() = ifOpen {
+      open
+    }
+
     override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
 
     override def hasNext() = ifOpen(iter.isValid)
@@ -301,6 +330,10 @@ class RocksDbKeyValueStore(
     override def hasNext() = ifOpen {
       super.hasNext() && comparator.compare(peekKey(), to) < 0
     }
+
+    def seek(key: Array[Byte]) = {
+      iter.seek(key)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
new file mode 100644
index 0000000..98688c6
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRocksDbKeyValueStoreJava {
+  @Test
+  public void testIterate() throws Exception {
+    Config config = new MapConfig();
+    Options options = new Options();
+    options.setCreateIfMissing(true);
+
+    File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+    RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore",
+        new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap()));
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    String prefix = "prefix";
+    for(int i = 0; i < 100; i++) {
+      store.put(genKey(outputStream, prefix, i), genValue());
+    }
+
+    byte[] firstKey = genKey(outputStream, prefix, 0);
+    byte[] lastKey = genKey(outputStream, prefix, 1000);
+    KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+    // Make sure the cached Iterable won't change when new elements are added
+    store.put(genKey(outputStream, prefix, 200), genValue());
+    assertTrue(Iterators.size(iterable.iterator()) == 100);
+
+    List<Integer> keys = new ArrayList<>();
+    for (Entry<byte[], byte[]> entry : iterable) {
+      int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
+      keys.add(key);
+    }
+    assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
+
+    outputStream.close();
+    store.close();
+  }
+
+  private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
+    outputStream.reset();
+    outputStream.write(prefix.getBytes());
+    outputStream.write(Ints.toByteArray(i));
+    return outputStream.toByteArray();
+  }
+
+  private byte[] genValue() {
+    int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+    return Ints.toByteArray(randomVal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 879a144..67fd011 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -41,6 +41,7 @@ class AccessLoggedStore[K, V](
     val WRITE = 2
     val DELETE = 3
     val RANGE = 4
+    val ITERATE = 5
   }
 
   val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
@@ -91,6 +92,13 @@ class AccessLoggedStore[K, V](
     store.all()
   }
 
+  def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    val list : util.ArrayList[K] = new util.ArrayList[K]()
+    list.add(from)
+    list.add(to)
+    logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to))
+  }
+
   def close(): Unit = {
     trace("Closing accessLogged store.")
 
@@ -151,5 +159,4 @@ class AccessLoggedStore[K, V](
     val bytes = keySerde.toBytes(key)
     bytes
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index d40999a..29efacb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -286,6 +286,10 @@ class CachedStore[K, V](
   }
 
   def hasArrayKeys = containsArrayKeys
+
+  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    store.iterate(from, to)
+  }
 }
 
 private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 5f7bbd8..b055ca5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -160,4 +160,11 @@ class KeyValueStorageEngine[K, V](
   }
 
   override def getStoreProperties: StoreProperties = storeProperties
+
+  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    updateTimer(metrics.iterateNs) {
+      metrics.iterates.inc
+      wrapperStore.iterate(from, to)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 8c42c7c..4162292 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,6 +33,7 @@ class KeyValueStorageEngineMetrics(
   val puts = newCounter("puts")
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
+  val iterates = newCounter("iterates")
 
   val restoredMessages = newCounter("messages-restored") //Deprecated
   val restoredMessagesGauge = newGauge("restored-messages", 0)
@@ -47,6 +48,7 @@ class KeyValueStorageEngineMetrics(
   val flushNs = newTimer("flush-ns")
   val allNs = newTimer("all-ns")
   val rangeNs = newTimer("range-ns")
+  val iterateNs = newTimer("iterate-ns")
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index e0c7a31..0d013f8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -114,4 +114,7 @@ class LoggedStore[K, V](
     store.close
   }
 
+  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    store.iterate(from, to)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 7adffa9..1978710 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -89,4 +89,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
       throw new NullPointerException(msg)
     }
   }
+
+  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    notNull(from, NullKeyErrorMessage)
+    notNull(to, NullKeyErrorMessage)
+    store.iterate(from, to)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 16dd980..5f59143 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -148,4 +148,15 @@ class SerializedKeyValueStore[K, V](
     }
     bytes
   }
+
+  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+    val fromBytes = toBytesOrNull(from, keySerde)
+    val toBytes = toBytesOrNull(to, keySerde)
+    val iterable = store.iterate(fromBytes, toBytes)
+    new KeyValueIterable[K, V] {
+      override def iterator(): KeyValueIterator[K, V] = {
+        new DeserializingIterator(iterable.iterator())
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index f66dc04..8affd5e 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -69,4 +69,8 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   override def flush() {}  // no-op
 
   override def close() { kvMap.clear() }
+
+  override def iterate(from: String, to: String): KeyValueIterable[String, String] = {
+    throw new UnsupportedOperationException("iterator() not supported")
+  }
 }
\ No newline at end of file